summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--.codespellrc2
-rw-r--r--.git-blame-ignore-revs2
-rw-r--r--MANIFEST.in2
-rw-r--r--README.rst13
-rw-r--r--dev-requirements.txt6
-rw-r--r--paramiko/__init__.py25
-rw-r--r--paramiko/_version.py2
-rw-r--r--paramiko/agent.py73
-rw-r--r--paramiko/auth_handler.py193
-rw-r--r--paramiko/auth_strategy.py306
-rw-r--r--paramiko/client.py30
-rw-r--r--paramiko/dsskey.py15
-rw-r--r--paramiko/ecdsakey.py8
-rw-r--r--paramiko/ed25519key.py17
-rw-r--r--paramiko/hostkeys.py47
-rw-r--r--paramiko/pkey.py200
-rw-r--r--paramiko/rsakey.py18
-rw-r--r--paramiko/server.py4
-rw-r--r--paramiko/ssh_exception.py5
-rw-r--r--paramiko/transport.py200
-rw-r--r--pytest.ini7
-rw-r--r--sites/docs/api/auth.rst8
-rw-r--r--sites/docs/index.rst1
-rw-r--r--sites/www/changelog.rst126
-rw-r--r--tasks.py22
-rw-r--r--tests/_loop.py (renamed from tests/loop.py)0
-rw-r--r--tests/_stub_sftp.py (renamed from tests/stub_sftp.py)0
-rw-r--r--tests/_support/dss.key (renamed from tests/cert_support/test_dss.key)0
-rw-r--r--tests/_support/dss.key-cert.pub (renamed from tests/cert_support/test_dss.key-cert.pub)0
-rw-r--r--tests/_support/ecdsa-256.key (renamed from tests/cert_support/test_ecdsa_256.key)0
-rw-r--r--tests/_support/ecdsa-256.key-cert.pub (renamed from tests/cert_support/test_ecdsa_256.key-cert.pub)0
-rw-r--r--tests/_support/ed25519.key (renamed from tests/cert_support/test_ed25519.key)0
-rw-r--r--tests/_support/ed25519.key-cert.pub (renamed from tests/cert_support/test_ed25519.key-cert.pub)0
-rw-r--r--tests/_support/ed448.key4
-rw-r--r--tests/_support/rsa-lonely.key (renamed from tests/cert_support/test_rsa.key)0
-rw-r--r--tests/_support/rsa-missing.key-cert.pub (renamed from tests/cert_support/test_rsa.key-cert.pub)0
-rw-r--r--tests/_support/rsa.key (renamed from tests/test_rsa.key)0
-rw-r--r--tests/_support/rsa.key-cert.pub1
-rw-r--r--tests/_util.py441
-rw-r--r--tests/agent.py151
-rw-r--r--tests/auth.py580
-rw-r--r--tests/conftest.py75
-rw-r--r--tests/pkey.py229
-rw-r--r--tests/test_agent.py50
-rw-r--r--tests/test_auth.py272
-rw-r--r--tests/test_client.py69
-rw-r--r--tests/test_config.py2
-rw-r--r--tests/test_dss.key12
-rw-r--r--tests/test_ecdsa_256.key5
-rw-r--r--tests/test_ed25519.key8
-rw-r--r--tests/test_file.py2
-rw-r--r--tests/test_gssapi.py2
-rw-r--r--tests/test_hostkeys.py25
-rw-r--r--tests/test_kex_gss.py6
-rw-r--r--tests/test_packetizer.py2
-rw-r--r--tests/test_pkey.py121
-rw-r--r--tests/test_sftp.py4
-rw-r--r--tests/test_sftp_big.py2
-rw-r--r--tests/test_ssh_gss.py8
-rw-r--r--tests/test_transport.py318
-rw-r--r--tests/test_util.py13
-rw-r--r--tests/util.py174
62 files changed, 2812 insertions, 1096 deletions
diff --git a/.codespellrc b/.codespellrc
index f6935aa7..a8d619e5 100644
--- a/.codespellrc
+++ b/.codespellrc
@@ -4,4 +4,4 @@ skip = venvs,.venv,.git,build,*.egg-info,*.lock,*.js,*.css,docs
# Certain words AUTHOR feels strongly about, plus various proper names that are
# close enough to real words that they anger codespell. (NOTE: for some reason
# codespell wants the latter listed in all-lowercase...!)
-ignore-words-list = keypair,flage
+ignore-words-list = keypair,flage,lew,welp,strat
diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs
new file mode 100644
index 00000000..f8f0117c
--- /dev/null
+++ b/.git-blame-ignore-revs
@@ -0,0 +1,2 @@
+# blackening
+7f2c35052183b400827d9949a68b41c90f90a32d
diff --git a/MANIFEST.in b/MANIFEST.in
index c7de9097..8c6fc5d9 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1,4 +1,4 @@
-include LICENSE setup_helper.py
+include LICENSE setup_helper.py pytest.ini
recursive-include docs *
recursive-include tests *.py *.key *.pub
recursive-include tests/configs *
diff --git a/README.rst b/README.rst
index 059d152c..ef7b8ec6 100644
--- a/README.rst
+++ b/README.rst
@@ -41,8 +41,11 @@ personal site.
<https://www.paramiko.org/installing.html>`_ for details.
.. [#]
- SSH is defined in :rfc-reference:`4251`, :rfc-reference:`4252`,
- :rfc-reference:`4253` and :rfc-reference:`4254`. The primary working
- implementation of the protocol is the `OpenSSH project
- <http://openssh.org>`_. Paramiko implements a large portion of the SSH
- feature set, but there are occasional gaps.
+ OpenSSH's RFC specification page is a fantastic resource and collection of
+ links that we won't bother replicating here:
+ https://www.openssh.com/specs.html
+
+ OpenSSH itself also happens to be our primary reference implementation:
+ when in doubt, we consult how they do things, unless there are good reasons
+ not to. There are always some gaps, but we do our best to reconcile them
+ when possible.
diff --git a/dev-requirements.txt b/dev-requirements.txt
index 26d0efa4..43d01e0d 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -1,6 +1,6 @@
# Invocations for common project tasks
invoke>=2.0
-invocations>=3.0
+invocations>=3.2
# Testing!
pytest-relaxed>=2
# pytest-xdist for test dir watching and the inv guard task
@@ -10,12 +10,14 @@ flake8>=4,<5
# Formatting!
black>=22.8,<22.9
# Spelling!
-codespell>=2.2,<2.3
+# TODO Python 3.7: newer codespell has upgraded lists
+codespell>=2.2.1,<2.3
# Coverage!
coverage>=6.2,<7
# Documentation tools
alabaster==0.7.13
releases>=2.1
+watchdog<2
# Debuggery
icecream>=2.1
# Self (sans GSS which is a pain to bother with most of the time)
diff --git a/paramiko/__init__.py b/paramiko/__init__.py
index cbc240a6..476062ef 100644
--- a/paramiko/__init__.py
+++ b/paramiko/__init__.py
@@ -19,7 +19,11 @@
# flake8: noqa
import sys
from paramiko._version import __version__, __version_info__
-from paramiko.transport import SecurityOptions, Transport
+from paramiko.transport import (
+ SecurityOptions,
+ Transport,
+ ServiceRequestingTransport,
+)
from paramiko.client import (
SSHClient,
MissingHostKeyPolicy,
@@ -28,6 +32,18 @@ from paramiko.client import (
WarningPolicy,
)
from paramiko.auth_handler import AuthHandler
+from paramiko.auth_strategy import (
+ AuthFailure,
+ AuthStrategy,
+ AuthResult,
+ AuthSource,
+ InMemoryPrivateKey,
+ NoneAuth,
+ OnDiskPrivateKey,
+ Password,
+ PrivateKey,
+ SourceResult,
+)
from paramiko.ssh_gss import GSSAuth, GSS_AUTH_AVAILABLE, GSS_EXCEPTIONS
from paramiko.channel import (
Channel,
@@ -63,7 +79,7 @@ from paramiko.message import Message
from paramiko.packet import Packetizer
from paramiko.file import BufferedFile
from paramiko.agent import Agent, AgentKey
-from paramiko.pkey import PKey, PublicBlob
+from paramiko.pkey import PKey, PublicBlob, UnknownKeyType
from paramiko.hostkeys import HostKeys
from paramiko.config import SSHConfig, SSHConfigDict
from paramiko.proxy import ProxyCommand
@@ -94,9 +110,14 @@ from paramiko.sftp import (
from paramiko.common import io_sleep
+# TODO: I guess a real plugin system might be nice for future expansion...
+key_classes = [DSSKey, RSAKey, Ed25519Key, ECDSAKey]
+
+
__author__ = "Jeff Forcier <jeff@bitprophet.org>"
__license__ = "GNU Lesser General Public License (LGPL)"
+# TODO 4.0: remove this, jeez
__all__ = [
"Agent",
"AgentKey",
diff --git a/paramiko/_version.py b/paramiko/_version.py
index ba3f1b3c..a008f109 100644
--- a/paramiko/_version.py
+++ b/paramiko/_version.py
@@ -1,2 +1,2 @@
-__version_info__ = (3, 1, 0)
+__version_info__ = (3, 2, 0)
__version__ = ".".join(map(str, __version_info__))
diff --git a/paramiko/agent.py b/paramiko/agent.py
index 30ec1590..b29a0d14 100644
--- a/paramiko/agent.py
+++ b/paramiko/agent.py
@@ -28,13 +28,14 @@ import threading
import time
import tempfile
import stat
+from logging import DEBUG
from select import select
from paramiko.common import io_sleep, byte_chr
from paramiko.ssh_exception import SSHException, AuthenticationException
from paramiko.message import Message
-from paramiko.pkey import PKey
-from paramiko.util import asbytes
+from paramiko.pkey import PKey, UnknownKeyType
+from paramiko.util import asbytes, get_logger
cSSH2_AGENTC_REQUEST_IDENTITIES = byte_chr(11)
SSH2_AGENT_IDENTITIES_ANSWER = 12
@@ -52,8 +53,11 @@ ALGORITHM_FLAG_MAP = {
"rsa-sha2-256": SSH_AGENT_RSA_SHA2_256,
"rsa-sha2-512": SSH_AGENT_RSA_SHA2_512,
}
+for key, value in list(ALGORITHM_FLAG_MAP.items()):
+ ALGORITHM_FLAG_MAP[f"{key}-cert-v01@openssh.com"] = value
+# TODO 4.0: rename all these - including making some of their methods public?
class AgentSSH:
def __init__(self):
self._conn = None
@@ -81,8 +85,13 @@ class AgentSSH:
raise SSHException("could not get keys from ssh-agent")
keys = []
for i in range(result.get_int()):
- keys.append(AgentKey(self, result.get_binary()))
- result.get_string()
+ keys.append(
+ AgentKey(
+ agent=self,
+ blob=result.get_binary(),
+ comment=result.get_text(),
+ )
+ )
self._keys = tuple(keys)
def _close(self):
@@ -417,31 +426,69 @@ class AgentKey(PKey):
Private key held in a local SSH agent. This type of key can be used for
authenticating to a remote server (signing). Most other key operations
work as expected.
+
+ .. versionchanged:: 3.2
+ Added the ``comment`` kwarg and attribute.
+
+ .. versionchanged:: 3.2
+ Added the ``.inner_key`` attribute holding a reference to the 'real'
+ key instance this key is a proxy for, if one was obtainable, else None.
"""
- def __init__(self, agent, blob):
+ def __init__(self, agent, blob, comment=""):
self.agent = agent
self.blob = blob
- self.public_blob = None
- self.name = Message(blob).get_text()
+ self.comment = comment
+ msg = Message(blob)
+ self.name = msg.get_text()
+ self._logger = get_logger(__file__)
+ self.inner_key = None
+ try:
+ self.inner_key = PKey.from_type_string(
+ key_type=self.name, key_bytes=blob
+ )
+ except UnknownKeyType:
+ # Log, but don't explode, since inner_key is a best-effort thing.
+ err = "Unable to derive inner_key for agent key of type {!r}"
+ self.log(DEBUG, err.format(self.name))
- def asbytes(self):
- return self.blob
+ def log(self, *args, **kwargs):
+ return self._logger.log(*args, **kwargs)
- def __str__(self):
- return self.asbytes()
+ def asbytes(self):
+ # Prefer inner_key.asbytes, since that will differ for eg RSA-CERT
+ return self.inner_key.asbytes() if self.inner_key else self.blob
def get_name(self):
return self.name
+ def get_bits(self):
+ # Have to work around PKey's default get_bits being crap
+ if self.inner_key is not None:
+ return self.inner_key.get_bits()
+ return super().get_bits()
+
+ def __getattr__(self, name):
+ """
+ Proxy any un-implemented methods/properties to the inner_key.
+ """
+ if self.inner_key is None: # nothing to proxy to
+ raise AttributeError(name)
+ return getattr(self.inner_key, name)
+
@property
def _fields(self):
- raise NotImplementedError
+ fallback = [self.get_name(), self.blob]
+ return self.inner_key._fields if self.inner_key else fallback
def sign_ssh_data(self, data, algorithm=None):
msg = Message()
msg.add_byte(cSSH2_AGENTC_SIGN_REQUEST)
- msg.add_string(self.blob)
+ # NOTE: this used to be just self.blob, which is not entirely right for
+ # RSA-CERT 'keys' - those end up always degrading to ssh-rsa type
+ # signatures, for reasons probably internal to OpenSSH's agent code,
+ # even if everything else wants SHA2 (including our flag map).
+ msg.add_string(self.asbytes())
msg.add_string(data)
msg.add_int(ALGORITHM_FLAG_MAP.get(algorithm, 0))
ptype, result = self.agent._send_message(msg)
diff --git a/paramiko/auth_handler.py b/paramiko/auth_handler.py
index 22f506c1..bc7f298f 100644
--- a/paramiko/auth_handler.py
+++ b/paramiko/auth_handler.py
@@ -21,6 +21,7 @@
"""
import weakref
+import threading
import time
import re
@@ -241,7 +242,9 @@ class AuthHandler:
if not self.transport.is_active():
e = self.transport.get_exception()
if (e is None) or issubclass(e.__class__, EOFError):
- e = AuthenticationException("Authentication failed.")
+ e = AuthenticationException(
+ "Authentication failed: transport shut down or saw EOF"
+ )
raise e
if event.is_set():
break
@@ -254,6 +257,7 @@ class AuthHandler:
e = AuthenticationException("Authentication failed.")
# this is horrible. Python Exception isn't yet descended from
# object, so type(e) won't work. :(
+ # TODO 4.0: lol. just lmao.
if issubclass(e.__class__, PartialAuthentication):
return e.allowed_types
raise e
@@ -289,6 +293,17 @@ class AuthHandler:
return None
return self.transport._key_info[algorithm](Message(keyblob))
+ def _choose_fallback_pubkey_algorithm(self, key_type, my_algos):
+ # Fallback: first one in our (possibly tweaked by caller) list
+ pubkey_algo = my_algos[0]
+ msg = "Server did not send a server-sig-algs list; defaulting to our first preferred algo ({!r})" # noqa
+ self._log(DEBUG, msg.format(pubkey_algo))
+ self._log(
+ DEBUG,
+ "NOTE: you may use the 'disabled_algorithms' SSHClient/Transport init kwarg to disable that or other algorithms if your server does not support them!", # noqa
+ )
+ return pubkey_algo
+
def _finalize_pubkey_algorithm(self, key_type):
# Short-circuit for non-RSA keys
if "rsa" not in key_type:
@@ -329,6 +344,7 @@ class AuthHandler:
self.transport.server_extensions.get("server-sig-algs", b(""))
)
pubkey_algo = None
+ # Prefer to match against server-sig-algs
if server_algo_str:
server_algos = server_algo_str.split(",")
self._log(
@@ -350,14 +366,10 @@ class AuthHandler:
# technically for initial key exchange, not pubkey auth.
err = "Unable to agree on a pubkey algorithm for signing a {!r} key!" # noqa
raise AuthenticationException(err.format(key_type))
+ # Fallback to something based purely on the key & our configuration
else:
- # Fallback: first one in our (possibly tweaked by caller) list
- pubkey_algo = my_algos[0]
- msg = "Server did not send a server-sig-algs list; defaulting to our first preferred algo ({!r})" # noqa
- self._log(DEBUG, msg.format(pubkey_algo))
- self._log(
- DEBUG,
- "NOTE: you may use the 'disabled_algorithms' SSHClient/Transport init kwarg to disable that or other algorithms if your server does not support them!", # noqa
+ pubkey_algo = self._choose_fallback_pubkey_algorithm(
+ key_type, my_algos
)
if key_type.endswith("-cert-v01@openssh.com"):
pubkey_algo += "-cert-v01@openssh.com"
@@ -367,9 +379,6 @@ class AuthHandler:
def _parse_service_accept(self, m):
service = m.get_text()
if service == "ssh-userauth":
- # TODO 4.0: this message sucks ass. change it to something more
- # obvious. it always appears to mean "we already authed" but no! it
- # just means "we are allowed to TRY authing!"
self._log(DEBUG, "userauth is OK")
m = Message()
m.add_byte(cMSG_USERAUTH_REQUEST)
@@ -725,6 +734,9 @@ Error Message: {}
def _parse_userauth_failure(self, m):
authlist = m.get_list()
+ # TODO 4.0: we aren't giving callers access to authlist _unless_ it's
+ # partial authentication, so eg authtype=none can't work unless we
+ # tweak this.
partial = m.get_boolean()
if partial:
self._log(INFO, "Authentication continues...")
@@ -805,7 +817,6 @@ Error Message: {}
self.auth_event.set()
return
- # TODO: do the same to the other tables, in Transport.
# TODO 4.0: MAY make sense to make these tables into actual
# classes/instances that can be fed a mode bool or whatever. Or,
# alternately (both?) make the message types small classes or enums that
@@ -814,20 +825,27 @@ Error Message: {}
# TODO: if we do that, also expose 'em publicly.
# Messages which should be handled _by_ servers (sent by clients)
- _server_handler_table = {
- MSG_SERVICE_REQUEST: _parse_service_request,
- MSG_USERAUTH_REQUEST: _parse_userauth_request,
- MSG_USERAUTH_INFO_RESPONSE: _parse_userauth_info_response,
- }
+ @property
+ def _server_handler_table(self):
+ return {
+ # TODO 4.0: MSG_SERVICE_REQUEST ought to eventually move into
+ # Transport's server mode like the client side did, just for
+ # consistency.
+ MSG_SERVICE_REQUEST: self._parse_service_request,
+ MSG_USERAUTH_REQUEST: self._parse_userauth_request,
+ MSG_USERAUTH_INFO_RESPONSE: self._parse_userauth_info_response,
+ }
# Messages which should be handled _by_ clients (sent by servers)
- _client_handler_table = {
- MSG_SERVICE_ACCEPT: _parse_service_accept,
- MSG_USERAUTH_SUCCESS: _parse_userauth_success,
- MSG_USERAUTH_FAILURE: _parse_userauth_failure,
- MSG_USERAUTH_BANNER: _parse_userauth_banner,
- MSG_USERAUTH_INFO_REQUEST: _parse_userauth_info_request,
- }
+ @property
+ def _client_handler_table(self):
+ return {
+ MSG_SERVICE_ACCEPT: self._parse_service_accept,
+ MSG_USERAUTH_SUCCESS: self._parse_userauth_success,
+ MSG_USERAUTH_FAILURE: self._parse_userauth_failure,
+ MSG_USERAUTH_BANNER: self._parse_userauth_banner,
+ MSG_USERAUTH_INFO_REQUEST: self._parse_userauth_info_request,
+ }
# NOTE: prior to the fix for #1283, this was a static dict instead of a
# property. Should be backwards compatible in most/all cases.
@@ -945,3 +963,130 @@ class GssapiWithMicAuthHandler:
# TODO: determine if we can cut this up like we did for the primary
# AuthHandler class.
return self.__handler_table
+
+
+class AuthOnlyHandler(AuthHandler):
+ """
+ AuthHandler, and just auth, no service requests!
+
+ .. versionadded:: 3.2
+ """
+
+ # NOTE: this purposefully duplicates some of the parent class in order to
+ # modernize, refactor, etc. The intent is that eventually we will collapse
+ # this one onto the parent in a backwards incompatible release.
+
+ @property
+ def _client_handler_table(self):
+ my_table = super()._client_handler_table.copy()
+ del my_table[MSG_SERVICE_ACCEPT]
+ return my_table
+
+ def send_auth_request(self, username, method, finish_message=None):
+ """
+ Submit a userauth request message & wait for response.
+
+ Performs the transport message send call, sets self.auth_event, and
+ will lock-n-block as necessary to both send, and wait for response to,
+ the USERAUTH_REQUEST.
+
+ Most callers will want to supply a callback to ``finish_message``,
+ which accepts a Message ``m`` and may call mutator methods on it to add
+ more fields.
+ """
+ # Store a few things for reference in handlers, including auth failure
+ # handler (which needs to know if we were using a bad method, etc)
+ self.auth_method = method
+ self.username = username
+ # Generic userauth request fields
+ m = Message()
+ m.add_byte(cMSG_USERAUTH_REQUEST)
+ m.add_string(username)
+ m.add_string("ssh-connection")
+ m.add_string(method)
+ # Caller usually has more to say, such as injecting password, key etc
+ finish_message(m)
+ # TODO 4.0: seems odd to have the client handle the lock and not
+ # Transport; that _may_ have been an artifact of allowing user
+ # threading event injection? Regardless, we don't want to move _this_
+ # locking into Transport._send_message now, because lots of other
+ # untouched code also uses that method and we might end up
+ # double-locking (?) but 4.0 would be a good time to revisit.
+ with self.transport.lock:
+ self.transport._send_message(m)
+ # We have cut out the higher level event args, but self.auth_event is
+ # still required for self.wait_for_response to function correctly (it's
+ # the mechanism used by the auth success/failure handlers, the abort
+ # handler, and a few other spots like in gssapi.
+ # TODO: interestingly, wait_for_response itself doesn't actually
+ # enforce that its event argument and self.auth_event are the same...
+ self.auth_event = threading.Event()
+ return self.wait_for_response(self.auth_event)
+
+ def auth_none(self, username):
+ return self.send_auth_request(username, "none")
+
+ def auth_publickey(self, username, key):
+ key_type, bits = self._get_key_type_and_bits(key)
+ algorithm = self._finalize_pubkey_algorithm(key_type)
+ blob = self._get_session_blob(
+ key,
+ "ssh-connection",
+ username,
+ algorithm,
+ )
+
+ def finish(m):
+ # This field doesn't appear to be named, but is False when querying
+ # for permission (ie knowing whether to even prompt a user for
+ # passphrase, etc) or True when just going for it. Paramiko has
+ # never bothered with the former type of message, apparently.
+ m.add_boolean(True)
+ m.add_string(algorithm)
+ m.add_string(bits)
+ m.add_string(key.sign_ssh_data(blob, algorithm))
+
+ return self.send_auth_request(username, "publickey", finish)
+
+ def auth_password(self, username, password):
+ def finish(m):
+ # Unnamed field that equates to "I am changing my password", which
+ # Paramiko clientside never supported and serverside only sort of
+ # supported.
+ m.add_boolean(False)
+ m.add_string(b(password))
+
+ return self.send_auth_request(username, "password", finish)
+
+ def auth_interactive(self, username, handler, submethods=""):
+ """
+ response_list = handler(title, instructions, prompt_list)
+ """
+ # Unlike most siblings, this auth method _does_ require other
+ # superclass handlers (eg userauth info request) to understand
+ # what's going on, so we still set some self attributes.
+ self.auth_method = "keyboard_interactive"
+ self.interactive_handler = handler
+
+ def finish(m):
+ # Empty string for deprecated language tag field, per RFC 4256:
+ # https://www.rfc-editor.org/rfc/rfc4256#section-3.1
+ m.add_string("")
+ m.add_string(submethods)
+
+ return self.send_auth_request(username, "keyboard-interactive", finish)
+
+ # NOTE: not strictly 'auth only' related, but allows users to opt-in.
+ def _choose_fallback_pubkey_algorithm(self, key_type, my_algos):
+ msg = "Server did not send a server-sig-algs list; defaulting to something in our preferred algorithms list" # noqa
+ self._log(DEBUG, msg)
+ noncert_key_type = key_type.replace("-cert-v01@openssh.com", "")
+ if key_type in my_algos or noncert_key_type in my_algos:
+ actual = key_type if key_type in my_algos else noncert_key_type
+ msg = f"Current key type, {actual!r}, is in our preferred list; using that" # noqa
+ algo = actual
+ else:
+ algo = my_algos[0]
+ msg = f"{key_type!r} not in our list - trying first list item instead, {algo!r}" # noqa
+ self._log(DEBUG, msg)
+ return algo
diff --git a/paramiko/auth_strategy.py b/paramiko/auth_strategy.py
new file mode 100644
index 00000000..03c1d877
--- /dev/null
+++ b/paramiko/auth_strategy.py
@@ -0,0 +1,306 @@
+"""
+Modern, adaptable authentication machinery.
+
+Replaces certain parts of `.SSHClient`. For a concrete implementation, see the
+``OpenSSHAuthStrategy`` class in `Fabric <https://fabfile.org>`_.
+"""
+
+from collections import namedtuple
+
+from .agent import AgentKey
+from .util import get_logger
+from .ssh_exception import AuthenticationException
+
+
+class AuthSource:
+ """
+ Some SSH authentication source, such as a password, private key, or agent.
+
+ See subclasses in this module for concrete implementations.
+
+ All implementations must accept at least a ``username`` (``str``) kwarg.
+ """
+
+ def __init__(self, username):
+ self.username = username
+
+ def _repr(self, **kwargs):
+ # TODO: are there any good libs for this? maybe some helper from
+ # structlog?
+ pairs = [f"{k}={v!r}" for k, v in kwargs.items()]
+ joined = ", ".join(pairs)
+ return f"{self.__class__.__name__}({joined})"
+
+ def __repr__(self):
+ return self._repr()
+
+ def authenticate(self, transport):
+ """
+ Perform authentication.
+ """
+ raise NotImplementedError
+
+
+class NoneAuth(AuthSource):
+ """
+ Auth type "none", ie https://www.rfc-editor.org/rfc/rfc4252#section-5.2 .
+ """
+
+ def authenticate(self, transport):
+ return transport.auth_none(self.username)
+
+
+class Password(AuthSource):
+ """
+ Password authentication.
+
+ :param callable password_getter:
+ A lazy callable that should return a `str` password value at
+ authentication time, such as a `functools.partial` wrapping
+ `getpass.getpass`, an API call to a secrets store, or similar.
+
+ If you already know the password at instantiation time, you should
+ simply use something like ``lambda: "my literal"`` (for a literal, but
+ also, shame on you!) or ``lambda: variable_name`` (for something stored
+ in a variable).
+ """
+
+ def __init__(self, username, password_getter):
+ super().__init__(username=username)
+ self.password_getter = password_getter
+
+ def __repr__(self):
+ # Password auth is marginally more 'username-caring' than pkeys, so may
+ # as well log that info here.
+ return super()._repr(user=self.username)
+
+ def authenticate(self, transport):
+ # Lazily get the password, in case it's prompting a user
+ # TODO: be nice to log source _of_ the password?
+ password = self.password_getter()
+ return transport.auth_password(self.username, password)
+
+
+# TODO 4.0: twiddle this, or PKey, or both, so they're more obviously distinct.
+# TODO 4.0: the obvious is to make this more wordy (PrivateKeyAuth), the
+# minimalist approach might be to rename PKey to just Key (esp given all the
+# subclasses are WhateverKey and not WhateverPKey)
+class PrivateKey(AuthSource):
+ """
+ Essentially a mixin for private keys.
+
+ Knows how to auth, but leaves key material discovery/loading/decryption to
+ subclasses.
+
+ Subclasses **must** ensure that they've set ``self.pkey`` to a decrypted
+ `.PKey` instance before calling ``super().authenticate``; typically
+ either in their ``__init__``, or in an overridden ``authenticate`` prior to
+ its `super` call.
+ """
+
+ def authenticate(self, transport):
+ return transport.auth_publickey(self.username, self.pkey)
+
+
+class InMemoryPrivateKey(PrivateKey):
+ """
+ An in-memory, decrypted `.PKey` object.
+ """
+
+ def __init__(self, username, pkey):
+ super().__init__(username=username)
+ # No decryption (presumably) necessary!
+ self.pkey = pkey
+
+ def __repr__(self):
+ # NOTE: most of interesting repr-bits for private keys is in PKey.
+ # TODO: tacking on agent-ness like this is a bit awkward, but, eh?
+ rep = super()._repr(pkey=self.pkey)
+ if isinstance(self.pkey, AgentKey):
+ rep += " [agent]"
+ return rep
+
+
+class OnDiskPrivateKey(PrivateKey):
+ """
+ Some on-disk private key that needs opening and possibly decrypting.
+
+ :param str source:
+ String tracking where this key's path was specified; should be one of
+ ``"ssh-config"``, ``"python-config"``, or ``"implicit-home"``.
+ :param Path path:
+ The filesystem path this key was loaded from.
+ :param PKey pkey:
+ The `PKey` object this auth source uses/represents.
+ """
+
+ def __init__(self, username, source, path, pkey):
+ super().__init__(username=username)
+ self.source = source
+ allowed = ("ssh-config", "python-config", "implicit-home")
+ if source not in allowed:
+ raise ValueError(f"source argument must be one of: {allowed!r}")
+ self.path = path
+ # Superclass wants .pkey, other two are mostly for display/debugging.
+ self.pkey = pkey
+
+ def __repr__(self):
+ return self._repr(
+ key=self.pkey, source=self.source, path=str(self.path)
+ )
+
+
+# TODO re sources: is there anything in an OpenSSH config file that doesn't fit
+# into what Paramiko already had kwargs for?
+
+
+SourceResult = namedtuple("SourceResult", ["source", "result"])
+
+# TODO: tempting to make this an OrderedDict, except the keys essentially want
+# to be rich objects (AuthSources) which do not make for useful user indexing?
+# TODO: members being vanilla tuples is pretty old-school/expedient; they
+# "really" want to be something that's type friendlier (unless the tuple's 2nd
+# member being a Union of two types is "fine"?), which I assume means yet more
+# classes, eg an abstract SourceResult with concrete AuthSuccess and
+# AuthFailure children?
+# TODO: arguably we want __init__ typechecking of the members (or to leverage
+# mypy by classifying this literally as list-of-AuthSource?)
+class AuthResult(list):
+ """
+ Represents a partial or complete SSH authentication attempt.
+
+ This class conceptually extends `AuthStrategy` by pairing the former's
+ authentication **sources** with the **results** of trying to authenticate
+ with them.
+
+ `AuthResult` is a (subclass of) `list` of `namedtuple`, which are of the
+ form ``namedtuple('SourceResult', 'source', 'result')`` (where the
+ ``source`` member is an `AuthSource` and the ``result`` member is either a
+ return value from the relevant `.Transport` method, or an exception
+ object).
+
+ .. note::
+ Transport auth method results are always themselves a ``list`` of "next
+ allowable authentication methods".
+
+ In the simple case of "you just authenticated successfully", it's an
+ empty list; if your auth was rejected but you're allowed to try again,
+ it will be a list of string method names like ``pubkey`` or
+ ``password``.
+
+ The ``__str__`` of this class represents the empty-list scenario as the
+ word ``success``, which should make reading the result of an
+ authentication session more obvious to humans.
+
+ Instances also have a `strategy` attribute referencing the `AuthStrategy`
+ which was attempted.
+ """
+
+ def __init__(self, strategy, *args, **kwargs):
+ self.strategy = strategy
+ super().__init__(*args, **kwargs)
+
+ def __str__(self):
+ # NOTE: meaningfully distinct from __repr__, which still wants to use
+ # superclass' implementation.
+ # TODO: go hog wild, use rich.Table? how is that on degraded term's?
+ # TODO: test this lol
+ return "\n".join(
+ f"{x.source} -> {x.result or 'success'}" for x in self
+ )
+
+
+# TODO 4.0: descend from SSHException or even just Exception
+class AuthFailure(AuthenticationException):
+ """
+ Basic exception wrapping an `AuthResult` indicating overall auth failure.
+
+ Note that `AuthFailure` descends from `AuthenticationException` but is
+ generally "higher level"; the latter is now only raised by individual
+ `AuthSource` attempts and should typically only be seen by users when
+ encapsulated in this class. It subclasses `AuthenticationException`
+ primarily for backwards compatibility reasons.
+ """
+
+ def __init__(self, result):
+ self.result = result
+
+ def __str__(self):
+ return "\n" + str(self.result)
+
+
+class AuthStrategy:
+ """
+ This class represents one or more attempts to auth with an SSH server.
+
+ By default, subclasses must at least accept an ``ssh_config``
+ (`.SSHConfig`) keyword argument, but may opt to accept more as needed for
+ their particular strategy.
+ """
+
+ def __init__(
+ self,
+ ssh_config,
+ ):
+ self.ssh_config = ssh_config
+ self.log = get_logger(__name__)
+
+ def get_sources(self):
+ """
+ Generator yielding `AuthSource` instances, in the order to try.
+
+ This is the primary override point for subclasses: you figure out what
+ sources you need, and ``yield`` them.
+
+ Subclasses _of_ subclasses may find themselves wanting to do things
+ like filtering or discarding around a call to `super`.
+ """
+ raise NotImplementedError
+
+ def authenticate(self, transport):
+ """
+ Handles attempting `AuthSource` instances yielded from `get_sources`.
+
+ You *normally* won't need to override this, but it's an option for
+ advanced users.
+ """
+ succeeded = False
+ overall_result = AuthResult(strategy=self)
+ # TODO: arguably we could fit in a "send none auth, record allowed auth
+ # types sent back" thing here as OpenSSH-client does, but that likely
+ # wants to live in fabric.OpenSSHAuthStrategy as not all target servers
+ # will implement it!
+ # TODO: needs better "server told us too many attempts" checking!
+ for source in self.get_sources():
+ self.log.debug(f"Trying {source}")
+ try: # NOTE: this really wants to _only_ wrap the authenticate()!
+ result = source.authenticate(transport)
+ succeeded = True
+ # TODO: 'except PartialAuthentication' is needed for 2FA and
+ # similar, as per old SSHClient.connect - it is the only way
+ # AuthHandler supplies access to the 'name-list' field from
+ # MSG_USERAUTH_FAILURE, at present.
+ except Exception as e:
+ result = e
+ # TODO: look at what this could possibly raise, we don't really
+ # want Exception here, right? just SSHException subclasses? or
+ # do we truly want to capture anything at all with assumption
+ # it's easy enough for users to look afterwards?
+ # NOTE: showing type, not message, for tersity & also most of
+ # the time it's basically just "Authentication failed."
+ source_class = e.__class__.__name__
+ self.log.info(
+ f"Authentication via {source} failed with {source_class}"
+ )
+ overall_result.append(SourceResult(source, result))
+ if succeeded:
+ break
+ # Gotta die here if nothing worked, otherwise Transport's main loop
+ # just kinda hangs out until something times out!
+ if not succeeded:
+ raise AuthFailure(result=overall_result)
+ # Success: give back what was done, in case they care.
+ return overall_result
+
+ # TODO: is there anything OpenSSH client does which _can't_ cleanly map to
+ # iterating a generator?
diff --git a/paramiko/client.py b/paramiko/client.py
index 1fe14b07..d8be9108 100644
--- a/paramiko/client.py
+++ b/paramiko/client.py
@@ -238,6 +238,7 @@ class SSHClient(ClosingContextManager):
passphrase=None,
disabled_algorithms=None,
transport_factory=None,
+ auth_strategy=None,
):
"""
Connect to an SSH server and authenticate to it. The server's host key
@@ -323,10 +324,28 @@ class SSHClient(ClosingContextManager):
functionality, and algorithm selection) and generates a
`.Transport` instance to be used by this client. Defaults to
`.Transport.__init__`.
+ :param auth_strategy:
+ an optional instance of `.AuthStrategy`, triggering use of this
+ newer authentication mechanism instead of SSHClient's legacy auth
+ method.
+
+ .. warning::
+ This parameter is **incompatible** with all other
+ authentication-related parameters (such as, but not limited to,
+ ``password``, ``key_filename`` and ``allow_agent``) and will
+ trigger an exception if given alongside them.
+
+ :returns:
+ `.AuthResult` if ``auth_strategy`` is non-``None``; otherwise,
+ returns ``None``.
:raises BadHostKeyException:
if the server's host key could not be verified.
- :raises AuthenticationException: if authentication failed.
+ :raises AuthenticationException:
+ if authentication failed.
+ :raises UnableToAuthenticate:
+ if authentication failed (when ``auth_strategy`` is non-``None``;
+ and note that this is a subclass of ``AuthenticationException``).
:raises socket.error:
if a socket error (other than connection-refused or
host-unreachable) occurred while connecting.
@@ -349,6 +368,8 @@ class SSHClient(ClosingContextManager):
Added the ``disabled_algorithms`` argument.
.. versionchanged:: 2.12
Added the ``transport_factory`` argument.
+ .. versionchanged:: 3.2
+ Added the ``auth_strategy`` argument.
"""
if not sock:
errors = {}
@@ -449,6 +470,11 @@ class SSHClient(ClosingContextManager):
if username is None:
username = getpass.getuser()
+ # New auth flow!
+ if auth_strategy is not None:
+ return auth_strategy.authenticate(transport=t)
+
+ # Old auth flow!
if key_filename is None:
key_filenames = []
elif isinstance(key_filename, str):
@@ -697,6 +723,8 @@ class SSHClient(ClosingContextManager):
if not two_factor:
for key_filename in key_filenames:
+ # TODO 4.0: leverage PKey.from_path() if we don't end up just
+ # killing SSHClient entirely
for pkey_class in (RSAKey, DSSKey, ECDSAKey, Ed25519Key):
try:
key = self._key_from_filepath(
diff --git a/paramiko/dsskey.py b/paramiko/dsskey.py
index 5a0f85eb..5215d282 100644
--- a/paramiko/dsskey.py
+++ b/paramiko/dsskey.py
@@ -43,6 +43,8 @@ class DSSKey(PKey):
data.
"""
+ name = "ssh-dss"
+
def __init__(
self,
msg=None,
@@ -71,8 +73,8 @@ class DSSKey(PKey):
else:
self._check_type_and_load_cert(
msg=msg,
- key_type="ssh-dss",
- cert_type="ssh-dss-cert-v01@openssh.com",
+ key_type=self.name,
+ cert_type=f"{self.name}-cert-v01@openssh.com",
)
self.p = msg.get_mpint()
self.q = msg.get_mpint()
@@ -82,7 +84,7 @@ class DSSKey(PKey):
def asbytes(self):
m = Message()
- m.add_string("ssh-dss")
+ m.add_string(self.name)
m.add_mpint(self.p)
m.add_mpint(self.q)
m.add_mpint(self.g)
@@ -96,8 +98,9 @@ class DSSKey(PKey):
def _fields(self):
return (self.get_name(), self.p, self.q, self.g, self.y)
+ # TODO 4.0: remove
def get_name(self):
- return "ssh-dss"
+ return self.name
def get_bits(self):
return self.size
@@ -119,7 +122,7 @@ class DSSKey(PKey):
r, s = decode_dss_signature(sig)
m = Message()
- m.add_string("ssh-dss")
+ m.add_string(self.name)
# apparently, in rare cases, r or s may be shorter than 20 bytes!
rstr = util.deflate_long(r, 0)
sstr = util.deflate_long(s, 0)
@@ -136,7 +139,7 @@ class DSSKey(PKey):
sig = msg.asbytes()
else:
kind = msg.get_text()
- if kind != "ssh-dss":
+ if kind != self.name:
return 0
sig = msg.get_binary()
diff --git a/paramiko/ecdsakey.py b/paramiko/ecdsakey.py
index e2279754..6fd95fab 100644
--- a/paramiko/ecdsakey.py
+++ b/paramiko/ecdsakey.py
@@ -114,6 +114,7 @@ class ECDSAKey(PKey):
password=None,
vals=None,
file_obj=None,
+ # TODO 4.0: remove; it does nothing since porting to cryptography.io
validate_point=True,
):
self.verifying_key = None
@@ -168,9 +169,14 @@ class ECDSAKey(PKey):
raise SSHException("Invalid public key")
@classmethod
- def supported_key_format_identifiers(cls):
+ def identifiers(cls):
return cls._ECDSA_CURVES.get_key_format_identifier_list()
+ # TODO 4.0: deprecate/remove
+ @classmethod
+ def supported_key_format_identifiers(cls):
+ return cls.identifiers()
+
def asbytes(self):
key = self.verifying_key
m = Message()
diff --git a/paramiko/ed25519key.py b/paramiko/ed25519key.py
index c8c4da6c..e5e81ac5 100644
--- a/paramiko/ed25519key.py
+++ b/paramiko/ed25519key.py
@@ -39,6 +39,8 @@ class Ed25519Key(PKey):
Added a ``file_obj`` parameter to match other key classes.
"""
+ name = "ssh-ed25519"
+
def __init__(
self, msg=None, data=None, filename=None, password=None, file_obj=None
):
@@ -49,7 +51,7 @@ class Ed25519Key(PKey):
if msg is not None:
self._check_type_and_load_cert(
msg=msg,
- key_type="ssh-ed25519",
+ key_type=self.name,
cert_type="ssh-ed25519-cert-v01@openssh.com",
)
verifying_key = nacl.signing.VerifyKey(msg.get_binary())
@@ -108,7 +110,7 @@ class Ed25519Key(PKey):
public_keys = []
for _ in range(num_keys):
pubkey = Message(message.get_binary())
- if pubkey.get_text() != "ssh-ed25519":
+ if pubkey.get_text() != self.name:
raise SSHException("Invalid key")
public_keys.append(pubkey.get_binary())
@@ -141,7 +143,7 @@ class Ed25519Key(PKey):
signing_keys = []
for i in range(num_keys):
- if message.get_text() != "ssh-ed25519":
+ if message.get_text() != self.name:
raise SSHException("Invalid key")
# A copy of the public key, again, ignore.
public = message.get_binary()
@@ -170,7 +172,7 @@ class Ed25519Key(PKey):
else:
v = self._verifying_key
m = Message()
- m.add_string("ssh-ed25519")
+ m.add_string(self.name)
m.add_string(v.encode())
return m.asbytes()
@@ -182,8 +184,9 @@ class Ed25519Key(PKey):
v = self._verifying_key
return (self.get_name(), v)
+ # TODO 4.0: remove
def get_name(self):
- return "ssh-ed25519"
+ return self.name
def get_bits(self):
return 256
@@ -193,12 +196,12 @@ class Ed25519Key(PKey):
def sign_ssh_data(self, data, algorithm=None):
m = Message()
- m.add_string("ssh-ed25519")
+ m.add_string(self.name)
m.add_string(self._signing_key.sign(data).signature)
return m
def verify_ssh_sig(self, data, msg):
- if msg.get_text() != "ssh-ed25519":
+ if msg.get_text() != self.name:
return False
try:
diff --git a/paramiko/hostkeys.py b/paramiko/hostkeys.py
index bbfa5755..4d47e950 100644
--- a/paramiko/hostkeys.py
+++ b/paramiko/hostkeys.py
@@ -27,11 +27,8 @@ from hashlib import sha1
from hmac import HMAC
-from paramiko.dsskey import DSSKey
-from paramiko.rsakey import RSAKey
+from paramiko.pkey import PKey, UnknownKeyType
from paramiko.util import get_logger, constant_time_bytes_eq, b, u
-from paramiko.ecdsakey import ECDSAKey
-from paramiko.ed25519key import Ed25519Key
from paramiko.ssh_exception import SSHException
@@ -95,16 +92,16 @@ class HostKeys(MutableMapping):
if (len(line) == 0) or (line[0] == "#"):
continue
try:
- e = HostKeyEntry.from_line(line, lineno)
+ entry = HostKeyEntry.from_line(line, lineno)
except SSHException:
continue
- if e is not None:
- _hostnames = e.hostnames
+ if entry is not None:
+ _hostnames = entry.hostnames
for h in _hostnames:
- if self.check(h, e.key):
- e.hostnames.remove(h)
- if len(e.hostnames):
- self._entries.append(e)
+ if self.check(h, entry.key):
+ entry.hostnames.remove(h)
+ if len(entry.hostnames):
+ self._entries.append(entry)
def save(self, filename):
"""
@@ -347,29 +344,27 @@ class HostKeyEntry:
return None
fields = fields[:3]
- names, keytype, key = fields
+ names, key_type, key = fields
names = names.split(",")
# Decide what kind of key we're looking at and create an object
# to hold it accordingly.
try:
- key = b(key)
- if keytype == "ssh-rsa":
- key = RSAKey(data=decodebytes(key))
- elif keytype == "ssh-dss":
- key = DSSKey(data=decodebytes(key))
- elif keytype in ECDSAKey.supported_key_format_identifiers():
- key = ECDSAKey(data=decodebytes(key), validate_point=False)
- elif keytype == "ssh-ed25519":
- key = Ed25519Key(data=decodebytes(key))
- else:
- log.info("Unable to handle key of type {}".format(keytype))
- return None
-
+ # TODO: this grew organically and doesn't seem /wrong/ per se (file
+ # read -> unicode str -> bytes for base64 decode -> decoded bytes);
+ # but in Python 3 forever land, can we simply use
+ # `base64.b64decode(str-from-file)` here?
+ key_bytes = decodebytes(b(key))
except binascii.Error as e:
raise InvalidHostKey(line, e)
- return cls(names, key)
+ try:
+ return cls(names, PKey.from_type_string(key_type, key_bytes))
+ except UnknownKeyType:
+ # TODO 4.0: consider changing HostKeys API so this just raises
+ # naturally and the exception is muted higher up in the stack?
+ log.info("Unable to handle key of type {}".format(key_type))
+ return None
def to_line(self):
"""
diff --git a/paramiko/pkey.py b/paramiko/pkey.py
index 32d8cad5..ef371002 100644
--- a/paramiko/pkey.py
+++ b/paramiko/pkey.py
@@ -24,7 +24,8 @@ import base64
from base64 import encodebytes, decodebytes
from binascii import unhexlify
import os
-from hashlib import md5
+from pathlib import Path
+from hashlib import md5, sha256
import re
import struct
@@ -33,6 +34,7 @@ import bcrypt
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.ciphers import algorithms, modes, Cipher
+from cryptography.hazmat.primitives import asymmetric
from paramiko import util
from paramiko.util import u, b
@@ -59,9 +61,25 @@ def _unpad_openssh(data):
return data[:-padding_length]
+class UnknownKeyType(Exception):
+ """
+ An unknown public/private key algorithm was attempted to be read.
+ """
+
+ def __init__(self, key_type=None, key_bytes=None):
+ self.key_type = key_type
+ self.key_bytes = key_bytes
+
+ def __str__(self):
+ return f"UnknownKeyType(type={self.key_type!r}, bytes=<{len(self.key_bytes)}>)" # noqa
+
+
class PKey:
"""
Base class for public keys.
+
+ Also includes some "meta" level convenience constructors such as
+ `.from_type_string`.
"""
# known encryption types for private key files:
@@ -92,6 +110,127 @@ class PKey:
)
END_TAG = re.compile(r"^-{5}END (RSA|DSA|EC|OPENSSH) PRIVATE KEY-{5}\s*$")
+ @staticmethod
+ def from_path(path, passphrase=None):
+ """
+ Attempt to instantiate appropriate key subclass from given file path.
+
+ :param Path path: The path to load (may also be a `str`).
+
+ :returns:
+ A `PKey` subclass instance.
+
+ :raises:
+ `UnknownKeyType`, if our crypto backend doesn't know this key type.
+
+ .. versionadded:: 3.2
+ """
+ # TODO: make sure sphinx is reading Path right in param list...
+
+ # Lazy import to avoid circular import issues
+ from paramiko import DSSKey, RSAKey, Ed25519Key, ECDSAKey
+
+ # Normalize to string, as cert suffix isn't quite an extension, so
+ # pathlib isn't useful for this.
+ path = str(path)
+
+ # Sort out cert vs key, i.e. it is 'legal' to hand this kind of API
+ # /either/ the key /or/ the cert, when there is a key/cert pair.
+ cert_suffix = "-cert.pub"
+ if str(path).endswith(cert_suffix):
+ key_path = path[: -len(cert_suffix)]
+ cert_path = path
+ else:
+ key_path = path
+ cert_path = path + cert_suffix
+
+ key_path = Path(key_path).expanduser()
+ cert_path = Path(cert_path).expanduser()
+
+ data = key_path.read_bytes()
+ # Like OpenSSH, try modern/OpenSSH-specific key load first
+ try:
+ loaded = serialization.load_ssh_private_key(
+ data=data, password=passphrase
+ )
+ # Then fall back to assuming legacy PEM type
+ except ValueError:
+ loaded = serialization.load_pem_private_key(
+ data=data, password=passphrase
+ )
+ # TODO Python 3.10: match statement? (NOTE: we cannot use a dict
+ # because the results from the loader are literal backend, eg openssl,
+ # private classes, so isinstance tests work but exact 'x class is y'
+ # tests will not work)
+ # TODO: leverage already-parsed/math'd obj to avoid duplicate cpu
+ # cycles? seemingly requires most of our key subclasses to be rewritten
+ # to be cryptography-object-forward. this is still likely faster than
+ # the old SSHClient code that just tried instantiating every class!
+ key_class = None
+ if isinstance(loaded, asymmetric.dsa.DSAPrivateKey):
+ key_class = DSSKey
+ elif isinstance(loaded, asymmetric.rsa.RSAPrivateKey):
+ key_class = RSAKey
+ elif isinstance(loaded, asymmetric.ed25519.Ed25519PrivateKey):
+ key_class = Ed25519Key
+ elif isinstance(loaded, asymmetric.ec.EllipticCurvePrivateKey):
+ key_class = ECDSAKey
+ else:
+ raise UnknownKeyType(key_bytes=data, key_type=loaded.__class__)
+ with key_path.open() as fd:
+ key = key_class.from_private_key(fd, password=passphrase)
+ if cert_path.exists():
+ # load_certificate can take Message, path-str, or value-str
+ key.load_certificate(str(cert_path))
+ return key
+
+ @staticmethod
+ def from_type_string(key_type, key_bytes):
+ """
+ Given type `str` & raw `bytes`, return a `PKey` subclass instance.
+
+ For example, ``PKey.from_type_string("ssh-ed25519", <public bytes>)``
+ will (if successful) return a new `.Ed25519Key`.
+
+ :param str key_type:
+ The key type, eg ``"ssh-ed25519"``.
+ :param bytes key_bytes:
+ The raw byte data forming the key material, as expected by
+ subclasses' ``data`` parameter.
+
+ :returns:
+ A `PKey` subclass instance.
+
+ :raises:
+ `UnknownKeyType`, if no registered classes knew about this type.
+
+ .. versionadded:: 3.2
+ """
+ from paramiko import key_classes
+
+ for key_class in key_classes:
+ if key_type in key_class.identifiers():
+ # TODO: needs to passthru things like passphrase
+ return key_class(data=key_bytes)
+ raise UnknownKeyType(key_type=key_type, key_bytes=key_bytes)
+
+ @classmethod
+ def identifiers(cls):
+ """
+ returns an iterable of key format/name strings this class can handle.
+
+ Most classes only have a single identifier, and thus this default
+ implementation suffices; see `.ECDSAKey` for one example of an
+ override.
+ """
+ return [cls.name]
+
+ # TODO 4.0: make this and subclasses consistent, some of our own
+ # classmethods even assume kwargs we don't define!
+ # TODO 4.0: prob also raise NotImplementedError instead of pass'ing; the
+ # contract is pretty obviously that you need to handle msg/data/filename
+ # appropriately. (If 'pass' is a concession to testing, see about doing the
+ # work to fix the tests instead)
def __init__(self, msg=None, data=None):
"""
Create a new instance of this public key type. If ``msg`` is given,
@@ -101,8 +240,8 @@ class PKey:
:param .Message msg:
an optional SSH `.Message` containing a public key of this type.
- :param str data: an optional string containing a public key
- of this type
+ :param bytes data:
+ optional, the bytes of a public key of this type
:raises: `.SSHException` --
if a key cannot be created from the ``data`` or ``msg`` given, or
@@ -110,6 +249,19 @@ class PKey:
"""
pass
+ # TODO: arguably this might want to be __str__ instead? ehh
+ # TODO: ditto the interplay between showing class name (currently we just
+ # say PKey writ large) and algorithm (usually == class name, but not
+ # always, also sometimes shows certificate-ness)
+ # TODO: if we do change it, we also want to tweak eg AgentKey, as it
+ # currently displays agent-ness with a suffix
+ def __repr__(self):
+ comment = ""
+ # Works for AgentKey, may work for others?
+ if hasattr(self, "comment") and self.comment:
+ comment = f", comment={self.comment!r}"
+ return f"PKey(alg={self.algorithm_name}, bits={self.get_bits()}, fp={self.fingerprint}{comment})" # noqa
+
# TODO 4.0: just merge into __bytes__ (everywhere)
def asbytes(self):
"""
@@ -142,6 +294,26 @@ class PKey:
"""
return ""
+ @property
+ def algorithm_name(self):
+ """
+ Return the key algorithm identifier for this key.
+
+ Similar to `get_name`, but aimed at pure algorithm name instead of SSH
+ protocol field value.
+ """
+ # Nuke the leading 'ssh-'
+ # TODO in Python 3.9: use .removeprefix()
+ name = self.get_name().replace("ssh-", "")
+ # Trim any cert suffix (but leave the -cert, as OpenSSH does)
+ cert_tail = "-cert-v01@openssh.com"
+ if cert_tail in name:
+ name = name.replace(cert_tail, "-cert")
+ # Nuke any eg ECDSA suffix, OpenSSH does basically this too.
+ else:
+ name = name.split("-")[0]
+ return name.upper()
+
def get_bits(self):
"""
Return the number of significant bits in this key. This is useful
@@ -149,6 +321,8 @@ class PKey:
:return: bits in the key (as an `int`)
"""
+ # TODO 4.0: raise NotImplementedError, 0 is unlikely to ever be
+ # _correct_ and nothing in the critical path seems to use this.
return 0
def can_sign(self):
@@ -169,6 +343,21 @@ class PKey:
"""
return md5(self.asbytes()).digest()
+ @property
+ def fingerprint(self):
+ """
+ Modern fingerprint property designed to be comparable to OpenSSH.
+
+ Currently only does SHA256 (the OpenSSH default).
+
+ .. versionadded:: 3.2
+ """
+ hashy = sha256(bytes(self))
+ hash_name = hashy.name.upper()
+ b64ed = encodebytes(hashy.digest())
+ cleaned = u(b64ed).strip().rstrip("=") # yes, OpenSSH does this too!
+ return f"{hash_name}:{cleaned}"
+
def get_base64(self):
"""
Return a base64 string containing the public part of this key. Nothing
@@ -278,6 +467,7 @@ class PKey:
:raises: ``IOError`` -- if there was an error writing to the file
:raises: `.SSHException` -- if the key is invalid
"""
+ # TODO 4.0: NotImplementedError (plus everywhere else in here)
raise Exception("Not implemented in PKey")
def _read_private_key_file(self, tag, filename, password=None):
@@ -613,7 +803,9 @@ class PKey:
# message; they're *IO objects at heart and their .getvalue()
# always returns the full value regardless of pointer position.
self.load_certificate(Message(msg.asbytes()))
- # Read out nonce as it comes before the public numbers.
+ # Read out nonce as it comes before the public numbers - our caller
+ # is likely going to use the (only borrowed by us, not owned)
+ # 'msg' object for loading those numbers right after this.
# TODO: usefully interpret it & other non-public-number fields
# (requires going back into per-type subclasses.)
msg.get_string()
diff --git a/paramiko/rsakey.py b/paramiko/rsakey.py
index 9ea00e95..b7ad3ce2 100644
--- a/paramiko/rsakey.py
+++ b/paramiko/rsakey.py
@@ -36,6 +36,7 @@ class RSAKey(PKey):
data.
"""
+ name = "ssh-rsa"
HASHES = {
"ssh-rsa": hashes.SHA1,
"ssh-rsa-cert-v01@openssh.com": hashes.SHA1,
@@ -71,13 +72,17 @@ class RSAKey(PKey):
msg=msg,
# NOTE: this does NOT change when using rsa2 signatures; it's
# purely about key loading, not exchange or verification
- key_type="ssh-rsa",
+ key_type=self.name,
cert_type="ssh-rsa-cert-v01@openssh.com",
)
self.key = rsa.RSAPublicNumbers(
e=msg.get_mpint(), n=msg.get_mpint()
).public_key(default_backend())
+ @classmethod
+ def identifiers(cls):
+ return list(cls.HASHES.keys())
+
@property
def size(self):
return self.key.key_size
@@ -91,7 +96,7 @@ class RSAKey(PKey):
def asbytes(self):
m = Message()
- m.add_string("ssh-rsa")
+ m.add_string(self.name)
m.add_mpint(self.public_numbers.e)
m.add_mpint(self.public_numbers.n)
return m.asbytes()
@@ -106,7 +111,7 @@ class RSAKey(PKey):
return (self.get_name(), self.public_numbers.e, self.public_numbers.n)
def get_name(self):
- return "ssh-rsa"
+ return self.name
def get_bits(self):
return self.size
@@ -114,13 +119,18 @@ class RSAKey(PKey):
def can_sign(self):
return isinstance(self.key, rsa.RSAPrivateKey)
- def sign_ssh_data(self, data, algorithm="ssh-rsa"):
+ def sign_ssh_data(self, data, algorithm=None):
+ if algorithm is None:
+ algorithm = self.name
sig = self.key.sign(
data,
padding=padding.PKCS1v15(),
+ # HASHES being just a map from long identifier to either SHA1 or
+ # SHA256 - cert'ness is not truly relevant.
algorithm=self.HASHES[algorithm](),
)
m = Message()
+ # And here again, cert'ness is irrelevant, so it is stripped out.
m.add_string(algorithm.replace("-cert-v01@openssh.com", ""))
m.add_string(sig)
return m
diff --git a/paramiko/server.py b/paramiko/server.py
index 6b0bb0f6..6923bdf5 100644
--- a/paramiko/server.py
+++ b/paramiko/server.py
@@ -254,7 +254,7 @@ class ServerInterface:
a valid krb5 principal!
We don't check if the krb5 principal is allowed to log in on
the server, because there is no way to do that in python. So
- if you develop your own SSH server with paramiko for a cetain
+ if you develop your own SSH server with paramiko for a certain
platform like Linux, you should call C{krb5_kuserok()} in
your local kerberos library to make sure that the
krb5_principal has an account on the server and is allowed to
@@ -286,7 +286,7 @@ class ServerInterface:
a valid krb5 principal!
We don't check if the krb5 principal is allowed to log in on
the server, because there is no way to do that in python. So
- if you develop your own SSH server with paramiko for a cetain
+ if you develop your own SSH server with paramiko for a certain
platform like Linux, you should call C{krb5_kuserok()} in
your local kerberos library to make sure that the
krb5_principal has an account on the server and is allowed
diff --git a/paramiko/ssh_exception.py b/paramiko/ssh_exception.py
index 9b1b44c3..a1e1cfc3 100644
--- a/paramiko/ssh_exception.py
+++ b/paramiko/ssh_exception.py
@@ -89,6 +89,11 @@ class PartialAuthentication(AuthenticationException):
)
+# TODO 4.0: stop inheriting from SSHException, move to auth.py
+class UnableToAuthenticate(AuthenticationException):
+ pass
+
+
class ChannelException(SSHException):
"""
Exception raised when an attempt to open a new `.Channel` fails.
diff --git a/paramiko/transport.py b/paramiko/transport.py
index 98cdae03..8785d6bb 100644
--- a/paramiko/transport.py
+++ b/paramiko/transport.py
@@ -34,7 +34,7 @@ from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes
import paramiko
from paramiko import util
-from paramiko.auth_handler import AuthHandler
+from paramiko.auth_handler import AuthHandler, AuthOnlyHandler
from paramiko.ssh_gss import GSSAuth
from paramiko.channel import Channel
from paramiko.common import (
@@ -64,6 +64,8 @@ from paramiko.common import (
MSG_GLOBAL_REQUEST,
MSG_REQUEST_SUCCESS,
MSG_REQUEST_FAILURE,
+ cMSG_SERVICE_REQUEST,
+ MSG_SERVICE_ACCEPT,
MSG_CHANNEL_OPEN_SUCCESS,
MSG_CHANNEL_OPEN_FAILURE,
MSG_CHANNEL_OPEN,
@@ -413,6 +415,8 @@ class Transport(threading.Thread, ClosingContextManager):
self.hostname = None
self.server_extensions = {}
+ # TODO: these two overrides on sock's type should go away sometime, too
+ # many ways to do it!
if isinstance(sock, str):
# convert "host:port" into (host, port)
hl = sock.split(":", 1)
@@ -529,6 +533,20 @@ class Transport(threading.Thread, ClosingContextManager):
self.server_accept_cv = threading.Condition(self.lock)
self.subsystem_table = {}
+ # Handler table, now set at init time for easier per-instance
+ # manipulation and subclass twiddling.
+ self._handler_table = {
+ MSG_EXT_INFO: self._parse_ext_info,
+ MSG_NEWKEYS: self._parse_newkeys,
+ MSG_GLOBAL_REQUEST: self._parse_global_request,
+ MSG_REQUEST_SUCCESS: self._parse_request_success,
+ MSG_REQUEST_FAILURE: self._parse_request_failure,
+ MSG_CHANNEL_OPEN_SUCCESS: self._parse_channel_open_success,
+ MSG_CHANNEL_OPEN_FAILURE: self._parse_channel_open_failure,
+ MSG_CHANNEL_OPEN: self._parse_channel_open,
+ MSG_KEXINIT: self._negotiate_keys,
+ }
+
def _filter_algorithm(self, type_):
default = getattr(self, "_preferred_{}".format(type_))
return tuple(
@@ -1646,7 +1664,7 @@ class Transport(threading.Thread, ClosingContextManager):
dumb wrapper around PAM.
This method will block until the authentication succeeds or fails,
- peroidically calling the handler asynchronously to get answers to
+ periodically calling the handler asynchronously to get answers to
authentication questions. The handler may be called more than once
if the server continues to ask questions.
@@ -1865,6 +1883,8 @@ class Transport(threading.Thread, ClosingContextManager):
# internals...
+ # TODO 4.0: make a public alias for this because multiple other classes
+ # already explicitly rely on it...or just rewrite logging :D
def _log(self, level, msg, *args):
if issubclass(type(msg), list):
for m in msg:
@@ -2125,6 +2145,8 @@ class Transport(threading.Thread, ClosingContextManager):
)
) # noqa
self._expected_packet = tuple()
+ # These message IDs indicate key exchange & will differ
+ # depending on exact exchange algorithm
if (ptype >= 30) and (ptype <= 41):
self.kex_engine.parse_next(ptype, m)
continue
@@ -2134,7 +2156,7 @@ class Transport(threading.Thread, ClosingContextManager):
if error_msg:
self._send_message(error_msg)
else:
- self._handler_table[ptype](self, m)
+ self._handler_table[ptype](m)
elif ptype in self._channel_handler_table:
chanid = m.get_int()
chan = self._channels.get(chanid)
@@ -2160,7 +2182,7 @@ class Transport(threading.Thread, ClosingContextManager):
and ptype in self.auth_handler._handler_table
):
handler = self.auth_handler._handler_table[ptype]
- handler(self.auth_handler, m)
+ handler(m)
if len(self._expected_packet) > 0:
continue
else:
@@ -2981,18 +3003,6 @@ class Transport(threading.Thread, ClosingContextManager):
finally:
self.lock.release()
- _handler_table = {
- MSG_EXT_INFO: _parse_ext_info,
- MSG_NEWKEYS: _parse_newkeys,
- MSG_GLOBAL_REQUEST: _parse_global_request,
- MSG_REQUEST_SUCCESS: _parse_request_success,
- MSG_REQUEST_FAILURE: _parse_request_failure,
- MSG_CHANNEL_OPEN_SUCCESS: _parse_channel_open_success,
- MSG_CHANNEL_OPEN_FAILURE: _parse_channel_open_failure,
- MSG_CHANNEL_OPEN: _parse_channel_open,
- MSG_KEXINIT: _negotiate_keys,
- }
-
_channel_handler_table = {
MSG_CHANNEL_SUCCESS: Channel._request_success,
MSG_CHANNEL_FAILURE: Channel._request_failed,
@@ -3132,3 +3142,161 @@ class ChannelMap:
return len(self._map)
finally:
self._lock.release()
+
+
+class ServiceRequestingTransport(Transport):
+ """
+ Transport, but also handling service requests, like it oughtta!
+
+ .. versionadded:: 3.2
+ """
+
+ # NOTE: this purposefully duplicates some of the parent class in order to
+ # modernize, refactor, etc. The intent is that eventually we will collapse
+ # this one onto the parent in a backwards incompatible release.
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._service_userauth_accepted = False
+ self._handler_table[MSG_SERVICE_ACCEPT] = self._parse_service_accept
+
+ def _parse_service_accept(self, m):
+ service = m.get_text()
+ # Short-circuit for any service name not ssh-userauth.
+ # NOTE: it's technically possible for 'service name' in
+ # SERVICE_REQUEST/ACCEPT messages to be "ssh-connection" --
+ # but I don't see evidence of Paramiko ever initiating or expecting to
+ # receive one of these. We /do/ see the 'service name' field in
+ # MSG_USERAUTH_REQUEST/ACCEPT/FAILURE set to this string, but that is a
+ # different set of handlers, so...!
+ if service != "ssh-userauth":
+ # TODO 4.0: consider erroring here (with an ability to opt out?)
+ # instead as it probably means something went Very Wrong.
+ self._log(
+ DEBUG, 'Service request "{}" accepted (?)'.format(service)
+ )
+ return
+ # Record that we saw a service-userauth acceptance, meaning we are free
+ # to submit auth requests.
+ self._service_userauth_accepted = True
+ self._log(DEBUG, "MSG_SERVICE_ACCEPT received; auth may begin")
+
+ def ensure_session(self):
+ # Make sure we're not trying to auth on a not-yet-open or
+ # already-closed transport session; that's our responsibility, not that
+ # of AuthHandler.
+ if (not self.active) or (not self.initial_kex_done):
+ # TODO: better error message? this can happen in many places, eg
+ # user error (authing before connecting) or developer error (some
+ # improperly handled pre/mid auth shutdown didn't become fatal
+ # enough). The latter is much more common & should ideally be fixed
+ # by terminating things harder?
+ raise SSHException("No existing session")
+ # Also make sure we've actually been told we are allowed to auth.
+ if self._service_userauth_accepted:
+ return
+ # Or request to do so, otherwise.
+ m = Message()
+ m.add_byte(cMSG_SERVICE_REQUEST)
+ m.add_string("ssh-userauth")
+ self._log(DEBUG, "Sending MSG_SERVICE_REQUEST: ssh-userauth")
+ self._send_message(m)
+ # Now we wait to hear back; the user is expecting a blocking-style auth
+ # request so there's no point giving control back anywhere.
+ while not self._service_userauth_accepted:
+ # TODO: feels like we're missing an AuthHandler Event like
+ # 'self.auth_event' which is set when AuthHandler shuts down in
+ # ways good AND bad. Transport only seems to have completion_event
+ # which is unclear re: intent, eg it's set by newkeys which always
+ # happens on connection, so it'll always be set by the time we get
+ # here.
+ # NOTE: this copies the timing of event.wait() in
+ # AuthHandler.wait_for_response, re: 1/10 of a second. Could
+ # presumably be smaller, but seems unlikely this period is going to
+ # be "too long" for any code doing ssh networking...
+ time.sleep(0.1)
+ self.auth_handler = self.get_auth_handler()
+
+ def get_auth_handler(self):
+ # NOTE: using new sibling subclass instead of classic AuthHandler
+ return AuthOnlyHandler(self)
+
+ def auth_none(self, username):
+ # TODO 4.0: merge to parent, preserving (most of) docstring
+ self.ensure_session()
+ return self.auth_handler.auth_none(username)
+
+ def auth_password(self, username, password, fallback=True):
+ # TODO 4.0: merge to parent, preserving (most of) docstring
+ self.ensure_session()
+ try:
+ return self.auth_handler.auth_password(username, password)
+ except BadAuthenticationType as e:
+ # if password auth isn't allowed, but keyboard-interactive *is*,
+ # try to fudge it
+ if not fallback or ("keyboard-interactive" not in e.allowed_types):
+ raise
+ try:
+
+ def handler(title, instructions, fields):
+ if len(fields) > 1:
+ raise SSHException("Fallback authentication failed.")
+ if len(fields) == 0:
+ # for some reason, at least on os x, a 2nd request will
+ # be made with zero fields requested. maybe it's just
+ # to try to fake out automated scripting of the exact
+ # type we're doing here. *shrug* :)
+ return []
+ return [password]
+
+ return self.auth_interactive(username, handler)
+ except SSHException:
+ # attempt to fudge failed; just raise the original exception
+ raise e
+
+ def auth_publickey(self, username, key):
+ # TODO 4.0: merge to parent, preserving (most of) docstring
+ self.ensure_session()
+ return self.auth_handler.auth_publickey(username, key)
+
+ def auth_interactive(self, username, handler, submethods=""):
+ # TODO 4.0: merge to parent, preserving (most of) docstring
+ self.ensure_session()
+ return self.auth_handler.auth_interactive(
+ username, handler, submethods
+ )
+
+ def auth_interactive_dumb(self, username, handler=None, submethods=""):
+ # TODO 4.0: merge to parent, preserving (most of) docstring
+ # NOTE: legacy impl omitted equiv of ensure_session since it just wraps
+ # another call to an auth method. however we reinstate it for
+ # consistency reasons.
+ self.ensure_session()
+ if not handler:
+
+ def handler(title, instructions, prompt_list):
+ answers = []
+ if title:
+ print(title.strip())
+ if instructions:
+ print(instructions.strip())
+ for prompt, show_input in prompt_list:
+ print(prompt.strip(), end=" ")
+ answers.append(input())
+ return answers
+
+ return self.auth_interactive(username, handler, submethods)
+
+ def auth_gssapi_with_mic(self, username, gss_host, gss_deleg_creds):
+ # TODO 4.0: merge to parent, preserving (most of) docstring
+ self.ensure_session()
+ self.auth_handler = self.get_auth_handler()
+ return self.auth_handler.auth_gssapi_with_mic(
+ username, gss_host, gss_deleg_creds
+ )
+
+ def auth_gssapi_keyex(self, username):
+ # TODO 4.0: merge to parent, preserving (most of) docstring
+ self.ensure_session()
+ self.auth_handler = self.get_auth_handler()
+ return self.auth_handler.auth_gssapi_keyex(username)
diff --git a/pytest.ini b/pytest.ini
index 62fef863..f51e190a 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -1,6 +1,3 @@
[pytest]
-# We use pytest-relaxed just for its utils at the moment, so disable it at the
-# plugin level until we adapt test organization to really use it.
-addopts = -p no:relaxed
-# Loop on failure
-looponfailroots = tests paramiko
+testpaths = tests
+python_files = *
diff --git a/sites/docs/api/auth.rst b/sites/docs/api/auth.rst
new file mode 100644
index 00000000..b6bce36c
--- /dev/null
+++ b/sites/docs/api/auth.rst
@@ -0,0 +1,8 @@
+Authentication modules
+======================
+
+.. automodule:: paramiko.auth_strategy
+ :member-order: bysource
+
+.. automodule:: paramiko.auth_handler
+ :member-order: bysource
diff --git a/sites/docs/index.rst b/sites/docs/index.rst
index 87265d95..675fe596 100644
--- a/sites/docs/index.rst
+++ b/sites/docs/index.rst
@@ -47,6 +47,7 @@ Authentication & keys
---------------------
.. toctree::
+ api/auth
api/agent
api/hostkeys
api/keys
diff --git a/sites/www/changelog.rst b/sites/www/changelog.rst
index 0b2022ca..c18890eb 100644
--- a/sites/www/changelog.rst
+++ b/sites/www/changelog.rst
@@ -2,6 +2,132 @@
Changelog
=========
+- :release:`3.2.0 <2023-05-25>`
+- :bug:`- major` Fixed a very sneaky bug found at the apparently
+ rarely-traveled intersection of ``RSA-SHA2`` keys, certificates, SSH agents,
+ and stricter-than-OpenSSH server targets. This manifested as yet another
+ "well, if we turn off SHA2 at one end or another, everything works again"
+ problem, for example with version 12 of the Teleport server endpoint.
+
+ This has been fixed; Paramiko tweaked multiple aspects of how it requests
+ agent signatures, and the agent appears to do the right thing now.
+
+ Thanks to Ryan Stoner for the bug report and testing.
+- :bug:`2012 major` (also :issue:`1961` and countless others) The
+ ``server-sig-algs`` and ``RSA-SHA2`` features added around Paramiko 2.9 or
+ so, had the annoying side effect of not working with servers that don't
+ support *either* of those feature sets, requiring use of
+ ``disabled_algorithms`` to forcibly disable the SHA2 algorithms on Paramiko's
+ end.
+
+ The **experimental** `~paramiko.transport.ServiceRequestingTransport` (noted
+ in its own entry in this changelog) includes a fix for this issue,
+ specifically by falling back to the same algorithm as the in-use pubkey if
+ it's in the algorithm list (leaving the "first algorithm in said list" as an
+ absolute final fallback).
+- :feature:`-` Implement ``_fields()`` on `~paramiko.agent.AgentKey` so that it
+ may be compared (via ``==``) with other `~paramiko.pkey.PKey` instances.
+- :bug:`23 major` Since its inception, Paramiko has (for reasons lost to time)
+ implemented authentication as a side effect of handling affirmative replies
+ to ``MSG_SERVICE_REQUEST`` protocol messages. What this means is Paramiko
+ makes one such request before every ``MSG_USERAUTH_REQUEST``, i.e. every auth
+ attempt.
+
+ OpenSSH doesn't care if clients send multiple service requests, but other
+ server implementations are often stricter in what they accept after an
+ initial service request (due to the RFCs not being clear). This can result in
+ odd behavior when a user doesn't authenticate successfully on the very first
+ try (for example, when the right key for a target host is the third in one's
+ ssh-agent).
+
+ This version of Paramiko now contains an opt-in
+ `~paramiko.transport.Transport` subclass,
+ `~paramiko.transport.ServiceRequestingTransport`, which more-correctly
+ implements service request handling in the Transport, and uses an
+ auth-handler subclass internally which has been similarly adapted. Users
+ wanting to try this new experimental code path may hand this class to
+ `SSHClient.connect <paramiko.client.SSHClient.connect>` as its
+ ``transport_factory`` kwarg.
+
+ .. warning::
+ This feature is **EXPERIMENTAL** and its code may be subject to change.
+
+ In addition:
+ - minor backwards incompatible changes exist in the new code paths,
+ most notably the removal of the (inconsistently applied and rarely
+ used) ``event`` arguments to the ``auth_xxx`` methods.
+ - GSSAPI support has only been partially implemented, and is untested.
+
+ .. note::
+ Some minor backwards-*compatible* changes were made to the **existing**
+ Transport and AuthHandler classes to facilitate the new code. For
+ example, ``Transport._handler_table`` and
+ ``AuthHandler._client_handler_table`` are now properties instead of raw
+ attributes.
+
+- :feature:`387` Users of `~paramiko.client.SSHClient` can now configure the
+ authentication logic Paramiko uses when connecting to servers; this
+ functionality is intended for advanced users and higher-level libraries such
+ as `Fabric <https://fabfile.org>`_. See `~paramiko.auth_strategy` for
+ details.
+
+ Fabric's co-temporal release includes a proof-of-concept use of this feature,
+ implementing an auth flow much closer to that of the OpenSSH client (versus
+ Paramiko's legacy behavior). It is **strongly recommended** that if this
+ interests you, investigate replacing any direct use of ``SSHClient`` with
+ Fabric's ``Connection``.
+
+ .. warning::
+ This feature is **EXPERIMENTAL**; please see its docs for details.
+
+- :feature:`-` Enhanced `~paramiko.agent.AgentKey` with new attributes, such
+ as:
+
+ - Added a ``comment`` attribute (and constructor argument);
+ `Agent.get_keys() <paramiko.agent.Agent.get_keys>` now uses this kwarg to
+ store any comment field sent over by the agent. The original version of
+ the agent feature inexplicably did not store the comment anywhere.
+ - Agent-derived keys now attempt to instantiate a copy of the appropriate
+ key class for access to other algorithm-specific members (eg key size).
+ This is available as the ``.inner_key`` attribute.
+
+ .. note::
+ This functionality is now in use in Fabric's new ``--list-agent-keys``
+ feature, as well as in Paramiko's debug logging.
+
+- :feature:`-` `~paramiko.pkey.PKey` now offers convenience
+ "meta-constructors", static methods that simplify the process of
+ instantiating the correct subclass for a given key input.
+
+ For example, `PKey.from_path <paramiko.pkey.PKey.from_path>` can load a file
+ path without knowing *a priori* what type of key it is (thanks to some handy
+ methods within our cryptography dependency). Going forwards, we expect this
+ to be the primary method of loading keys by user code that runs on "human
+ time" (i.e. where some minor efficiencies are worth the convenience).
+
+ In addition, `PKey.from_type_string <paramiko.pkey.PKey.from_type_string>`
+ now exists, and is being used in some internals to load ssh-agent keys.
+
+ As part of these changes, `~paramiko.pkey.PKey` and friends grew an
+ `~paramiko.pkey.PKey.identifiers` classmethod; this is inspired by the
+ `~paramiko.ecdsakey.ECDSAKey.supported_key_format_identifiers` classmethod
+ (which now refers to the new method.) This also includes adding a ``.name``
+ attribute to most key classes (which will eventually replace ``.get_name()``.
+
+- :feature:`-` `~paramiko.pkey.PKey` grew a new ``.algorithm_name`` property
+ which displays the key algorithm; this is typically derived from the value of
+ `~paramiko.pkey.PKey.get_name`. For example, ED25519 keys have a ``get_name``
+ of ``ssh-ed25519`` (the SSH protocol key type field value), and now have a
+ ``algorithm_name`` of ``ED25519``.
+- :feature:`-` `~paramiko.pkey.PKey` grew a new ``.fingerprint`` property which
+ emits a fingerprint string matching the SHA256+Base64 values printed by
+ various OpenSSH tooling (eg ``ssh-add -l``, ``ssh -v``). This is intended to
+ help troubleshoot Paramiko-vs-OpenSSH behavior and will eventually replace
+ the venerable ``get_fingerprint`` method.
+- :bug:`- major` `~paramiko.agent.AgentKey` had a dangling Python 3
+ incompatible ``__str__`` method returning bytes. This method has been
+ removed, allowing the superclass' (`~paramiko.pkey.PKey`) method to run
+ instead.
- :release:`3.1.0 <2023-03-10>`
- :feature:`2013` (solving :issue:`2009`, plus others) Add an explicit
``channel_timeout`` keyword argument to `paramiko.client.SSHClient.connect`,
diff --git a/tasks.py b/tasks.py
index c14abacb..361d9cda 100644
--- a/tasks.py
+++ b/tasks.py
@@ -1,10 +1,11 @@
import os
+from pathlib import Path
from os.path import join
from shutil import rmtree, copytree
from invoke import Collection, task
-from invocations.checks import blacken
-from invocations.docs import docs, www, sites
+from invocations import checks
+from invocations.docs import docs, www, sites, watch_docs
from invocations.packaging.release import ns as release_coll, publish
from invocations.testing import count_errors
@@ -50,8 +51,10 @@ def test(
opts += " -f"
modstr = ""
if module is not None:
- # NOTE: implicit test_ prefix as we're not on pytest-relaxed yet
- modstr = " tests/test_{}.py".format(module)
+ base = f"{module}.py"
+ tests = Path("tests")
+ legacy = tests / f"test_{base}"
+ modstr = str(legacy if legacy.exists() else tests / base)
# Switch runner depending on coverage or no coverage.
# TODO: get pytest's coverage plugin working, IIRC it has issues?
runner = "pytest"
@@ -98,7 +101,7 @@ def guard(ctx, opts=""):
# projects do it.
@task
def publish_(
- ctx, sdist=True, wheel=True, sign=True, dry_run=False, index=None
+ ctx, sdist=True, wheel=True, sign=False, dry_run=False, index=None
):
"""
Wraps invocations.packaging.publish to add baked-in docs folder.
@@ -137,9 +140,11 @@ ns = Collection(
release_coll,
docs,
www,
+ watch_docs,
sites,
count_errors,
- blacken,
+ checks.blacken,
+ checks,
)
ns.configure(
{
@@ -147,11 +152,12 @@ ns.configure(
# NOTE: many of these are also set in kwarg defaults above; but
# having them here too means once we get rid of our custom
# release(), the behavior stays.
- "sign": True,
+ "sign": False,
"wheel": True,
"changelog_file": join(
www.configuration()["sphinx"]["source"], "changelog.rst"
),
- }
+ },
+ "docs": {"browse": "remote"},
}
)
diff --git a/tests/loop.py b/tests/_loop.py
index a3740013..a3740013 100644
--- a/tests/loop.py
+++ b/tests/_loop.py
diff --git a/tests/stub_sftp.py b/tests/_stub_sftp.py
index 0c0372e9..0c0372e9 100644
--- a/tests/stub_sftp.py
+++ b/tests/_stub_sftp.py
diff --git a/tests/cert_support/test_dss.key b/tests/_support/dss.key
index e10807f1..e10807f1 100644
--- a/tests/cert_support/test_dss.key
+++ b/tests/_support/dss.key
diff --git a/tests/cert_support/test_dss.key-cert.pub b/tests/_support/dss.key-cert.pub
index 07fd5578..07fd5578 100644
--- a/tests/cert_support/test_dss.key-cert.pub
+++ b/tests/_support/dss.key-cert.pub
diff --git a/tests/cert_support/test_ecdsa_256.key b/tests/_support/ecdsa-256.key
index 42d44734..42d44734 100644
--- a/tests/cert_support/test_ecdsa_256.key
+++ b/tests/_support/ecdsa-256.key
diff --git a/tests/cert_support/test_ecdsa_256.key-cert.pub b/tests/_support/ecdsa-256.key-cert.pub
index f2c93ccf..f2c93ccf 100644
--- a/tests/cert_support/test_ecdsa_256.key-cert.pub
+++ b/tests/_support/ecdsa-256.key-cert.pub
diff --git a/tests/cert_support/test_ed25519.key b/tests/_support/ed25519.key
index eb9f94c2..eb9f94c2 100644
--- a/tests/cert_support/test_ed25519.key
+++ b/tests/_support/ed25519.key
diff --git a/tests/cert_support/test_ed25519.key-cert.pub b/tests/_support/ed25519.key-cert.pub
index 4e01415a..4e01415a 100644
--- a/tests/cert_support/test_ed25519.key-cert.pub
+++ b/tests/_support/ed25519.key-cert.pub
diff --git a/tests/_support/ed448.key b/tests/_support/ed448.key
new file mode 100644
index 00000000..887b51c6
--- /dev/null
+++ b/tests/_support/ed448.key
@@ -0,0 +1,4 @@
+-----BEGIN PRIVATE KEY-----
+MEcCAQAwBQYDK2VxBDsEOcvcl9IoD0ktR5RWtW84NM7O2e4LmD2cWfRg7Wht/OA9
+POkmRW12VNvlP6BsXKir5yygumIjD91SQQ==
+-----END PRIVATE KEY-----
diff --git a/tests/cert_support/test_rsa.key b/tests/_support/rsa-lonely.key
index f50e9c53..f50e9c53 100644
--- a/tests/cert_support/test_rsa.key
+++ b/tests/_support/rsa-lonely.key
diff --git a/tests/cert_support/test_rsa.key-cert.pub b/tests/_support/rsa-missing.key-cert.pub
index 7487ab66..7487ab66 100644
--- a/tests/cert_support/test_rsa.key-cert.pub
+++ b/tests/_support/rsa-missing.key-cert.pub
diff --git a/tests/test_rsa.key b/tests/_support/rsa.key
index f50e9c53..f50e9c53 100644
--- a/tests/test_rsa.key
+++ b/tests/_support/rsa.key
diff --git a/tests/_support/rsa.key-cert.pub b/tests/_support/rsa.key-cert.pub
new file mode 100644
index 00000000..7487ab66
--- /dev/null
+++ b/tests/_support/rsa.key-cert.pub
@@ -0,0 +1 @@
+ssh-rsa-cert-v01@openssh.com AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgsZlXTd5NE4uzGAn6TyAqQj+IPbsTEFGap2x5pTRwQR8AAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4cAAAAAAAAE0gAAAAEAAAAmU2FtcGxlIHNlbGYtc2lnbmVkIE9wZW5TU0ggY2VydGlmaWNhdGUAAAASAAAABXVzZXIxAAAABXVzZXIyAAAAAAAAAAD//////////wAAAAAAAACCAAAAFXBlcm1pdC1YMTEtZm9yd2FyZGluZwAAAAAAAAAXcGVybWl0LWFnZW50LWZvcndhcmRpbmcAAAAAAAAAFnBlcm1pdC1wb3J0LWZvcndhcmRpbmcAAAAAAAAACnBlcm1pdC1wdHkAAAAAAAAADnBlcm1pdC11c2VyLXJjAAAAAAAAAAAAAACVAAAAB3NzaC1yc2EAAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4cAAACPAAAAB3NzaC1yc2EAAACATFHFsARDgQevc6YLxNnDNjsFtZ08KPMyYVx0w5xm95IVZHVWSOc5w+ccjqN9HRwxV3kP7IvL91qx0Uc3MJdB9g/O6HkAP+rpxTVoTb2EAMekwp5+i8nQJW4CN2BSsbQY1M6r7OBZ5nmF4hOW/5Pu4l22lXe2ydy8kEXOEuRpUeQ= test_rsa.key.pub
diff --git a/tests/_util.py b/tests/_util.py
new file mode 100644
index 00000000..eaf6aac4
--- /dev/null
+++ b/tests/_util.py
@@ -0,0 +1,441 @@
+from contextlib import contextmanager
+from os.path import dirname, realpath, join
+import builtins
+import os
+from pathlib import Path
+import socket
+import struct
+import sys
+import unittest
+from time import sleep
+import threading
+
+import pytest
+
+from paramiko import (
+ ServerInterface,
+ RSAKey,
+ DSSKey,
+ AUTH_FAILED,
+ AUTH_PARTIALLY_SUCCESSFUL,
+ AUTH_SUCCESSFUL,
+ OPEN_SUCCEEDED,
+ OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED,
+ InteractiveQuery,
+ Transport,
+)
+from paramiko.ssh_gss import GSS_AUTH_AVAILABLE
+
+from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.asymmetric import padding, rsa
+
+tests_dir = dirname(realpath(__file__))
+
+from ._loop import LoopSocket
+
+
+def _support(filename):
+ base = Path(tests_dir)
+ top = base / filename
+ deeper = base / "_support" / filename
+ return str(deeper if deeper.exists() else top)
+
+
+def _config(name):
+ return join(tests_dir, "configs", name)
+
+
+needs_gssapi = pytest.mark.skipif(
+ not GSS_AUTH_AVAILABLE, reason="No GSSAPI to test"
+)
+
+
+def needs_builtin(name):
+ """
+ Skip decorated test if builtin name does not exist.
+ """
+ reason = "Test requires a builtin '{}'".format(name)
+ return pytest.mark.skipif(not hasattr(builtins, name), reason=reason)
+
+
+slow = pytest.mark.slow
+
+# GSSAPI / Kerberos related tests need a working Kerberos environment.
+# The class `KerberosTestCase` provides such an environment or skips all tests.
+# There are 3 distinct cases:
+#
+# - A Kerberos environment has already been created and the environment
+# contains the required information.
+#
+# - We can use the package 'k5test' to setup an working kerberos environment on
+# the fly.
+#
+# - We skip all tests.
+#
+# ToDo: add a Windows specific implementation?
+
+if (
+ os.environ.get("K5TEST_USER_PRINC", None)
+ and os.environ.get("K5TEST_HOSTNAME", None)
+ and os.environ.get("KRB5_KTNAME", None)
+): # add other vars as needed
+
+ # The environment provides the required information
+ class DummyK5Realm:
+ def __init__(self):
+ for k in os.environ:
+ if not k.startswith("K5TEST_"):
+ continue
+ setattr(self, k[7:].lower(), os.environ[k])
+ self.env = {}
+
+ class KerberosTestCase(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.realm = DummyK5Realm()
+
+ @classmethod
+ def tearDownClass(cls):
+ del cls.realm
+
+else:
+ try:
+ # Try to setup a kerberos environment
+ from k5test import KerberosTestCase
+ except Exception:
+ # Use a dummy, that skips all tests
+ class KerberosTestCase(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ raise unittest.SkipTest(
+ "Missing extension package k5test. "
+ 'Please run "pip install k5test" '
+ "to install it."
+ )
+
+
+def update_env(testcase, mapping, env=os.environ):
+ """Modify os.environ during a test case and restore during cleanup."""
+ saved_env = env.copy()
+
+ def replace(target, source):
+ target.update(source)
+ for k in list(target):
+ if k not in source:
+ target.pop(k, None)
+
+ testcase.addCleanup(replace, env, saved_env)
+ env.update(mapping)
+ return testcase
+
+
+def k5shell(args=None):
+ """Create a shell with an kerberos environment
+
+ This can be used to debug paramiko or to test the old GSSAPI.
+ To test a different GSSAPI, simply activate a suitable venv
+ within the shell.
+ """
+ import k5test
+ import atexit
+ import subprocess
+
+ k5 = k5test.K5Realm()
+ atexit.register(k5.stop)
+ os.environ.update(k5.env)
+ for n in ("realm", "user_princ", "hostname"):
+ os.environ["K5TEST_" + n.upper()] = getattr(k5, n)
+
+ if not args:
+ args = sys.argv[1:]
+ if not args:
+ args = [os.environ.get("SHELL", "bash")]
+ sys.exit(subprocess.call(args))
+
+
+def is_low_entropy():
+ """
+ Attempts to detect whether running interpreter is low-entropy.
+
+ "low-entropy" is defined as being in 32-bit mode and with the hash seed set
+ to zero.
+ """
+ is_32bit = struct.calcsize("P") == 32 / 8
+ # I don't see a way to tell internally if the hash seed was set this
+ # way, but env should be plenty sufficient, this is only for testing.
+ return is_32bit and os.environ.get("PYTHONHASHSEED", None) == "0"
+
+
+def sha1_signing_unsupported():
+ """
+ This is used to skip tests in environments where SHA-1 signing is
+ not supported by the backend.
+ """
+ private_key = rsa.generate_private_key(
+ public_exponent=65537, key_size=2048, backend=default_backend()
+ )
+ message = b"Some dummy text"
+ try:
+ private_key.sign(
+ message,
+ padding.PSS(
+ mgf=padding.MGF1(hashes.SHA1()),
+ salt_length=padding.PSS.MAX_LENGTH,
+ ),
+ hashes.SHA1(),
+ )
+ return False
+ except UnsupportedAlgorithm as e:
+ return e._reason is _Reasons.UNSUPPORTED_HASH
+
+
+requires_sha1_signing = unittest.skipIf(
+ sha1_signing_unsupported(), "SHA-1 signing not supported"
+)
+
+_disable_sha2 = dict(
+ disabled_algorithms=dict(keys=["rsa-sha2-256", "rsa-sha2-512"])
+)
+_disable_sha1 = dict(disabled_algorithms=dict(keys=["ssh-rsa"]))
+_disable_sha2_pubkey = dict(
+ disabled_algorithms=dict(pubkeys=["rsa-sha2-256", "rsa-sha2-512"])
+)
+_disable_sha1_pubkey = dict(disabled_algorithms=dict(pubkeys=["ssh-rsa"]))
+
+
+unicodey = "\u2022"
+
+
+class TestServer(ServerInterface):
+ paranoid_did_password = False
+ paranoid_did_public_key = False
+ # TODO: make this ed25519 or something else modern? (_is_ this used??)
+ paranoid_key = DSSKey.from_private_key_file(_support("dss.key"))
+
+ def __init__(self, allowed_keys=None):
+ self.allowed_keys = allowed_keys if allowed_keys is not None else []
+
+ def check_channel_request(self, kind, chanid):
+ if kind == "bogus":
+ return OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
+ return OPEN_SUCCEEDED
+
+ def check_channel_exec_request(self, channel, command):
+ if command != b"yes":
+ return False
+ return True
+
+ def check_channel_shell_request(self, channel):
+ return True
+
+ def check_global_request(self, kind, msg):
+ self._global_request = kind
+ # NOTE: for w/e reason, older impl of this returned False always, even
+ # tho that's only supposed to occur if the request cannot be served.
+ # For now, leaving that the default unless test supplies specific
+ # 'acceptable' request kind
+ return kind == "acceptable"
+
+ def check_channel_x11_request(
+ self,
+ channel,
+ single_connection,
+ auth_protocol,
+ auth_cookie,
+ screen_number,
+ ):
+ self._x11_single_connection = single_connection
+ self._x11_auth_protocol = auth_protocol
+ self._x11_auth_cookie = auth_cookie
+ self._x11_screen_number = screen_number
+ return True
+
+ def check_port_forward_request(self, addr, port):
+ self._listen = socket.socket()
+ self._listen.bind(("127.0.0.1", 0))
+ self._listen.listen(1)
+ return self._listen.getsockname()[1]
+
+ def cancel_port_forward_request(self, addr, port):
+ self._listen.close()
+ self._listen = None
+
+ def check_channel_direct_tcpip_request(self, chanid, origin, destination):
+ self._tcpip_dest = destination
+ return OPEN_SUCCEEDED
+
+ def get_allowed_auths(self, username):
+ if username == "slowdive":
+ return "publickey,password"
+ if username == "paranoid":
+ if (
+ not self.paranoid_did_password
+ and not self.paranoid_did_public_key
+ ):
+ return "publickey,password"
+ elif self.paranoid_did_password:
+ return "publickey"
+ else:
+ return "password"
+ if username == "commie":
+ return "keyboard-interactive"
+ if username == "utf8":
+ return "password"
+ if username == "non-utf8":
+ return "password"
+ return "publickey"
+
+ def check_auth_password(self, username, password):
+ if (username == "slowdive") and (password == "pygmalion"):
+ return AUTH_SUCCESSFUL
+ if (username == "paranoid") and (password == "paranoid"):
+ # 2-part auth (even openssh doesn't support this)
+ self.paranoid_did_password = True
+ if self.paranoid_did_public_key:
+ return AUTH_SUCCESSFUL
+ return AUTH_PARTIALLY_SUCCESSFUL
+ if (username == "utf8") and (password == unicodey):
+ return AUTH_SUCCESSFUL
+ if (username == "non-utf8") and (password == "\xff"):
+ return AUTH_SUCCESSFUL
+ if username == "bad-server":
+ raise Exception("Ack!")
+ if username == "unresponsive-server":
+ sleep(5)
+ return AUTH_SUCCESSFUL
+ return AUTH_FAILED
+
+ def check_auth_publickey(self, username, key):
+ if (username == "paranoid") and (key == self.paranoid_key):
+ # 2-part auth
+ self.paranoid_did_public_key = True
+ if self.paranoid_did_password:
+ return AUTH_SUCCESSFUL
+ return AUTH_PARTIALLY_SUCCESSFUL
+ # TODO: make sure all tests incidentally using this to pass, _without
+ # sending a username oops_, get updated somehow - probably via server()
+ # default always injecting a username
+ elif key in self.allowed_keys:
+ return AUTH_SUCCESSFUL
+ return AUTH_FAILED
+
+ def check_auth_interactive(self, username, submethods):
+ if username == "commie":
+ self.username = username
+ return InteractiveQuery(
+ "password", "Please enter a password.", ("Password", False)
+ )
+ return AUTH_FAILED
+
+ def check_auth_interactive_response(self, responses):
+ if self.username == "commie":
+ if (len(responses) == 1) and (responses[0] == "cat"):
+ return AUTH_SUCCESSFUL
+ return AUTH_FAILED
+
+
+@contextmanager
+def server(
+ hostkey=None,
+ init=None,
+ server_init=None,
+ client_init=None,
+ connect=None,
+ pubkeys=None,
+ catch_error=False,
+ transport_factory=None,
+ defer=False,
+ skip_verify=False,
+):
+ """
+ SSH server contextmanager for testing.
+
+ :param hostkey:
+ Host key to use for the server; if None, loads
+ ``rsa.key``.
+ :param init:
+ Default `Transport` constructor kwargs to use for both sides.
+ :param server_init:
+ Extends and/or overrides ``init`` for server transport only.
+ :param client_init:
+ Extends and/or overrides ``init`` for client transport only.
+ :param connect:
+ Kwargs to use for ``connect()`` on the client.
+ :param pubkeys:
+ List of public keys for auth.
+ :param catch_error:
+ Whether to capture connection errors & yield from contextmanager.
+ Necessary for connection_time exception testing.
+ :param transport_factory:
+ Like the same-named param in SSHClient: which Transport class to use.
+ :param bool defer:
+ Whether to defer authentication during connecting.
+
+ This is really just shorthand for ``connect={}`` which would do roughly
+ the same thing. Also: this implies skip_verify=True automatically!
+ :param bool skip_verify:
+ Whether NOT to do the default "make sure auth passed" check.
+ """
+ if init is None:
+ init = {}
+ if server_init is None:
+ server_init = {}
+ if client_init is None:
+ client_init = {}
+ if connect is None:
+ # No auth at all please
+ if defer:
+ connect = dict()
+ # Default username based auth
+ else:
+ connect = dict(username="slowdive", password="pygmalion")
+ socks = LoopSocket()
+ sockc = LoopSocket()
+ sockc.link(socks)
+ if transport_factory is None:
+ transport_factory = Transport
+ tc = transport_factory(sockc, **dict(init, **client_init))
+ ts = transport_factory(socks, **dict(init, **server_init))
+
+ if hostkey is None:
+ hostkey = RSAKey.from_private_key_file(_support("rsa.key"))
+ ts.add_server_key(hostkey)
+ event = threading.Event()
+ server = TestServer(allowed_keys=pubkeys)
+ assert not event.is_set()
+ assert not ts.is_active()
+ assert tc.get_username() is None
+ assert ts.get_username() is None
+ assert not tc.is_authenticated()
+ assert not ts.is_authenticated()
+
+ err = None
+ # Trap errors and yield instead of raising right away; otherwise callers
+ # cannot usefully deal with problems at connect time which stem from errors
+ # in the server side.
+ try:
+ ts.start_server(event, server)
+ tc.connect(**connect)
+
+ event.wait(1.0)
+ assert event.is_set()
+ assert ts.is_active()
+ assert tc.is_active()
+
+ except Exception as e:
+ if not catch_error:
+ raise
+ err = e
+
+ yield (tc, ts, err) if catch_error else (tc, ts)
+
+ if not (catch_error or skip_verify):
+ assert ts.is_authenticated()
+ assert tc.is_authenticated()
+
+ tc.close()
+ ts.close()
+ socks.close()
+ sockc.close()
diff --git a/tests/agent.py b/tests/agent.py
new file mode 100644
index 00000000..bcbfb216
--- /dev/null
+++ b/tests/agent.py
@@ -0,0 +1,151 @@
+from unittest.mock import Mock
+
+from pytest import mark, raises
+
+from paramiko import AgentKey, Message, RSAKey
+from paramiko.agent import (
+ SSH2_AGENT_SIGN_RESPONSE,
+ SSH_AGENT_RSA_SHA2_256,
+ SSH_AGENT_RSA_SHA2_512,
+ cSSH2_AGENTC_SIGN_REQUEST,
+)
+
+from ._util import _support
+
+
+# AgentKey with no inner_key
+class _BareAgentKey(AgentKey):
+ def __init__(self, name, blob):
+ self.name = name
+ self.blob = blob
+ self.inner_key = None
+
+
+class AgentKey_:
+ def str_is_repr(self):
+ # Tests for a missed spot in Python 3 upgrades: AgentKey.__str__ was
+ # returning bytes, as if under Python 2. When bug present, this
+ # explodes with "__str__ returned non-string".
+ key = AgentKey(None, b"secret!!!")
+ assert str(key) == repr(key)
+
+ class init:
+ def needs_at_least_two_arguments(self):
+ with raises(TypeError):
+ AgentKey()
+ with raises(TypeError):
+ AgentKey(None)
+
+ def sets_attributes_and_parses_blob(self):
+ agent = Mock()
+ blob = Message()
+ blob.add_string("bad-type")
+ key = AgentKey(agent=agent, blob=bytes(blob))
+ assert key.agent is agent
+ assert key.name == "bad-type"
+ assert key.blob == bytes(blob)
+ assert key.comment == "" # default
+ # TODO: logger testing
+ assert key.inner_key is None # no 'bad-type' algorithm
+
+ def comment_optional(self):
+ blob = Message()
+ blob.add_string("bad-type")
+ key = AgentKey(agent=Mock(), blob=bytes(blob), comment="hi!")
+ assert key.comment == "hi!"
+
+ def sets_inner_key_when_known_type(self, keys):
+ key = AgentKey(agent=Mock(), blob=bytes(keys.pkey))
+ assert key.inner_key == keys.pkey
+
+ class fields:
+ def defaults_to_get_name_and_blob(self):
+ key = _BareAgentKey(name="lol", blob=b"lmao")
+ assert key._fields == ["lol", b"lmao"]
+
+ # TODO: pytest-relaxed is buggy (now?), this shows up under get_bits?
+ def defers_to_inner_key_when_present(self, keys):
+ key = AgentKey(agent=None, blob=keys.pkey.asbytes())
+ assert key._fields == keys.pkey._fields
+ assert key == keys.pkey
+
+ class get_bits:
+ def defaults_to_superclass_implementation(self):
+ # TODO 4.0: assert raises NotImplementedError like changed parent?
+ assert _BareAgentKey(None, None).get_bits() == 0
+
+ def defers_to_inner_key_when_present(self, keys):
+ key = AgentKey(agent=None, blob=keys.pkey.asbytes())
+ assert key.get_bits() == keys.pkey.get_bits()
+
+ class asbytes:
+ def defaults_to_owned_blob(self):
+ blob = Mock()
+ assert _BareAgentKey(name=None, blob=blob).asbytes() is blob
+
+ def defers_to_inner_key_when_present(self, keys):
+ key = AgentKey(agent=None, blob=keys.pkey_with_cert.asbytes())
+ # Artificially make outer key blob != inner key blob; comment in
+ # AgentKey.asbytes implies this can sometimes really happen but I
+ # no longer recall when that could be?
+ key.blob = b"nope"
+ assert key.asbytes() == key.inner_key.asbytes()
+
+ @mark.parametrize(
+ "sign_kwargs,expected_flag",
+ [
+ # No algorithm kwarg: no flags (bitfield -> 0 int)
+ (dict(), 0),
+ (dict(algorithm="rsa-sha2-256"), SSH_AGENT_RSA_SHA2_256),
+ (dict(algorithm="rsa-sha2-512"), SSH_AGENT_RSA_SHA2_512),
+ # TODO: ideally we only send these when key is a cert,
+ # but it doesn't actually break when not; meh. Really just wants
+ # all the parameterization of this test rethought.
+ (
+ dict(algorithm="rsa-sha2-256-cert-v01@openssh.com"),
+ SSH_AGENT_RSA_SHA2_256,
+ ),
+ (
+ dict(algorithm="rsa-sha2-512-cert-v01@openssh.com"),
+ SSH_AGENT_RSA_SHA2_512,
+ ),
+ ],
+ )
+ def signing_data(self, sign_kwargs, expected_flag):
+ class FakeAgent:
+ def _send_message(self, msg):
+ # The thing we actually care most about, we're not testing
+ # ssh-agent itself here
+ self._sent_message = msg
+ sig = Message()
+ sig.add_string("lol")
+ sig.rewind()
+ return SSH2_AGENT_SIGN_RESPONSE, sig
+
+ for do_cert in (False, True):
+ agent = FakeAgent()
+ # Get key kinda like how a real agent would give it to us - if
+ # cert, it'd be the entire public blob, not just the pubkey. This
+ # ensures the code under test sends _just the pubkey part_ back to
+ # the agent during signature requests (bug was us sending _the
+ # entire cert blob_, which somehow "worked ok" but always got us
+ # SHA1)
+ # NOTE: using lower level loader to avoid auto-cert-load when
+ # testing regular key (agents expose them separately)
+ inner_key = RSAKey.from_private_key_file(_support("rsa.key"))
+ blobby = inner_key.asbytes()
+ # NOTE: expected key blob always wants to be the real key, even
+ # when the "key" is a certificate.
+ expected_request_key_blob = blobby
+ if do_cert:
+ inner_key.load_certificate(_support("rsa.key-cert.pub"))
+ blobby = inner_key.public_blob.key_blob
+ key = AgentKey(agent, blobby)
+ result = key.sign_ssh_data(b"data-to-sign", **sign_kwargs)
+ assert result == b"lol"
+ msg = agent._sent_message
+ msg.rewind()
+ assert msg.get_byte() == cSSH2_AGENTC_SIGN_REQUEST
+ assert msg.get_string() == expected_request_key_blob
+ assert msg.get_string() == b"data-to-sign"
+ assert msg.get_int() == expected_flag
diff --git a/tests/auth.py b/tests/auth.py
new file mode 100644
index 00000000..c0afe889
--- /dev/null
+++ b/tests/auth.py
@@ -0,0 +1,580 @@
+"""
+Tests focusing primarily on the authentication step.
+
+Thus, they concern AuthHandler and AuthStrategy, with a side of Transport.
+"""
+
+from logging import Logger
+from unittest.mock import Mock
+
+from pytest import raises
+
+from paramiko import (
+ AgentKey,
+ AuthenticationException,
+ AuthFailure,
+ AuthResult,
+ AuthSource,
+ AuthStrategy,
+ BadAuthenticationType,
+ DSSKey,
+ InMemoryPrivateKey,
+ NoneAuth,
+ OnDiskPrivateKey,
+ Password,
+ PrivateKey,
+ PKey,
+ RSAKey,
+ SSHException,
+ ServiceRequestingTransport,
+ SourceResult,
+)
+
+from ._util import (
+ _disable_sha1_pubkey,
+ _disable_sha2,
+ _disable_sha2_pubkey,
+ _support,
+ requires_sha1_signing,
+ server,
+ unicodey,
+)
+
+
+class AuthHandler_:
+ """
+ Most of these tests are explicit about the auth method they call.
+
+ This is because not too many other tests do so (they rely on the implicit
+ auth trigger of various connect() kwargs).
+ """
+
+ def bad_auth_type(self):
+ """
+ verify that we get the right exception when an unsupported auth
+ type is requested.
+ """
+ # Server won't allow password auth for this user, so should fail
+ # and return just publickey allowed types
+ with server(
+ connect=dict(username="unknown", password="error"),
+ catch_error=True,
+ ) as (_, _, err):
+ assert isinstance(err, BadAuthenticationType)
+ assert err.allowed_types == ["publickey"]
+
+ def bad_password(self):
+ """
+ verify that a bad password gets the right exception, and that a retry
+ with the right password works.
+ """
+ # NOTE: Transport.connect doesn't do any auth upfront if no userauth
+ # related kwargs given.
+ with server(defer=True) as (tc, ts):
+ # Auth once, badly
+ with raises(AuthenticationException):
+ tc.auth_password(username="slowdive", password="error")
+ # And again, correctly
+ tc.auth_password(username="slowdive", password="pygmalion")
+
+ def multipart_auth(self):
+ """
+ verify that multipart auth works.
+ """
+ with server(defer=True) as (tc, ts):
+ assert tc.auth_password(
+ username="paranoid", password="paranoid"
+ ) == ["publickey"]
+ key = DSSKey.from_private_key_file(_support("dss.key"))
+ assert tc.auth_publickey(username="paranoid", key=key) == []
+
+ def interactive_auth(self):
+ """
+ verify keyboard-interactive auth works.
+ """
+
+ def handler(title, instructions, prompts):
+ self.got_title = title
+ self.got_instructions = instructions
+ self.got_prompts = prompts
+ return ["cat"]
+
+ with server(defer=True) as (tc, ts):
+ assert tc.auth_interactive("commie", handler) == []
+ assert self.got_title == "password"
+ assert self.got_prompts == [("Password", False)]
+
+ def interactive_fallback(self):
+ """
+ verify that a password auth attempt will fallback to "interactive"
+ if password auth isn't supported but interactive is.
+ """
+ with server(defer=True) as (tc, ts):
+ # This username results in an allowed_auth of just kbd-int,
+ # and has a configured interactive->response on the server.
+ assert tc.auth_password("commie", "cat") == []
+
+ def utf8(self):
+ """
+ verify that utf-8 encoding happens in authentication.
+ """
+ with server(defer=True) as (tc, ts):
+ assert tc.auth_password("utf8", unicodey) == []
+
+ def non_utf8(self):
+ """
+ verify that non-utf-8 encoded passwords can be used for broken
+ servers.
+ """
+ with server(defer=True) as (tc, ts):
+ assert tc.auth_password("non-utf8", "\xff") == []
+
+ def auth_exception_when_disconnected(self):
+ """
+ verify that we catch a server disconnecting during auth, and report
+ it as an auth failure.
+ """
+ with server(defer=True, skip_verify=True) as (tc, ts), raises(
+ AuthenticationException
+ ):
+ tc.auth_password("bad-server", "hello")
+
+ def non_responsive_triggers_auth_exception(self):
+ """
+ verify that authentication times out if server takes to long to
+ respond (or never responds).
+ """
+ with server(defer=True, skip_verify=True) as (tc, ts), raises(
+ AuthenticationException
+ ) as info:
+ tc.auth_timeout = 1 # 1 second, to speed up test
+ tc.auth_password("unresponsive-server", "hello")
+ assert "Authentication timeout" in str(info.value)
+
+
+class AuthOnlyHandler_:
+ def _server(self, *args, **kwargs):
+ kwargs.setdefault("transport_factory", ServiceRequestingTransport)
+ return server(*args, **kwargs)
+
+ class fallback_pubkey_algorithm:
+ @requires_sha1_signing
+ def key_type_algo_selected_when_no_server_sig_algs(self):
+ privkey = RSAKey.from_private_key_file(_support("rsa.key"))
+ # Server pretending to be an apparently common setup:
+ # - doesn't support (or have enabled) sha2
+ # - also doesn't support (or have enabled) server-sig-algs/ext-info
+ # This is the scenario in which Paramiko has to guess-the-algo, and
+ # where servers that don't support sha2 or server-sig-algs can give
+ # us trouble.
+ server_init = dict(_disable_sha2_pubkey, server_sig_algs=False)
+ with self._server(
+ pubkeys=[privkey],
+ connect=dict(pkey=privkey),
+ server_init=server_init,
+ catch_error=True,
+ ) as (tc, ts, err):
+ # Auth did work
+ assert tc.is_authenticated()
+ # Selected ssh-rsa, instead of first-in-the-list (rsa-sha2-512)
+ assert tc._agreed_pubkey_algorithm == "ssh-rsa"
+
+ @requires_sha1_signing
+ def key_type_algo_selection_is_cert_suffix_aware(self):
+ # This key has a cert next to it, which should trigger cert-aware
+ # loading within key classes.
+ privkey = PKey.from_path(_support("rsa.key"))
+ server_init = dict(_disable_sha2_pubkey, server_sig_algs=False)
+ with self._server(
+ pubkeys=[privkey],
+ connect=dict(pkey=privkey),
+ server_init=server_init,
+ catch_error=True,
+ ) as (tc, ts, err):
+ assert not err
+ # Auth did work
+ assert tc.is_authenticated()
+ # Selected expected cert type
+ assert (
+ tc._agreed_pubkey_algorithm
+ == "ssh-rsa-cert-v01@openssh.com"
+ )
+
+ @requires_sha1_signing
+ def uses_first_preferred_algo_if_key_type_not_in_list(self):
+ # This is functionally the same as legacy AuthHandler, just
+ # arriving at the same place in a different manner.
+ privkey = RSAKey.from_private_key_file(_support("rsa.key"))
+ server_init = dict(_disable_sha2_pubkey, server_sig_algs=False)
+ with self._server(
+ pubkeys=[privkey],
+ connect=dict(pkey=privkey),
+ server_init=server_init,
+ client_init=_disable_sha1_pubkey, # no ssh-rsa
+ catch_error=True,
+ ) as (tc, ts, err):
+ assert not tc.is_authenticated()
+ assert isinstance(err, AuthenticationException)
+ assert tc._agreed_pubkey_algorithm == "rsa-sha2-512"
+
+
+class SHA2SignaturePubkeys:
+ def pubkey_auth_honors_disabled_algorithms(self):
+ privkey = RSAKey.from_private_key_file(_support("rsa.key"))
+ with server(
+ pubkeys=[privkey],
+ connect=dict(pkey=privkey),
+ init=dict(
+ disabled_algorithms=dict(
+ pubkeys=["ssh-rsa", "rsa-sha2-256", "rsa-sha2-512"]
+ )
+ ),
+ catch_error=True,
+ ) as (_, _, err):
+ assert isinstance(err, SSHException)
+ assert "no RSA pubkey algorithms" in str(err)
+
+ def client_sha2_disabled_server_sha1_disabled_no_match(self):
+ privkey = RSAKey.from_private_key_file(_support("rsa.key"))
+ with server(
+ pubkeys=[privkey],
+ connect=dict(pkey=privkey),
+ client_init=_disable_sha2_pubkey,
+ server_init=_disable_sha1_pubkey,
+ catch_error=True,
+ ) as (tc, ts, err):
+ assert isinstance(err, AuthenticationException)
+
+ def client_sha1_disabled_server_sha2_disabled_no_match(self):
+ privkey = RSAKey.from_private_key_file(_support("rsa.key"))
+ with server(
+ pubkeys=[privkey],
+ connect=dict(pkey=privkey),
+ client_init=_disable_sha1_pubkey,
+ server_init=_disable_sha2_pubkey,
+ catch_error=True,
+ ) as (tc, ts, err):
+ assert isinstance(err, AuthenticationException)
+
+ @requires_sha1_signing
+ def ssh_rsa_still_used_when_sha2_disabled(self):
+ privkey = RSAKey.from_private_key_file(_support("rsa.key"))
+ # NOTE: this works because key obj comparison uses public bytes
+ # TODO: would be nice for PKey to grow a legit "give me another obj of
+ # same class but just the public bits" using asbytes()
+ with server(
+ pubkeys=[privkey], connect=dict(pkey=privkey), init=_disable_sha2
+ ) as (tc, _):
+ assert tc.is_authenticated()
+
+ @requires_sha1_signing
+ def first_client_preferred_algo_used_when_no_server_sig_algs(self):
+ privkey = RSAKey.from_private_key_file(_support("rsa.key"))
+ # Server pretending to be an apparently common setup:
+ # - doesn't support (or have enabled) sha2
+ # - also doesn't support (or have enabled) server-sig-algs/ext-info
+ # This is the scenario in which Paramiko has to guess-the-algo, and
+ # where servers that don't support sha2 or server-sig-algs give us
+ # trouble.
+ server_init = dict(_disable_sha2_pubkey, server_sig_algs=False)
+ with server(
+ pubkeys=[privkey],
+ connect=dict(username="slowdive", pkey=privkey),
+ server_init=server_init,
+ catch_error=True,
+ ) as (tc, ts, err):
+ assert not tc.is_authenticated()
+ assert isinstance(err, AuthenticationException)
+ # Oh no! this isn't ssh-rsa, and our server doesn't support sha2!
+ assert tc._agreed_pubkey_algorithm == "rsa-sha2-512"
+
+ def sha2_512(self):
+ privkey = RSAKey.from_private_key_file(_support("rsa.key"))
+ with server(
+ pubkeys=[privkey],
+ connect=dict(pkey=privkey),
+ init=dict(
+ disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-256"])
+ ),
+ ) as (tc, ts):
+ assert tc.is_authenticated()
+ assert tc._agreed_pubkey_algorithm == "rsa-sha2-512"
+
+ def sha2_256(self):
+ privkey = RSAKey.from_private_key_file(_support("rsa.key"))
+ with server(
+ pubkeys=[privkey],
+ connect=dict(pkey=privkey),
+ init=dict(
+ disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-512"])
+ ),
+ ) as (tc, ts):
+ assert tc.is_authenticated()
+ assert tc._agreed_pubkey_algorithm == "rsa-sha2-256"
+
+ def sha2_256_when_client_only_enables_256(self):
+ privkey = RSAKey.from_private_key_file(_support("rsa.key"))
+ with server(
+ pubkeys=[privkey],
+ connect=dict(pkey=privkey),
+ # Client-side only; server still accepts all 3.
+ client_init=dict(
+ disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-512"])
+ ),
+ ) as (tc, ts):
+ assert tc.is_authenticated()
+ assert tc._agreed_pubkey_algorithm == "rsa-sha2-256"
+
+
+class AuthSource_:
+ class base_class:
+ def init_requires_and_saves_username(self):
+ with raises(TypeError):
+ AuthSource()
+ assert AuthSource(username="foo").username == "foo"
+
+ def dunder_repr_delegates_to_helper(self):
+ source = AuthSource("foo")
+ source._repr = Mock(wraps=lambda: "whatever")
+ repr(source)
+ source._repr.assert_called_once_with()
+
+ def repr_helper_prints_basic_kv_pairs(self):
+ assert repr(AuthSource("foo")) == "AuthSource()"
+ assert (
+ AuthSource("foo")._repr(bar="open") == "AuthSource(bar='open')"
+ )
+
+ def authenticate_takes_transport_and_is_abstract(self):
+ # TODO: this test kinda just goes away once we're typed?
+ with raises(TypeError):
+ AuthSource("foo").authenticate()
+ with raises(NotImplementedError):
+ AuthSource("foo").authenticate(None)
+
+ class NoneAuth_:
+ def authenticate_auths_none(self):
+ trans = Mock()
+ result = NoneAuth("foo").authenticate(trans)
+ trans.auth_none.assert_called_once_with("foo")
+ assert result is trans.auth_none.return_value
+
+ def repr_shows_class(self):
+ assert repr(NoneAuth("foo")) == "NoneAuth()"
+
+ class Password_:
+ def init_takes_and_stores_password_getter(self):
+ with raises(TypeError):
+ Password("foo")
+ getter = Mock()
+ pw = Password("foo", password_getter=getter)
+ assert pw.password_getter is getter
+
+ def repr_adds_username(self):
+ pw = Password("foo", password_getter=Mock())
+ assert repr(pw) == "Password(user='foo')"
+
+ def authenticate_gets_and_supplies_password(self):
+ getter = Mock(return_value="bar")
+ trans = Mock()
+ pw = Password("foo", password_getter=getter)
+ result = pw.authenticate(trans)
+ trans.auth_password.assert_called_once_with("foo", "bar")
+ assert result is trans.auth_password.return_value
+
+ class PrivateKey_:
+ def authenticate_calls_publickey_with_pkey(self):
+ source = PrivateKey(username="foo")
+ source.pkey = Mock() # set by subclasses
+ trans = Mock()
+ result = source.authenticate(trans)
+ trans.auth_publickey.assert_called_once_with("foo", source.pkey)
+ assert result is trans.auth_publickey.return_value
+
+ class InMemoryPrivateKey_:
+ def init_takes_pkey_object(self):
+ with raises(TypeError):
+ InMemoryPrivateKey("foo")
+ pkey = Mock()
+ source = InMemoryPrivateKey(username="foo", pkey=pkey)
+ assert source.pkey is pkey
+
+ def repr_shows_pkey_repr(self):
+ pkey = PKey.from_path(_support("ed25519.key"))
+ source = InMemoryPrivateKey("foo", pkey)
+ assert (
+ repr(source)
+ == "InMemoryPrivateKey(pkey=PKey(alg=ED25519, bits=256, fp=SHA256:J6VESFdD3xSChn8y9PzWzeF+1tl892mOy2TqkMLO4ow))" # noqa
+ )
+
+ def repr_appends_agent_flag_when_AgentKey(self):
+ real_key = PKey.from_path(_support("ed25519.key"))
+ pkey = AgentKey(agent=None, blob=bytes(real_key))
+ source = InMemoryPrivateKey("foo", pkey)
+ assert (
+ repr(source)
+ == "InMemoryPrivateKey(pkey=PKey(alg=ED25519, bits=256, fp=SHA256:J6VESFdD3xSChn8y9PzWzeF+1tl892mOy2TqkMLO4ow)) [agent]" # noqa
+ )
+
+ class OnDiskPrivateKey_:
+ def init_takes_source_path_and_pkey(self):
+ with raises(TypeError):
+ OnDiskPrivateKey("foo")
+ with raises(TypeError):
+ OnDiskPrivateKey("foo", "bar")
+ with raises(TypeError):
+ OnDiskPrivateKey("foo", "bar", "biz")
+ source = OnDiskPrivateKey(
+ username="foo",
+ source="ssh-config",
+ path="of-exile",
+ pkey="notreally",
+ )
+ assert source.username == "foo"
+ assert source.source == "ssh-config"
+ assert source.path == "of-exile"
+ assert source.pkey == "notreally"
+
+ def init_requires_specific_value_for_source(self):
+ with raises(
+ ValueError,
+ match=r"source argument must be one of: \('ssh-config', 'python-config', 'implicit-home'\)", # noqa
+ ):
+ OnDiskPrivateKey("foo", source="what?", path="meh", pkey="no")
+
+ def repr_reflects_source_path_and_pkey(self):
+ source = OnDiskPrivateKey(
+ username="foo",
+ source="ssh-config",
+ path="of-exile",
+ pkey="notreally",
+ )
+ assert (
+ repr(source)
+ == "OnDiskPrivateKey(key='notreally', source='ssh-config', path='of-exile')" # noqa
+ )
+
+
+class AuthResult_:
+ def setup_method(self):
+ self.strat = AuthStrategy(None)
+
+ def acts_like_list_with_strategy_attribute(self):
+ with raises(TypeError):
+ AuthResult()
+ # kwarg works by itself
+ AuthResult(strategy=self.strat)
+ # or can be given as posarg w/ regular list() args after
+ result = AuthResult(self.strat, [1, 2, 3])
+ assert result.strategy is self.strat
+ assert result == [1, 2, 3]
+ assert isinstance(result, list)
+
+ def repr_is_list_repr_untouched(self):
+ result = AuthResult(self.strat, [1, 2, 3])
+ assert repr(result) == "[1, 2, 3]"
+
+ class dunder_str:
+ def is_multiline_display_of_sourceresult_tuples(self):
+ result = AuthResult(self.strat)
+ result.append(SourceResult("foo", "bar"))
+ result.append(SourceResult("biz", "baz"))
+ assert str(result) == "foo -> bar\nbiz -> baz"
+
+ def shows_str_not_repr_of_auth_source_and_result(self):
+ result = AuthResult(self.strat)
+ result.append(
+ SourceResult(NoneAuth("foo"), ["password", "pubkey"])
+ )
+ assert str(result) == "NoneAuth() -> ['password', 'pubkey']"
+
+ def empty_list_result_values_show_success_string(self):
+ result = AuthResult(self.strat)
+ result.append(SourceResult(NoneAuth("foo"), []))
+ assert str(result) == "NoneAuth() -> success"
+
+
+class AuthFailure_:
+ def is_an_AuthenticationException(self):
+ assert isinstance(AuthFailure(None), AuthenticationException)
+
+ def init_requires_result(self):
+ with raises(TypeError):
+ AuthFailure()
+ result = AuthResult(None)
+ fail = AuthFailure(result=result)
+ assert fail.result is result
+
+ def str_is_newline_plus_result_str(self):
+ result = AuthResult(None)
+ result.append(SourceResult(NoneAuth("foo"), Exception("onoz")))
+ fail = AuthFailure(result)
+ assert str(fail) == "\nNoneAuth() -> onoz"
+
+
+class AuthStrategy_:
+ def init_requires_ssh_config_param_and_sets_up_a_logger(self):
+ with raises(TypeError):
+ AuthStrategy()
+ conf = object()
+ strat = AuthStrategy(ssh_config=conf)
+ assert strat.ssh_config is conf
+ assert isinstance(strat.log, Logger)
+ assert strat.log.name == "paramiko.auth_strategy"
+
+ def get_sources_is_abstract(self):
+ with raises(NotImplementedError):
+ AuthStrategy(None).get_sources()
+
+ class authenticate:
+ def setup_method(self):
+ self.strat = AuthStrategy(None) # ssh_config not used directly
+ self.source, self.transport = NoneAuth(None), Mock()
+ self.source.authenticate = Mock()
+ self.strat.get_sources = Mock(return_value=[self.source])
+
+ def requires_and_uses_transport_with_methods_returning_result(self):
+ with raises(TypeError):
+ self.strat.authenticate()
+ result = self.strat.authenticate(self.transport)
+ self.strat.get_sources.assert_called_once_with()
+ self.source.authenticate.assert_called_once_with(self.transport)
+ assert isinstance(result, AuthResult)
+ assert result.strategy is self.strat
+ assert len(result) == 1
+ source_res = result[0]
+ assert isinstance(source_res, SourceResult)
+ assert source_res.source is self.source
+ assert source_res.result is self.source.authenticate.return_value
+
+ def logs_sources_attempted(self):
+ self.strat.log = Mock()
+ self.strat.authenticate(self.transport)
+ self.strat.log.debug.assert_called_once_with("Trying NoneAuth()")
+
+ def raises_AuthFailure_if_no_successes(self):
+ self.strat.log = Mock()
+ oops = Exception("onoz")
+ self.source.authenticate.side_effect = oops
+ with raises(AuthFailure) as info:
+ self.strat.authenticate(self.transport)
+ result = info.value.result
+ assert isinstance(result, AuthResult)
+ assert len(result) == 1
+ source_res = result[0]
+ assert isinstance(source_res, SourceResult)
+ assert source_res.source is self.source
+ assert source_res.result is oops
+ self.strat.log.info.assert_called_once_with(
+ "Authentication via NoneAuth() failed with Exception"
+ )
+
+ def short_circuits_on_successful_auth(self):
+ kaboom = Mock(authenticate=Mock(side_effect=Exception("onoz")))
+ self.strat.get_sources.return_value = [self.source, kaboom]
+ result = self.strat.authenticate(self.transport)
+ # No exception, and it's just a regular ol Result
+ assert isinstance(result, AuthResult)
+ # And it did not capture any attempt to execute the 2nd source
+ assert len(result) == 1
+ assert result[0].source is self.source
diff --git a/tests/conftest.py b/tests/conftest.py
index b28d2a17..12b97283 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2,17 +2,30 @@ import logging
import os
import shutil
import threading
+from pathlib import Path
+
+from invoke.vendor.lexicon import Lexicon
import pytest
-from paramiko import RSAKey, SFTPServer, SFTP, Transport
+from paramiko import (
+ SFTPServer,
+ SFTP,
+ Transport,
+ DSSKey,
+ RSAKey,
+ Ed25519Key,
+ ECDSAKey,
+ PKey,
+)
-from .loop import LoopSocket
-from .stub_sftp import StubServer, StubSFTPServer
-from .util import _support
+from ._loop import LoopSocket
+from ._stub_sftp import StubServer, StubSFTPServer
+from ._util import _support
from icecream import ic, install as install_ic
+# Better print() for debugging - use ic()!
install_ic()
ic.configureOutput(includeContext=True)
@@ -68,10 +81,11 @@ def sftp_server():
socks = LoopSocket()
sockc = LoopSocket()
sockc.link(socks)
+ # TODO: reuse with new server fixture if possible
tc = Transport(sockc)
ts = Transport(socks)
# Auth
- host_key = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ host_key = RSAKey.from_private_key_file(_support("rsa.key"))
ts.add_server_key(host_key)
# Server setup
event = threading.Event()
@@ -103,3 +117,54 @@ def sftp(sftp_server):
yield client
# Clean up - as in make_sftp_folder, we assume local-only exec for now.
shutil.rmtree(client.FOLDER, ignore_errors=True)
+
+
+key_data = [
+ ["ssh-rsa", RSAKey, "SHA256:OhNL391d/beeFnxxg18AwWVYTAHww+D4djEE7Co0Yng"],
+ ["ssh-dss", DSSKey, "SHA256:uHwwykG099f4M4kfzvFpKCTino0/P03DRbAidpAmPm0"],
+ [
+ "ssh-ed25519",
+ Ed25519Key,
+ "SHA256:J6VESFdD3xSChn8y9PzWzeF+1tl892mOy2TqkMLO4ow",
+ ],
+ [
+ "ecdsa-sha2-nistp256",
+ ECDSAKey,
+ "SHA256:BrQG04oNKUETjKCeL4ifkARASg3yxS/pUHl3wWM26Yg",
+ ],
+]
+for datum in key_data:
+ # Add true first member with human-facing short algo name
+ short = datum[0].replace("ssh-", "").replace("sha2-nistp", "")
+ datum.insert(0, short)
+
+
+@pytest.fixture(scope="session", params=key_data, ids=lambda x: x[0])
+def keys(request):
+ """
+ Yield an object for each known type of key, with attributes:
+
+ - ``short_type``: short identifier, eg ``rsa`` or ``ecdsa-256``
+ - ``full_type``: the "message style" key identifier, eg ``ssh-rsa``, or
+ ``ecdsa-sha2-nistp256``.
+ - ``path``: a pathlib Path object to the fixture key file
+ - ``pkey``: PKey object, which may or may not also have a cert loaded
+ - ``expected_fp``: the expected fingerprint of said key
+ """
+ short_type, key_type, key_class, fingerprint = request.param
+ bag = Lexicon()
+ bag.short_type = short_type
+ bag.full_type = key_type
+ bag.path = Path(_support(f"{short_type}.key"))
+ with bag.path.open() as fd:
+ bag.pkey = key_class.from_private_key(fd)
+ # Second copy for things like equality-but-not-identity testing
+ with bag.path.open() as fd:
+ bag.pkey2 = key_class.from_private_key(fd)
+ bag.expected_fp = fingerprint
+ # Also tack on the cert-bearing variant for some tests
+ cert = bag.path.with_suffix(".key-cert.pub")
+ bag.pkey_with_cert = PKey.from_path(cert) if cert.exists() else None
+ # Safety checks
+ assert bag.pkey.fingerprint == fingerprint
+ yield bag
diff --git a/tests/pkey.py b/tests/pkey.py
new file mode 100644
index 00000000..691fda0f
--- /dev/null
+++ b/tests/pkey.py
@@ -0,0 +1,229 @@
+from pathlib import Path
+from unittest.mock import patch, call
+
+from pytest import raises
+
+from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey
+from paramiko import (
+ DSSKey,
+ ECDSAKey,
+ Ed25519Key,
+ Message,
+ PKey,
+ PublicBlob,
+ RSAKey,
+ UnknownKeyType,
+)
+
+from ._util import _support
+
+
+class PKey_:
+ # NOTE: this is incidentally tested by a number of other tests, such as the
+ # agent.py test suite
+ class from_type_string:
+ def loads_from_type_and_bytes(self, keys):
+ obj = PKey.from_type_string(keys.full_type, keys.pkey.asbytes())
+ assert obj == keys.pkey
+
+ # TODO: exceptions
+ #
+ # TODO: passphrase? OTOH since this is aimed at the agent...irrelephant
+
+ class from_path:
+ def loads_from_Path(self, keys):
+ obj = PKey.from_path(keys.path)
+ assert obj == keys.pkey
+
+ def loads_from_str(self):
+ key = PKey.from_path(str(_support("rsa.key")))
+ assert isinstance(key, RSAKey)
+
+ @patch("paramiko.pkey.Path")
+ def expands_user(self, mPath):
+ # real key for guts that want a real key format
+ mykey = Path(_support("rsa.key"))
+ pathy = mPath.return_value.expanduser.return_value
+ # read_bytes for cryptography.io's loaders
+ pathy.read_bytes.return_value = mykey.read_bytes()
+ # open() for our own class loader
+ pathy.open.return_value = mykey.open()
+ # fake out exists() to avoid attempts to load cert
+ pathy.exists.return_value = False
+ PKey.from_path("whatever") # we're not testing expanduser itself
+ # Both key and cert paths
+ mPath.return_value.expanduser.assert_has_calls([call(), call()])
+
+ def raises_UnknownKeyType_for_unknown_types(self):
+ # I.e. a real, becomes a useful object via cryptography.io, key
+ # class that we do NOT support. Chose Ed448 randomly as OpenSSH
+ # doesn't seem to support it either, going by ssh-keygen...
+ keypath = _support("ed448.key")
+ with raises(UnknownKeyType) as exc:
+ PKey.from_path(keypath)
+ assert issubclass(exc.value.key_type, Ed448PrivateKey)
+ with open(keypath, "rb") as fd:
+ assert exc.value.key_bytes == fd.read()
+
+ def leaves_cryptography_exceptions_untouched(self):
+ # a Python file is not a private key!
+ with raises(ValueError):
+ PKey.from_path(__file__)
+
+ # TODO: passphrase support tested
+
+ class automatically_loads_certificates:
+ def existing_cert_loaded_when_given_key_path(self):
+ key = PKey.from_path(_support("rsa.key"))
+ # Public blob exists despite no .load_certificate call
+ assert key.public_blob is not None
+ assert (
+ key.public_blob.key_type == "ssh-rsa-cert-v01@openssh.com"
+ )
+ # And it's definitely the one we expected
+ assert key.public_blob == PublicBlob.from_file(
+ _support("rsa.key-cert.pub")
+ )
+
+ def can_be_given_cert_path_instead(self):
+ key = PKey.from_path(_support("rsa.key-cert.pub"))
+ # It's still a key, not a PublicBlob
+ assert isinstance(key, RSAKey)
+ # Public blob exists despite no .load_certificate call
+ assert key.public_blob is not None
+ assert (
+ key.public_blob.key_type == "ssh-rsa-cert-v01@openssh.com"
+ )
+ # And it's definitely the one we expected
+ assert key.public_blob == PublicBlob.from_file(
+ _support("rsa.key-cert.pub")
+ )
+
+ def no_cert_load_if_no_cert(self):
+ # This key exists (it's a copy of the regular one) but has no
+ # matching -cert.pub
+ key = PKey.from_path(_support("rsa-lonely.key"))
+ assert key.public_blob is None
+
+ def excepts_usefully_if_no_key_only_cert(self):
+ # TODO: is that truly an error condition? the cert is ~the
+ # pubkey and we still require the privkey for signing, yea?
+ # This cert exists (it's a copy of the regular one) but there's
+ # no rsa-missing.key to load.
+ with raises(FileNotFoundError) as info:
+ PKey.from_path(_support("rsa-missing.key-cert.pub"))
+ assert info.value.filename.endswith("rsa-missing.key")
+
+ class load_certificate:
+ def rsa_public_cert_blobs(self):
+ # Data to test signing with (arbitrary)
+ data = b"ice weasels"
+ # Load key w/o cert at first (so avoiding .from_path)
+ key = RSAKey.from_private_key_file(_support("rsa.key"))
+ assert key.public_blob is None
+ # Sign regular-style (using, arbitrarily, SHA2)
+ msg = key.sign_ssh_data(data, "rsa-sha2-256")
+ msg.rewind()
+ assert "rsa-sha2-256" == msg.get_text()
+ signed = msg.get_binary() # for comparison later
+
+ # Load cert and inspect its internals
+ key.load_certificate(_support("rsa.key-cert.pub"))
+ assert key.public_blob is not None
+ assert key.public_blob.key_type == "ssh-rsa-cert-v01@openssh.com"
+ assert key.public_blob.comment == "test_rsa.key.pub"
+ msg = Message(key.public_blob.key_blob)
+ # cert type
+ assert msg.get_text() == "ssh-rsa-cert-v01@openssh.com"
+ # nonce
+ msg.get_string()
+ # public numbers
+ assert msg.get_mpint() == key.public_numbers.e
+ assert msg.get_mpint() == key.public_numbers.n
+ # serial number
+ assert msg.get_int64() == 1234
+ # TODO: whoever wrote the OG tests didn't care about the remaining
+ # fields from
+ # https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.certkeys
+ # so neither do I, for now...
+
+ # Sign cert-style (still SHA256 - so this actually does almost
+ # exactly the same thing under the hood as the previous sign)
+ msg = key.sign_ssh_data(data, "rsa-sha2-256-cert-v01@openssh.com")
+ msg.rewind()
+ assert "rsa-sha2-256" == msg.get_text()
+ assert signed == msg.get_binary() # same signature as above
+ msg.rewind()
+ assert key.verify_ssh_sig(b"ice weasels", msg) # our data verified
+
+ def loading_cert_of_different_type_from_key_raises_ValueError(self):
+ edkey = Ed25519Key.from_private_key_file(_support("ed25519.key"))
+ err = "PublicBlob type ssh-rsa-cert-v01@openssh.com incompatible with key type ssh-ed25519" # noqa
+ with raises(ValueError, match=err):
+ edkey.load_certificate(_support("rsa.key-cert.pub"))
+
+ def fingerprint(self, keys):
+ # NOTE: Hardcoded fingerprint expectation stored in fixture.
+ assert keys.pkey.fingerprint == keys.expected_fp
+
+ def algorithm_name(self, keys):
+ key = keys.pkey
+ if isinstance(key, RSAKey):
+ assert key.algorithm_name == "RSA"
+ elif isinstance(key, DSSKey):
+ assert key.algorithm_name == "DSS"
+ elif isinstance(key, ECDSAKey):
+ assert key.algorithm_name == "ECDSA"
+ elif isinstance(key, Ed25519Key):
+ assert key.algorithm_name == "ED25519"
+ # TODO: corner case: AgentKey, whose .name can be cert-y (due to the
+ # value of the name field passed via agent protocol) and thus
+ # algorithm_name is eg "RSA-CERT" - keys loaded directly from disk will
+ # never look this way, even if they have a .public_blob attached.
+
+ class equality_and_hashing:
+ def same_key_is_equal_to_itself(self, keys):
+ assert keys.pkey == keys.pkey2
+
+ def same_key_same_hash(self, keys):
+ # NOTE: this isn't a great test due to hashseed randomization under
+ # Python 3 preventing use of static values, but it does still prove
+ # that __hash__ is implemented/doesn't explode & works across
+ # instances
+ assert hash(keys.pkey) == hash(keys.pkey2)
+
+ def keys_are_not_equal_to_other_types(self, keys):
+ for value in [None, True, ""]:
+ assert keys.pkey != value
+
+ class identifiers_classmethods:
+ def default_is_class_name_attribute(self):
+ # NOTE: not all classes _have_ this, only the ones that don't
+ # customize identifiers().
+ class MyKey(PKey):
+ name = "it me"
+
+ assert MyKey.identifiers() == ["it me"]
+
+ def rsa_is_all_combos_of_cert_and_sha_type(self):
+ assert RSAKey.identifiers() == [
+ "ssh-rsa",
+ "ssh-rsa-cert-v01@openssh.com",
+ "rsa-sha2-256",
+ "rsa-sha2-256-cert-v01@openssh.com",
+ "rsa-sha2-512",
+ "rsa-sha2-512-cert-v01@openssh.com",
+ ]
+
+ def dss_is_protocol_name(self):
+ assert DSSKey.identifiers() == ["ssh-dss"]
+
+ def ed25519_is_protocol_name(self):
+ assert Ed25519Key.identifiers() == ["ssh-ed25519"]
+
+ def ecdsa_is_all_curve_names(self):
+ assert ECDSAKey.identifiers() == [
+ "ecdsa-sha2-nistp256",
+ "ecdsa-sha2-nistp384",
+ "ecdsa-sha2-nistp521",
+ ]
diff --git a/tests/test_agent.py b/tests/test_agent.py
deleted file mode 100644
index 01602d0e..00000000
--- a/tests/test_agent.py
+++ /dev/null
@@ -1,50 +0,0 @@
-import unittest
-
-from paramiko.message import Message
-from paramiko.agent import (
- SSH2_AGENT_SIGN_RESPONSE,
- cSSH2_AGENTC_SIGN_REQUEST,
- SSH_AGENT_RSA_SHA2_256,
- SSH_AGENT_RSA_SHA2_512,
- AgentKey,
-)
-from paramiko.util import b
-
-
-class ChaosAgent:
- def _send_message(self, msg):
- self._sent_message = msg
- sig = Message()
- sig.add_string(b("lol"))
- sig.rewind()
- return SSH2_AGENT_SIGN_RESPONSE, sig
-
-
-class AgentTests(unittest.TestCase):
- def _sign_with_agent(self, kwargs, expectation):
- agent = ChaosAgent()
- key = AgentKey(agent, b("secret!!!"))
- result = key.sign_ssh_data(b("token"), **kwargs)
- assert result == b("lol")
- msg = agent._sent_message
- msg.rewind()
- assert msg.get_byte() == cSSH2_AGENTC_SIGN_REQUEST
- assert msg.get_string() == b("secret!!!")
- assert msg.get_string() == b("token")
- assert msg.get_int() == expectation
-
- def test_agent_signing_defaults_to_0_for_flags_field(self):
- # No algorithm kwarg at all
- self._sign_with_agent(kwargs=dict(), expectation=0)
-
- def test_agent_signing_is_2_for_SHA256(self):
- self._sign_with_agent(
- kwargs=dict(algorithm="rsa-sha2-256"),
- expectation=SSH_AGENT_RSA_SHA2_256,
- )
-
- def test_agent_signing_is_2_for_SHA512(self):
- self._sign_with_agent(
- kwargs=dict(algorithm="rsa-sha2-512"),
- expectation=SSH_AGENT_RSA_SHA2_512,
- )
diff --git a/tests/test_auth.py b/tests/test_auth.py
deleted file mode 100644
index 4a960deb..00000000
--- a/tests/test_auth.py
+++ /dev/null
@@ -1,272 +0,0 @@
-# Copyright (C) 2008 Robey Pointer <robeypointer@gmail.com>
-#
-# This file is part of paramiko.
-#
-# Paramiko is free software; you can redistribute it and/or modify it under the
-# terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation; either version 2.1 of the License, or (at your option)
-# any later version.
-#
-# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
-# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
-# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-"""
-Some unit tests for authenticating over a Transport.
-"""
-
-import sys
-import threading
-import unittest
-from time import sleep
-
-from paramiko import (
- Transport,
- ServerInterface,
- RSAKey,
- DSSKey,
- BadAuthenticationType,
- InteractiveQuery,
- AuthenticationException,
-)
-from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL
-from paramiko.util import u
-
-from .loop import LoopSocket
-from .util import _support, slow
-
-
-_pwd = u("\u2022")
-
-
-class NullServer(ServerInterface):
- paranoid_did_password = False
- paranoid_did_public_key = False
- paranoid_key = DSSKey.from_private_key_file(_support("test_dss.key"))
-
- def get_allowed_auths(self, username):
- if username == "slowdive":
- return "publickey,password"
- if username == "paranoid":
- if (
- not self.paranoid_did_password
- and not self.paranoid_did_public_key
- ):
- return "publickey,password"
- elif self.paranoid_did_password:
- return "publickey"
- else:
- return "password"
- if username == "commie":
- return "keyboard-interactive"
- if username == "utf8":
- return "password"
- if username == "non-utf8":
- return "password"
- return "publickey"
-
- def check_auth_password(self, username, password):
- if (username == "slowdive") and (password == "pygmalion"):
- return AUTH_SUCCESSFUL
- if (username == "paranoid") and (password == "paranoid"):
- # 2-part auth (even openssh doesn't support this)
- self.paranoid_did_password = True
- if self.paranoid_did_public_key:
- return AUTH_SUCCESSFUL
- return AUTH_PARTIALLY_SUCCESSFUL
- if (username == "utf8") and (password == _pwd):
- return AUTH_SUCCESSFUL
- if (username == "non-utf8") and (password == "\xff"):
- return AUTH_SUCCESSFUL
- if username == "bad-server":
- raise Exception("Ack!")
- if username == "unresponsive-server":
- sleep(5)
- return AUTH_SUCCESSFUL
- return AUTH_FAILED
-
- def check_auth_publickey(self, username, key):
- if (username == "paranoid") and (key == self.paranoid_key):
- # 2-part auth
- self.paranoid_did_public_key = True
- if self.paranoid_did_password:
- return AUTH_SUCCESSFUL
- return AUTH_PARTIALLY_SUCCESSFUL
- return AUTH_FAILED
-
- def check_auth_interactive(self, username, submethods):
- if username == "commie":
- self.username = username
- return InteractiveQuery(
- "password", "Please enter a password.", ("Password", False)
- )
- return AUTH_FAILED
-
- def check_auth_interactive_response(self, responses):
- if self.username == "commie":
- if (len(responses) == 1) and (responses[0] == "cat"):
- return AUTH_SUCCESSFUL
- return AUTH_FAILED
-
-
-class AuthTest(unittest.TestCase):
- def setUp(self):
- self.socks = LoopSocket()
- self.sockc = LoopSocket()
- self.sockc.link(self.socks)
- self.tc = Transport(self.sockc)
- self.ts = Transport(self.socks)
-
- def tearDown(self):
- self.tc.close()
- self.ts.close()
- self.socks.close()
- self.sockc.close()
-
- def start_server(self):
- host_key = RSAKey.from_private_key_file(_support("test_rsa.key"))
- self.public_host_key = RSAKey(data=host_key.asbytes())
- self.ts.add_server_key(host_key)
- self.event = threading.Event()
- self.server = NullServer()
- self.assertTrue(not self.event.is_set())
- self.ts.start_server(self.event, self.server)
-
- def verify_finished(self):
- self.event.wait(1.0)
- self.assertTrue(self.event.is_set())
- self.assertTrue(self.ts.is_active())
-
- def test_bad_auth_type(self):
- """
- verify that we get the right exception when an unsupported auth
- type is requested.
- """
- self.start_server()
- try:
- self.tc.connect(
- hostkey=self.public_host_key,
- username="unknown",
- password="error",
- )
- self.assertTrue(False)
- except:
- etype, evalue, etb = sys.exc_info()
- self.assertEqual(BadAuthenticationType, etype)
- self.assertEqual(["publickey"], evalue.allowed_types)
-
- def test_bad_password(self):
- """
- verify that a bad password gets the right exception, and that a retry
- with the right password works.
- """
- self.start_server()
- self.tc.connect(hostkey=self.public_host_key)
- try:
- self.tc.auth_password(username="slowdive", password="error")
- self.assertTrue(False)
- except:
- etype, evalue, etb = sys.exc_info()
- self.assertTrue(issubclass(etype, AuthenticationException))
- self.tc.auth_password(username="slowdive", password="pygmalion")
- self.verify_finished()
-
- def test_multipart_auth(self):
- """
- verify that multipart auth works.
- """
- self.start_server()
- self.tc.connect(hostkey=self.public_host_key)
- remain = self.tc.auth_password(
- username="paranoid", password="paranoid"
- )
- self.assertEqual(["publickey"], remain)
- key = DSSKey.from_private_key_file(_support("test_dss.key"))
- remain = self.tc.auth_publickey(username="paranoid", key=key)
- self.assertEqual([], remain)
- self.verify_finished()
-
- def test_interactive_auth(self):
- """
- verify keyboard-interactive auth works.
- """
- self.start_server()
- self.tc.connect(hostkey=self.public_host_key)
-
- def handler(title, instructions, prompts):
- self.got_title = title
- self.got_instructions = instructions
- self.got_prompts = prompts
- return ["cat"]
-
- remain = self.tc.auth_interactive("commie", handler)
- self.assertEqual(self.got_title, "password")
- self.assertEqual(self.got_prompts, [("Password", False)])
- self.assertEqual([], remain)
- self.verify_finished()
-
- def test_interactive_auth_fallback(self):
- """
- verify that a password auth attempt will fallback to "interactive"
- if password auth isn't supported but interactive is.
- """
- self.start_server()
- self.tc.connect(hostkey=self.public_host_key)
- remain = self.tc.auth_password("commie", "cat")
- self.assertEqual([], remain)
- self.verify_finished()
-
- def test_auth_utf8(self):
- """
- verify that utf-8 encoding happens in authentication.
- """
- self.start_server()
- self.tc.connect(hostkey=self.public_host_key)
- remain = self.tc.auth_password("utf8", _pwd)
- self.assertEqual([], remain)
- self.verify_finished()
-
- def test_auth_non_utf8(self):
- """
- verify that non-utf-8 encoded passwords can be used for broken
- servers.
- """
- self.start_server()
- self.tc.connect(hostkey=self.public_host_key)
- remain = self.tc.auth_password("non-utf8", "\xff")
- self.assertEqual([], remain)
- self.verify_finished()
-
- def test_auth_gets_disconnected(self):
- """
- verify that we catch a server disconnecting during auth, and report
- it as an auth failure.
- """
- self.start_server()
- self.tc.connect(hostkey=self.public_host_key)
- try:
- self.tc.auth_password("bad-server", "hello")
- except:
- etype, evalue, etb = sys.exc_info()
- self.assertTrue(issubclass(etype, AuthenticationException))
-
- @slow
- def test_auth_non_responsive(self):
- """
- verify that authentication times out if server takes to long to
- respond (or never responds).
- """
- self.tc.auth_timeout = 1 # 1 second, to speed up test
- self.start_server()
- self.tc.connect()
- try:
- self.tc.auth_password("unresponsive-server", "hello")
- except:
- etype, evalue, etb = sys.exc_info()
- self.assertTrue(issubclass(etype, AuthenticationException))
- self.assertTrue("Authentication timeout" in str(evalue))
diff --git a/tests/test_client.py b/tests/test_client.py
index 21ecd72a..1c0c6c84 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -41,7 +41,7 @@ from paramiko import SSHClient
from paramiko.pkey import PublicBlob
from paramiko.ssh_exception import SSHException, AuthenticationException
-from .util import _support, requires_sha1_signing, slow
+from ._util import _support, requires_sha1_signing, slow
requires_gss_auth = unittest.skipUnless(
@@ -171,10 +171,10 @@ class ClientTest(unittest.TestCase):
self.ts = paramiko.Transport(self.socks)
if server_name is not None:
self.ts.local_version = server_name
- keypath = _support("test_rsa.key")
+ keypath = _support("rsa.key")
host_key = paramiko.RSAKey.from_private_key_file(keypath)
self.ts.add_server_key(host_key)
- keypath = _support("test_ecdsa_256.key")
+ keypath = _support("ecdsa-256.key")
host_key = paramiko.ECDSAKey.from_private_key_file(keypath)
self.ts.add_server_key(host_key)
server = NullServer(allowed_keys=allowed_keys, public_blob=public_blob)
@@ -194,9 +194,7 @@ class ClientTest(unittest.TestCase):
run_kwargs[key] = kwargs.pop(key, None)
# Server setup
threading.Thread(target=self._run, kwargs=run_kwargs).start()
- host_key = paramiko.RSAKey.from_private_key_file(
- _support("test_rsa.key")
- )
+ host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key"))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
# Client setup
@@ -256,25 +254,25 @@ class SSHClientTest(ClientTest):
"""
verify that SSHClient works with a DSA key.
"""
- self._test_connection(key_filename=_support("test_dss.key"))
+ self._test_connection(key_filename=_support("dss.key"))
@requires_sha1_signing
def test_client_rsa(self):
"""
verify that SSHClient works with an RSA key.
"""
- self._test_connection(key_filename=_support("test_rsa.key"))
+ self._test_connection(key_filename=_support("rsa.key"))
@requires_sha1_signing
def test_client_ecdsa(self):
"""
verify that SSHClient works with an ECDSA key.
"""
- self._test_connection(key_filename=_support("test_ecdsa_256.key"))
+ self._test_connection(key_filename=_support("ecdsa-256.key"))
@requires_sha1_signing
def test_client_ed25519(self):
- self._test_connection(key_filename=_support("test_ed25519.key"))
+ self._test_connection(key_filename=_support("ed25519.key"))
@requires_sha1_signing
def test_multiple_key_files(self):
@@ -289,16 +287,17 @@ class SSHClientTest(ClientTest):
}
# Various combos of attempted & valid keys
# TODO: try every possible combo using itertools functions
+ # TODO: use new key(s) fixture(s)
for attempt, accept in (
(["rsa", "dss"], ["dss"]), # Original test #3
(["dss", "rsa"], ["dss"]), # Ordering matters sometimes, sadly
- (["dss", "rsa", "ecdsa_256"], ["dss"]), # Try ECDSA but fail
- (["rsa", "ecdsa_256"], ["ecdsa"]), # ECDSA success
+ (["dss", "rsa", "ecdsa-256"], ["dss"]), # Try ECDSA but fail
+ (["rsa", "ecdsa-256"], ["ecdsa"]), # ECDSA success
):
try:
self._test_connection(
key_filename=[
- _support("test_{}.key".format(x)) for x in attempt
+ _support("{}.key".format(x)) for x in attempt
],
allowed_keys=[types_[x] for x in accept],
)
@@ -318,7 +317,7 @@ class SSHClientTest(ClientTest):
self.assertRaises(
SSHException,
self._test_connection,
- key_filename=[_support("test_rsa.key")],
+ key_filename=[_support("rsa.key")],
allowed_keys=["ecdsa-sha2-nistp256"],
)
@@ -328,30 +327,26 @@ class SSHClientTest(ClientTest):
# They're similar except for which path is given; the expected auth and
# server-side behavior is 100% identical.)
# NOTE: only bothered whipping up one cert per overall class/family.
- for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"):
- cert_name = "test_{}.key-cert.pub".format(type_)
- cert_path = _support(os.path.join("cert_support", cert_name))
+ for type_ in ("rsa", "dss", "ecdsa-256", "ed25519"):
+ key_path = _support(f"{type_}.key")
self._test_connection(
- key_filename=cert_path,
- public_blob=PublicBlob.from_file(cert_path),
+ key_filename=key_path,
+ public_blob=PublicBlob.from_file(f"{key_path}-cert.pub"),
)
@requires_sha1_signing
def test_certs_implicitly_loaded_alongside_key_filename_keys(self):
- # NOTE: a regular test_connection() w/ test_rsa.key would incidentally
+ # NOTE: a regular test_connection() w/ rsa.key would incidentally
# test this (because test_xxx.key-cert.pub exists) but incidental tests
# stink, so NullServer and friends were updated to allow assertions
# about the server-side key object's public blob. Thus, we can prove
# that a specific cert was found, along with regular authorization
# succeeding proving that the overall flow works.
- for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"):
- key_name = "test_{}.key".format(type_)
- key_path = _support(os.path.join("cert_support", key_name))
+ for type_ in ("rsa", "dss", "ecdsa-256", "ed25519"):
+ key_path = _support(f"{type_}.key")
self._test_connection(
key_filename=key_path,
- public_blob=PublicBlob.from_file(
- "{}-cert.pub".format(key_path)
- ),
+ public_blob=PublicBlob.from_file(f"{key_path}-cert.pub"),
)
def _cert_algo_test(self, ver, alg):
@@ -360,9 +355,7 @@ class SSHClientTest(ClientTest):
self._test_connection(
# NOTE: SSHClient is able to take either the key or the cert & will
# set up its internals as needed
- key_filename=_support(
- os.path.join("cert_support", "test_rsa.key-cert.pub")
- ),
+ key_filename=_support("rsa.key-cert.pub"),
server_name="SSH-2.0-OpenSSH_{}".format(ver),
)
assert (
@@ -391,7 +384,7 @@ class SSHClientTest(ClientTest):
"""
threading.Thread(target=self._run).start()
hostname = f"[{self.addr}]:{self.port}"
- key_file = _support("test_ecdsa_256.key")
+ key_file = _support("ecdsa-256.key")
public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file)
self.tc = SSHClient()
@@ -414,9 +407,7 @@ class SSHClientTest(ClientTest):
"""
warnings.filterwarnings("ignore", "tempnam.*")
- host_key = paramiko.RSAKey.from_private_key_file(
- _support("test_rsa.key")
- )
+ host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key"))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
fd, localname = mkstemp()
os.close(fd)
@@ -516,9 +507,7 @@ class SSHClientTest(ClientTest):
"""
# Start the thread with a 1 second wait.
threading.Thread(target=self._run, kwargs={"delay": 1}).start()
- host_key = paramiko.RSAKey.from_private_key_file(
- _support("test_rsa.key")
- )
+ host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key"))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
self.tc = SSHClient()
@@ -593,7 +582,7 @@ class SSHClientTest(ClientTest):
"""
Failed gssapi-keyex doesn't prevent subsequent key from succeeding
"""
- kwargs = dict(gss_kex=True, key_filename=[_support("test_rsa.key")])
+ kwargs = dict(gss_kex=True, key_filename=[_support("rsa.key")])
self._test_connection(**kwargs)
@requires_gss_auth
@@ -601,7 +590,7 @@ class SSHClientTest(ClientTest):
"""
Failed gssapi-with-mic doesn't prevent subsequent key from succeeding
"""
- kwargs = dict(gss_auth=True, key_filename=[_support("test_rsa.key")])
+ kwargs = dict(gss_auth=True, key_filename=[_support("rsa.key")])
self._test_connection(**kwargs)
def test_reject_policy(self):
@@ -683,11 +672,11 @@ class SSHClientTest(ClientTest):
self._client_host_key_bad(host_key)
def test_host_key_negotiation_3(self):
- self._client_host_key_good(paramiko.ECDSAKey, "test_ecdsa_256.key")
+ self._client_host_key_good(paramiko.ECDSAKey, "ecdsa-256.key")
@requires_sha1_signing
def test_host_key_negotiation_4(self):
- self._client_host_key_good(paramiko.RSAKey, "test_rsa.key")
+ self._client_host_key_good(paramiko.RSAKey, "rsa.key")
def _setup_for_env(self):
threading.Thread(target=self._run).start()
diff --git a/tests/test_config.py b/tests/test_config.py
index a2c60a32..fcb120b6 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -19,7 +19,7 @@ from paramiko import (
ConfigParseError,
)
-from .util import _config
+from ._util import _config
@fixture
diff --git a/tests/test_dss.key b/tests/test_dss.key
deleted file mode 100644
index e10807f1..00000000
--- a/tests/test_dss.key
+++ /dev/null
@@ -1,12 +0,0 @@
------BEGIN DSA PRIVATE KEY-----
-MIIBuwIBAAKBgQDngaYDZ30c6/7cJgEEbtl8FgKdwhba1Z7oOrOn4MI/6C42G1bY
-wMuqZf4dBCglsdq39SHrcjbE8Vq54gPSOh3g4+uV9Rcg5IOoPLbwp2jQfF6f1FIb
-sx7hrDCIqUcQccPSxetPBKmXI9RN8rZLaFuQeTnI65BKM98Ruwvq6SI2LwIVAPDP
-hSeawaJI27mKqOfe5PPBSmyHAoGBAJMXxXmPD9sGaQ419DIpmZecJKBUAy9uXD8x
-gbgeDpwfDaFJP8owByCKREocPFfi86LjCuQkyUKOfjYMN6iHIf1oEZjB8uJAatUr
-FzI0ArXtUqOhwTLwTyFuUojE5own2WYsOAGByvgfyWjsGhvckYNhI4ODpNdPlxQ8
-ZamaPGPsAoGARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lYukmn
-jO1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+NwacI
-BlXa8cMDL7Q/69o0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgECFGI9QPSc
-h9pT9XHqn+1rZ4bK+QGA
------END DSA PRIVATE KEY-----
diff --git a/tests/test_ecdsa_256.key b/tests/test_ecdsa_256.key
deleted file mode 100644
index 42d44734..00000000
--- a/tests/test_ecdsa_256.key
+++ /dev/null
@@ -1,5 +0,0 @@
------BEGIN EC PRIVATE KEY-----
-MHcCAQEEIKB6ty3yVyKEnfF/zprx0qwC76MsMlHY4HXCnqho2eKioAoGCCqGSM49
-AwEHoUQDQgAElI9mbdlaS+T9nHxY/59lFnn80EEecZDBHq4gLpccY8Mge5ZTMiMD
-ADRvOqQ5R98Sxst765CAqXmRtz8vwoD96g==
------END EC PRIVATE KEY-----
diff --git a/tests/test_ed25519.key b/tests/test_ed25519.key
deleted file mode 100644
index eb9f94c2..00000000
--- a/tests/test_ed25519.key
+++ /dev/null
@@ -1,8 +0,0 @@
------BEGIN OPENSSH PRIVATE KEY-----
-b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
-QyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXwAAAKhjwAdrY8AH
-awAAAAtzc2gtZWQyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXw
-AAAEA9tGQi2IrprbOSbDCF+RmAHd6meNSXBUQ2ekKXm4/8xnr1K9komH/1WBIvQbbtvnFV
-hryd62EfcgRFuLRiokNfAAAAI2FsZXhfZ2F5bm9yQEFsZXhzLU1hY0Jvb2stQWlyLmxvY2
-FsAQI=
------END OPENSSH PRIVATE KEY-----
diff --git a/tests/test_file.py b/tests/test_file.py
index 456c0388..9344495b 100644
--- a/tests/test_file.py
+++ b/tests/test_file.py
@@ -26,7 +26,7 @@ from io import BytesIO
from paramiko.common import linefeed_byte, crlf, cr_byte
from paramiko.file import BufferedFile
-from .util import needs_builtin
+from ._util import needs_builtin
class LoopbackFile(BufferedFile):
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
index 671f1ba0..da62fd97 100644
--- a/tests/test_gssapi.py
+++ b/tests/test_gssapi.py
@@ -24,7 +24,7 @@ Test the used APIs for GSS-API / SSPI authentication
import socket
-from .util import needs_gssapi, KerberosTestCase, update_env
+from ._util import needs_gssapi, KerberosTestCase, update_env
#
# NOTE: KerberosTestCase skips all tests if it was unable to import k5test
diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py
index bdda295a..a028411d 100644
--- a/tests/test_hostkeys.py
+++ b/tests/test_hostkeys.py
@@ -36,6 +36,11 @@ broken.example.com ssh-rsa AAAA
happy.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA8bP1ZA7DCZDB9J0s50l31M\
BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\
5ymME3bQ4J/k1IKxCtz/bAlAqFgKoc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+Pc2M=
+modern.example.com ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIKHEChAIxsh2hr8Q\
++Ea1AAHZyfEB2elEc2YgduVzBtp+
+curvy.example.com ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlz\
+dHAyNTYAAABBBAa+pY7djSpbg5viAcZhPt56AO3U3Sd7h7dnlUp0EjfDgyYHYQxl2QZ4JGgfwR5iv9\
+T9iRZjQzvJd5s+kBAZtpk=
"""
test_hosts_file_tabs = """\
@@ -45,9 +50,11 @@ D+jrpI9cycZHqilK0HmxDeCuxbwyMuaCygU9gS2qoRvNLWZk70OpIKSSpBo0Wl3/XUmz9uhc=
happy.example.com\tssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA8bP1ZA7DCZDB9J0s50l31M\
BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\
5ymME3bQ4J/k1IKxCtz/bAlAqFgKoc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+Pc2M=
-doublespace.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA1PD6U2/TVxET6lkp\
-KhOk5r9q/kAYG6sP9f5zuUYP8i7FOFp/6ncCEbbtg/lB+A3iidyxoSWl+9jtoyyDOOVX4UIDV9G11M\
-l8om3D+jrpI9cycZHqilK0HmxDeCuxbwyMuaCygU9gS2qoRvNLWZk70OpIKSSpBo0Wl3/XUmz8BtZ=
+modern.example.com\tssh-ed25519\tAAAAC3NzaC1lZDI1NTE5AAAAIKHEChAIxsh2hr8Q\
++Ea1AAHZyfEB2elEc2YgduVzBtp+
+curvy.example.com\tecdsa-sha2-nistp256\tAAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbml\
+zdHAyNTYAAABBBAa+pY7djSpbg5viAcZhPt56AO3U3Sd7h7dnlUp0EjfDgyYHYQxl2QZ4JGgfwR5iv\
+9T9iRZjQzvJd5s+kBAZtpk=
"""
keyblob = b"""\
@@ -76,7 +83,7 @@ class HostKeysTest(unittest.TestCase):
def test_load(self):
hostdict = paramiko.HostKeys("hostfile.temp")
- self.assertEqual(2, len(hostdict))
+ assert len(hostdict) == 4
self.assertEqual(1, len(list(hostdict.values())[0]))
self.assertEqual(1, len(list(hostdict.values())[1]))
fp = hexlify(
@@ -89,7 +96,7 @@ class HostKeysTest(unittest.TestCase):
hh = "|1|BMsIC6cUIP2zBuXR3t2LRcJYjzM=|hpkJMysjTk/+zzUUzxQEa2ieq6c="
key = paramiko.RSAKey(data=decodebytes(keyblob))
hostdict.add(hh, "ssh-rsa", key)
- self.assertEqual(3, len(list(hostdict)))
+ assert len(hostdict) == 5
x = hostdict["foo.example.com"]
fp = hexlify(x["ssh-rsa"].get_fingerprint()).upper()
self.assertEqual(b"7EC91BB336CB6D810B124B1353C32396", fp)
@@ -105,10 +112,8 @@ class HostKeysTest(unittest.TestCase):
self.assertTrue(x is not None)
fp = hexlify(x["ssh-rsa"].get_fingerprint()).upper()
self.assertEqual(b"E6684DB30E109B67B70FF1DC5C7F1363", fp)
- i = 0
- for key in hostdict:
- i += 1
- self.assertEqual(2, i)
+ assert list(hostdict) == hostdict.keys()
+ assert len(list(hostdict)) == len(hostdict.keys()) == 4
def test_dict_set(self):
hostdict = paramiko.HostKeys("hostfile.temp")
@@ -118,7 +123,7 @@ class HostKeysTest(unittest.TestCase):
hostdict["fake.example.com"] = {}
hostdict["fake.example.com"]["ssh-rsa"] = key
- self.assertEqual(3, len(hostdict))
+ assert len(hostdict) == 5
self.assertEqual(2, len(list(hostdict.values())[0]))
self.assertEqual(1, len(list(hostdict.values())[1]))
self.assertEqual(1, len(list(hostdict.values())[2]))
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
index d4868f4a..a81b1959 100644
--- a/tests/test_kex_gss.py
+++ b/tests/test_kex_gss.py
@@ -31,7 +31,7 @@ import unittest
import paramiko
-from .util import needs_gssapi, KerberosTestCase, update_env
+from ._util import needs_gssapi, KerberosTestCase, update_env, _support
class NullServer(paramiko.ServerInterface):
@@ -80,7 +80,7 @@ class GSSKexTest(KerberosTestCase):
def _run(self):
self.socks, addr = self.sockl.accept()
self.ts = paramiko.Transport(self.socks, gss_kex=True)
- host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key")
+ host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key"))
self.ts.add_server_key(host_key)
self.ts.set_gss_host(self.realm.hostname)
try:
@@ -96,7 +96,7 @@ class GSSKexTest(KerberosTestCase):
Diffie-Hellman Key Exchange and user authentication with the GSS-API
context created during key exchange.
"""
- host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key")
+ host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key"))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
self.tc = paramiko.SSHClient()
diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py
index d4dd58ad..aee21c21 100644
--- a/tests/test_packetizer.py
+++ b/tests/test_packetizer.py
@@ -30,7 +30,7 @@ from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes
from paramiko import Message, Packetizer, util
from paramiko.common import byte_chr, zero_byte
-from .loop import LoopSocket
+from ._loop import LoopSocket
x55 = byte_chr(0x55)
diff --git a/tests/test_pkey.py b/tests/test_pkey.py
index 4d74d8aa..d4d193b8 100644
--- a/tests/test_pkey.py
+++ b/tests/test_pkey.py
@@ -45,7 +45,7 @@ from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateNumbers
from unittest.mock import patch, Mock
import pytest
-from .util import _support, is_low_entropy, requires_sha1_signing
+from ._util import _support, is_low_entropy, requires_sha1_signing
# from openssh's ssh-keygen
@@ -138,12 +138,6 @@ TEST_KEY_BYTESTR = "\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x00ӏV\
class KeyTest(unittest.TestCase):
- def setUp(self):
- pass
-
- def tearDown(self):
- pass
-
def assert_keyfile_is_encrypted(self, keyfile):
"""
A quick check that filename looks like an encrypted key.
@@ -161,7 +155,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(exp, key)
def test_load_rsa(self):
- key = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ key = RSAKey.from_private_key_file(_support("rsa.key"))
self.assertEqual("ssh-rsa", key.get_name())
exp_rsa = b(FINGER_RSA.split()[1].replace(":", ""))
my_rsa = hexlify(key.get_fingerprint())
@@ -184,7 +178,7 @@ class KeyTest(unittest.TestCase):
) as loader:
loader.side_effect = exception
with pytest.raises(SSHException, match=str(exception)):
- RSAKey.from_private_key_file(_support("test_rsa.key"))
+ RSAKey.from_private_key_file(_support("rsa.key"))
def test_loading_empty_keys_errors_usefully(self):
# #1599 - raise SSHException instead of IndexError
@@ -203,7 +197,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(1024, key.get_bits())
def test_load_dss(self):
- key = DSSKey.from_private_key_file(_support("test_dss.key"))
+ key = DSSKey.from_private_key_file(_support("dss.key"))
self.assertEqual("ssh-dss", key.get_name())
exp_dss = b(FINGER_DSS.split()[1].replace(":", ""))
my_dss = hexlify(key.get_fingerprint())
@@ -231,7 +225,7 @@ class KeyTest(unittest.TestCase):
def test_compare_rsa(self):
# verify that the private & public keys compare equal
- key = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ key = RSAKey.from_private_key_file(_support("rsa.key"))
self.assertEqual(key, key)
pub = RSAKey(data=key.asbytes())
self.assertTrue(key.can_sign())
@@ -240,7 +234,7 @@ class KeyTest(unittest.TestCase):
def test_compare_dss(self):
# verify that the private & public keys compare equal
- key = DSSKey.from_private_key_file(_support("test_dss.key"))
+ key = DSSKey.from_private_key_file(_support("dss.key"))
self.assertEqual(key, key)
pub = DSSKey(data=key.asbytes())
self.assertTrue(key.can_sign())
@@ -248,7 +242,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(key, pub)
def _sign_and_verify_rsa(self, algorithm, saved_sig):
- key = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ key = RSAKey.from_private_key_file(_support("rsa.key"))
msg = key.sign_ssh_data(b"ice weasels", algorithm)
assert isinstance(msg, Message)
msg.rewind()
@@ -273,7 +267,7 @@ class KeyTest(unittest.TestCase):
def test_sign_dss(self):
# verify that the dss private key can sign and verify
- key = DSSKey.from_private_key_file(_support("test_dss.key"))
+ key = DSSKey.from_private_key_file(_support("dss.key"))
msg = key.sign_ssh_data(b"ice weasels")
self.assertTrue(type(msg) is Message)
msg.rewind()
@@ -329,7 +323,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(key.get_name(), "ecdsa-sha2-nistp521")
def test_load_ecdsa_256(self):
- key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key"))
+ key = ECDSAKey.from_private_key_file(_support("ecdsa-256.key"))
self.assertEqual("ecdsa-sha2-nistp256", key.get_name())
exp_ecdsa = b(FINGER_ECDSA_256.split()[1].replace(":", ""))
my_ecdsa = hexlify(key.get_fingerprint())
@@ -357,7 +351,7 @@ class KeyTest(unittest.TestCase):
def test_compare_ecdsa_256(self):
# verify that the private & public keys compare equal
- key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key"))
+ key = ECDSAKey.from_private_key_file(_support("ecdsa-256.key"))
self.assertEqual(key, key)
pub = ECDSAKey(data=key.asbytes())
self.assertTrue(key.can_sign())
@@ -366,7 +360,7 @@ class KeyTest(unittest.TestCase):
def test_sign_ecdsa_256(self):
# verify that the rsa private key can sign and verify
- key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key"))
+ key = ECDSAKey.from_private_key_file(_support("ecdsa-256.key"))
msg = key.sign_ssh_data(b"ice weasels")
self.assertTrue(type(msg) is Message)
msg.rewind()
@@ -408,7 +402,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(384, key.get_bits())
def test_load_ecdsa_transmutes_crypto_exceptions(self):
- path = _support("test_ecdsa_256.key")
+ path = _support("ecdsa-256.key")
# TODO: nix unittest for pytest
for exception in (TypeError("onoz"), UnsupportedAlgorithm("oops")):
with patch(
@@ -569,12 +563,12 @@ class KeyTest(unittest.TestCase):
RSAKey.from_private_key_file(_support("test_rsa_openssh_nopad.key"))
def test_stringification(self):
- key = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ key = RSAKey.from_private_key_file(_support("rsa.key"))
comparable = TEST_KEY_BYTESTR
self.assertEqual(str(key), comparable)
def test_ed25519(self):
- key1 = Ed25519Key.from_private_key_file(_support("test_ed25519.key"))
+ key1 = Ed25519Key.from_private_key_file(_support("ed25519.key"))
key2 = Ed25519Key.from_private_key_file(
_support("test_ed25519_password.key"), b"abc123"
)
@@ -594,7 +588,7 @@ class KeyTest(unittest.TestCase):
def test_ed25519_compare(self):
# verify that the private & public keys compare equal
- key = Ed25519Key.from_private_key_file(_support("test_ed25519.key"))
+ key = Ed25519Key.from_private_key_file(_support("ed25519.key"))
self.assertEqual(key, key)
pub = Ed25519Key(data=key.asbytes())
self.assertTrue(key.can_sign())
@@ -616,33 +610,6 @@ class KeyTest(unittest.TestCase):
)
assert original != generated
- def keys(self):
- for key_class, filename in [
- (RSAKey, "test_rsa.key"),
- (DSSKey, "test_dss.key"),
- (ECDSAKey, "test_ecdsa_256.key"),
- (Ed25519Key, "test_ed25519.key"),
- ]:
- key1 = key_class.from_private_key_file(_support(filename))
- key2 = key_class.from_private_key_file(_support(filename))
- yield key1, key2
-
- def test_keys_are_comparable(self):
- for key1, key2 in self.keys():
- assert key1 == key2
-
- def test_keys_are_not_equal_to_other(self):
- for value in [None, True, ""]:
- for key1, _ in self.keys():
- assert key1 != value
-
- def test_keys_are_hashable(self):
- # NOTE: this isn't a great test due to hashseed randomization under
- # Python 3 preventing use of static values, but it does still prove
- # that __hash__ is implemented/doesn't explode & works across instances
- for key1, key2 in self.keys():
- assert hash(key1) == hash(key2)
-
def test_ed25519_nonbytes_password(self):
# https://github.com/paramiko/paramiko/issues/1039
Ed25519Key.from_private_key_file(
@@ -654,7 +621,7 @@ class KeyTest(unittest.TestCase):
# No exception -> it's good. Meh.
def test_ed25519_load_from_file_obj(self):
- with open(_support("test_ed25519.key")) as pkey_fileobj:
+ with open(_support("ed25519.key")) as pkey_fileobj:
key = Ed25519Key.from_private_key(pkey_fileobj)
self.assertEqual(key, key)
self.assertTrue(key.can_sign())
@@ -674,43 +641,6 @@ class KeyTest(unittest.TestCase):
finally:
os.remove(newfile)
- def test_certificates(self):
- # NOTE: we also test 'live' use of cert auth for all key types in
- # test_client.py; this and nearby cert tests are more about the gritty
- # details.
- # PKey.load_certificate
- key_path = _support(os.path.join("cert_support", "test_rsa.key"))
- key = RSAKey.from_private_key_file(key_path)
- self.assertTrue(key.public_blob is None)
- cert_path = _support(
- os.path.join("cert_support", "test_rsa.key-cert.pub")
- )
- key.load_certificate(cert_path)
- self.assertTrue(key.public_blob is not None)
- self.assertEqual(
- key.public_blob.key_type, "ssh-rsa-cert-v01@openssh.com"
- )
- self.assertEqual(key.public_blob.comment, "test_rsa.key.pub")
- # Delve into blob contents, for test purposes
- msg = Message(key.public_blob.key_blob)
- self.assertEqual(msg.get_text(), "ssh-rsa-cert-v01@openssh.com")
- msg.get_string()
- e = msg.get_mpint()
- n = msg.get_mpint()
- self.assertEqual(e, key.public_numbers.e)
- self.assertEqual(n, key.public_numbers.n)
- # Serial number
- self.assertEqual(msg.get_int64(), 1234)
-
- # Prevented from loading certificate that doesn't match
- key_path = _support(os.path.join("cert_support", "test_ed25519.key"))
- key1 = Ed25519Key.from_private_key_file(key_path)
- self.assertRaises(
- ValueError,
- key1.load_certificate,
- _support("test_rsa.key-cert.pub"),
- )
-
@patch("paramiko.pkey.os")
def _test_keyfile_race(self, os_, exists):
# Re: CVE-2022-24302
@@ -764,22 +694,3 @@ class KeyTest(unittest.TestCase):
finally:
if os.path.exists(new):
os.unlink(new)
-
- def test_sign_rsa_with_certificate(self):
- data = b"ice weasels"
- key_path = _support(os.path.join("cert_support", "test_rsa.key"))
- key = RSAKey.from_private_key_file(key_path)
- msg = key.sign_ssh_data(data, "rsa-sha2-256")
- msg.rewind()
- assert "rsa-sha2-256" == msg.get_text()
- sign = msg.get_binary()
- cert_path = _support(
- os.path.join("cert_support", "test_rsa.key-cert.pub")
- )
- key.load_certificate(cert_path)
- msg = key.sign_ssh_data(data, "rsa-sha2-256-cert-v01@openssh.com")
- msg.rewind()
- assert "rsa-sha2-256" == msg.get_text()
- assert sign == msg.get_binary()
- msg.rewind()
- assert key.verify_ssh_sig(b"ice weasels", msg)
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index be123de4..7fd274bc 100644
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -38,8 +38,8 @@ from paramiko.sftp_attr import SFTPAttributes
from paramiko.util import b, u
from tests import requireNonAsciiLocale
-from .util import needs_builtin
-from .util import slow
+from ._util import needs_builtin
+from ._util import slow
ARTICLE = """
diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py
index 5192f657..acfe71e3 100644
--- a/tests/test_sftp_big.py
+++ b/tests/test_sftp_big.py
@@ -30,7 +30,7 @@ import time
from paramiko.common import o660
-from .util import slow
+from ._util import slow
@slow
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py
index a8175ccb..b441a225 100644
--- a/tests/test_ssh_gss.py
+++ b/tests/test_ssh_gss.py
@@ -28,7 +28,7 @@ import threading
import paramiko
-from .util import _support, needs_gssapi, KerberosTestCase, update_env
+from ._util import _support, needs_gssapi, KerberosTestCase, update_env
from .test_client import FINGERPRINTS
@@ -89,7 +89,7 @@ class GSSAuthTest(KerberosTestCase):
def _run(self):
self.socks, addr = self.sockl.accept()
self.ts = paramiko.Transport(self.socks)
- host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key")
+ host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key"))
self.ts.add_server_key(host_key)
server = NullServer()
self.ts.start_server(self.event, server)
@@ -100,7 +100,7 @@ class GSSAuthTest(KerberosTestCase):
The exception is ... no exception yet
"""
- host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key")
+ host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key"))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
self.tc = paramiko.SSHClient()
@@ -154,7 +154,7 @@ class GSSAuthTest(KerberosTestCase):
"this_host_does_not_exists_and_causes_a_GSSAPI-exception"
)
self._test_connection(
- key_filename=[_support("test_rsa.key")],
+ key_filename=[_support("rsa.key")],
allow_agent=False,
look_for_keys=False,
)
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 4062d767..b2efd637 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -22,7 +22,6 @@ Some unit tests for the ssh2 protocol in Transport.
from binascii import hexlify
-from contextlib import contextmanager
import select
import socket
import time
@@ -34,18 +33,16 @@ from unittest.mock import Mock
from paramiko import (
AuthHandler,
ChannelException,
- DSSKey,
Packetizer,
RSAKey,
SSHException,
- AuthenticationException,
IncompatiblePeer,
SecurityOptions,
- ServerInterface,
+ ServiceRequestingTransport,
Transport,
)
-from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL
-from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
+from paramiko.auth_handler import AuthOnlyHandler
+from paramiko import OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
from paramiko.common import (
DEFAULT_MAX_PACKET_SIZE,
DEFAULT_WINDOW_SIZE,
@@ -60,8 +57,17 @@ from paramiko.common import (
)
from paramiko.message import Message
-from .util import needs_builtin, _support, requires_sha1_signing, slow
-from .loop import LoopSocket
+from ._util import (
+ needs_builtin,
+ _support,
+ requires_sha1_signing,
+ slow,
+ server,
+ _disable_sha2,
+ _disable_sha1,
+ TestServer as NullServer,
+)
+from ._loop import LoopSocket
LONG_BANNER = """\
@@ -77,80 +83,11 @@ Maybe.
"""
-class NullServer(ServerInterface):
- paranoid_did_password = False
- paranoid_did_public_key = False
- paranoid_key = DSSKey.from_private_key_file(_support("test_dss.key"))
-
- def __init__(self, allowed_keys=None):
- self.allowed_keys = allowed_keys if allowed_keys is not None else []
-
- def get_allowed_auths(self, username):
- if username == "slowdive":
- return "publickey,password"
- return "publickey"
-
- def check_auth_password(self, username, password):
- if (username == "slowdive") and (password == "pygmalion"):
- return AUTH_SUCCESSFUL
- return AUTH_FAILED
-
- def check_auth_publickey(self, username, key):
- if key in self.allowed_keys:
- return AUTH_SUCCESSFUL
- return AUTH_FAILED
-
- def check_channel_request(self, kind, chanid):
- if kind == "bogus":
- return OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
- return OPEN_SUCCEEDED
-
- def check_channel_exec_request(self, channel, command):
- if command != b"yes":
- return False
- return True
-
- def check_channel_shell_request(self, channel):
- return True
-
- def check_global_request(self, kind, msg):
- self._global_request = kind
- # NOTE: for w/e reason, older impl of this returned False always, even
- # tho that's only supposed to occur if the request cannot be served.
- # For now, leaving that the default unless test supplies specific
- # 'acceptable' request kind
- return kind == "acceptable"
-
- def check_channel_x11_request(
- self,
- channel,
- single_connection,
- auth_protocol,
- auth_cookie,
- screen_number,
- ):
- self._x11_single_connection = single_connection
- self._x11_auth_protocol = auth_protocol
- self._x11_auth_cookie = auth_cookie
- self._x11_screen_number = screen_number
- return True
-
- def check_port_forward_request(self, addr, port):
- self._listen = socket.socket()
- self._listen.bind(("127.0.0.1", 0))
- self._listen.listen(1)
- return self._listen.getsockname()[1]
-
- def cancel_port_forward_request(self, addr, port):
- self._listen.close()
- self._listen = None
-
- def check_channel_direct_tcpip_request(self, chanid, origin, destination):
- self._tcpip_dest = destination
- return OPEN_SUCCEEDED
-
-
class TransportTest(unittest.TestCase):
+ # TODO: this can get nuked once ServiceRequestingTransport becomes the
+ # only Transport, as it has this baked in.
+ _auth_handler_class = AuthHandler
+
def setUp(self):
self.socks = LoopSocket()
self.sockc = LoopSocket()
@@ -168,7 +105,7 @@ class TransportTest(unittest.TestCase):
def setup_test_server(
self, client_options=None, server_options=None, connect_kwargs=None
):
- host_key = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ host_key = RSAKey.from_private_key_file(_support("rsa.key"))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
@@ -234,7 +171,7 @@ class TransportTest(unittest.TestCase):
loopback sockets. this is hardly "simple" but it's simpler than the
later tests. :)
"""
- host_key = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ host_key = RSAKey.from_private_key_file(_support("rsa.key"))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
event = threading.Event()
@@ -260,7 +197,7 @@ class TransportTest(unittest.TestCase):
"""
verify that a long banner doesn't mess up the handshake.
"""
- host_key = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ host_key = RSAKey.from_private_key_file(_support("rsa.key"))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
event = threading.Event()
@@ -910,7 +847,7 @@ class TransportTest(unittest.TestCase):
# be fine. Even tho it's a bit squicky.
self.tc.packetizer = SlowPacketizer(self.tc.sock)
# Continue with regular test red tape.
- host_key = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ host_key = RSAKey.from_private_key_file(_support("rsa.key"))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
event = threading.Event()
@@ -1099,7 +1036,8 @@ class TransportTest(unittest.TestCase):
def test_server_transports_reject_client_message_types(self):
# TODO: handle Transport's own tables too, not just its inner auth
# handler's table. See TODOs in auth_handler.py
- for message_type in AuthHandler._client_handler_table:
+ some_handler = self._auth_handler_class(self.tc)
+ for message_type in some_handler._client_handler_table:
self._send_client_message(message_type)
self._expect_unimplemented()
# Reset for rest of loop
@@ -1115,6 +1053,21 @@ class TransportTest(unittest.TestCase):
self._expect_unimplemented()
+# TODO: for now this is purely a regression test. It needs actual tests of the
+# intentional new behavior too!
+class ServiceRequestingTransportTest(TransportTest):
+ _auth_handler_class = AuthOnlyHandler
+
+ def setUp(self):
+ # Copypasta (Transport init is load-bearing)
+ self.socks = LoopSocket()
+ self.sockc = LoopSocket()
+ self.sockc.link(self.socks)
+ # New class who dis
+ self.tc = ServiceRequestingTransport(self.sockc)
+ self.ts = ServiceRequestingTransport(self.socks)
+
+
class AlgorithmDisablingTests(unittest.TestCase):
def test_preferred_lists_default_to_private_attribute_contents(self):
t = Transport(sock=Mock())
@@ -1188,98 +1141,6 @@ class AlgorithmDisablingTests(unittest.TestCase):
assert "zlib" not in compressions
-@contextmanager
-def server(
- hostkey=None,
- init=None,
- server_init=None,
- client_init=None,
- connect=None,
- pubkeys=None,
- catch_error=False,
-):
- """
- SSH server contextmanager for testing.
-
- :param hostkey:
- Host key to use for the server; if None, loads
- ``test_rsa.key``.
- :param init:
- Default `Transport` constructor kwargs to use for both sides.
- :param server_init:
- Extends and/or overrides ``init`` for server transport only.
- :param client_init:
- Extends and/or overrides ``init`` for client transport only.
- :param connect:
- Kwargs to use for ``connect()`` on the client.
- :param pubkeys:
- List of public keys for auth.
- :param catch_error:
- Whether to capture connection errors & yield from contextmanager.
- Necessary for connection_time exception testing.
- """
- if init is None:
- init = {}
- if server_init is None:
- server_init = {}
- if client_init is None:
- client_init = {}
- if connect is None:
- connect = dict(username="slowdive", password="pygmalion")
- socks = LoopSocket()
- sockc = LoopSocket()
- sockc.link(socks)
- tc = Transport(sockc, **dict(init, **client_init))
- ts = Transport(socks, **dict(init, **server_init))
-
- if hostkey is None:
- hostkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
- ts.add_server_key(hostkey)
- event = threading.Event()
- server = NullServer(allowed_keys=pubkeys)
- assert not event.is_set()
- assert not ts.is_active()
- assert tc.get_username() is None
- assert ts.get_username() is None
- assert not tc.is_authenticated()
- assert not ts.is_authenticated()
-
- err = None
- # Trap errors and yield instead of raising right away; otherwise callers
- # cannot usefully deal with problems at connect time which stem from errors
- # in the server side.
- try:
- ts.start_server(event, server)
- tc.connect(**connect)
-
- event.wait(1.0)
- assert event.is_set()
- assert ts.is_active()
- assert tc.is_active()
-
- except Exception as e:
- if not catch_error:
- raise
- err = e
-
- yield (tc, ts, err) if catch_error else (tc, ts)
-
- tc.close()
- ts.close()
- socks.close()
- sockc.close()
-
-
-_disable_sha2 = dict(
- disabled_algorithms=dict(keys=["rsa-sha2-256", "rsa-sha2-512"])
-)
-_disable_sha1 = dict(disabled_algorithms=dict(keys=["ssh-rsa"]))
-_disable_sha2_pubkey = dict(
- disabled_algorithms=dict(pubkeys=["rsa-sha2-256", "rsa-sha2-512"])
-)
-_disable_sha1_pubkey = dict(disabled_algorithms=dict(pubkeys=["ssh-rsa"]))
-
-
class TestSHA2SignatureKeyExchange(unittest.TestCase):
# NOTE: these all rely on the default server() hostkey being RSA
# NOTE: these rely on both sides being properly implemented re: agreed-upon
@@ -1343,8 +1204,11 @@ class TestSHA2SignatureKeyExchange(unittest.TestCase):
# (This is a regression test vs previous implementation which overwrote
# the entire preferred-hostkeys structure when given an explicit key as
# a client.)
- hostkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
- with server(hostkey=hostkey, connect=dict(hostkey=hostkey)) as (tc, _):
+ hostkey = RSAKey.from_private_key_file(_support("rsa.key"))
+ connect = dict(
+ hostkey=hostkey, username="slowdive", password="pygmalion"
+ )
+ with server(hostkey=hostkey, connect=connect) as (tc, _):
assert tc.host_key_type == "rsa-sha2-512"
@@ -1358,7 +1222,7 @@ class TestExtInfo(unittest.TestCase):
}
def test_client_uses_server_sig_algs_for_pubkey_auth(self):
- privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ privkey = RSAKey.from_private_key_file(_support("rsa.key"))
with server(
pubkeys=[privkey],
connect=dict(pkey=privkey),
@@ -1367,94 +1231,6 @@ class TestExtInfo(unittest.TestCase):
),
) as (tc, _):
assert tc.is_authenticated()
- # Client settled on 256 despite itself not having 512 disabled
- assert tc._agreed_pubkey_algorithm == "rsa-sha2-256"
-
-
-# TODO: these could move into test_auth.py but that badly needs refactoring
-# with this module anyways...
-class TestSHA2SignaturePubkeys(unittest.TestCase):
- def test_pubkey_auth_honors_disabled_algorithms(self):
- privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
- with server(
- pubkeys=[privkey],
- connect=dict(pkey=privkey),
- init=dict(
- disabled_algorithms=dict(
- pubkeys=["ssh-rsa", "rsa-sha2-256", "rsa-sha2-512"]
- )
- ),
- catch_error=True,
- ) as (_, _, err):
- assert isinstance(err, SSHException)
- assert "no RSA pubkey algorithms" in str(err)
-
- def test_client_sha2_disabled_server_sha1_disabled_no_match(self):
- privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
- with server(
- pubkeys=[privkey],
- connect=dict(pkey=privkey),
- client_init=_disable_sha2_pubkey,
- server_init=_disable_sha1_pubkey,
- catch_error=True,
- ) as (tc, ts, err):
- assert isinstance(err, AuthenticationException)
-
- def test_client_sha1_disabled_server_sha2_disabled_no_match(self):
- privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
- with server(
- pubkeys=[privkey],
- connect=dict(pkey=privkey),
- client_init=_disable_sha1_pubkey,
- server_init=_disable_sha2_pubkey,
- catch_error=True,
- ) as (tc, ts, err):
- assert isinstance(err, AuthenticationException)
-
- @requires_sha1_signing
- def test_ssh_rsa_still_used_when_sha2_disabled(self):
- privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
- # NOTE: this works because key obj comparison uses public bytes
- # TODO: would be nice for PKey to grow a legit "give me another obj of
- # same class but just the public bits" using asbytes()
- with server(
- pubkeys=[privkey], connect=dict(pkey=privkey), init=_disable_sha2
- ) as (tc, _):
- assert tc.is_authenticated()
-
- def test_sha2_512(self):
- privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
- with server(
- pubkeys=[privkey],
- connect=dict(pkey=privkey),
- init=dict(
- disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-256"])
- ),
- ) as (tc, ts):
- assert tc.is_authenticated()
- assert tc._agreed_pubkey_algorithm == "rsa-sha2-512"
-
- def test_sha2_256(self):
- privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
- with server(
- pubkeys=[privkey],
- connect=dict(pkey=privkey),
- init=dict(
- disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-512"])
- ),
- ) as (tc, ts):
- assert tc.is_authenticated()
- assert tc._agreed_pubkey_algorithm == "rsa-sha2-256"
-
- def test_sha2_256_when_client_only_enables_256(self):
- privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
- with server(
- pubkeys=[privkey],
- connect=dict(pkey=privkey),
- # Client-side only; server still accepts all 3.
- client_init=dict(
- disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-512"])
- ),
- ) as (tc, ts):
- assert tc.is_authenticated()
+ # Client settled on 256 despite itself not having 512 disabled (and
+ # otherwise, 512 would have been earlier in the preferred list)
assert tc._agreed_pubkey_algorithm == "rsa-sha2-256"
diff --git a/tests/test_util.py b/tests/test_util.py
index ec03846b..a2a8224e 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -49,6 +49,11 @@ class UtilTest(unittest.TestCase):
"Agent",
"AgentKey",
"AuthenticationException",
+ "AuthFailure",
+ "AuthHandler",
+ "AuthResult",
+ "AuthSource",
+ "AuthStrategy",
"AutoAddPolicy",
"BadAuthenticationType",
"BufferedFile",
@@ -58,9 +63,14 @@ class UtilTest(unittest.TestCase):
"CouldNotCanonicalize",
"DSSKey",
"HostKeys",
+ "InMemoryPrivateKey",
"Message",
"MissingHostKeyPolicy",
+ "NoneAuth",
+ "OnDiskPrivateKey",
+ "Password",
"PasswordRequiredException",
+ "PrivateKey",
"RSAKey",
"RejectPolicy",
"SFTP",
@@ -77,12 +87,13 @@ class UtilTest(unittest.TestCase):
"SSHException",
"SecurityOptions",
"ServerInterface",
+ "SourceResult",
"SubsystemHandler",
"Transport",
"WarningPolicy",
"util",
):
- assert name in paramiko.__all__
+ assert name in dir(paramiko)
def test_generate_key_bytes(self):
key_bytes = paramiko.util.generate_key_bytes(
diff --git a/tests/util.py b/tests/util.py
deleted file mode 100644
index 0639f8ae..00000000
--- a/tests/util.py
+++ /dev/null
@@ -1,174 +0,0 @@
-from os.path import dirname, realpath, join
-import builtins
-import os
-import struct
-import sys
-import unittest
-
-import pytest
-
-from paramiko.ssh_gss import GSS_AUTH_AVAILABLE
-
-from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
-from cryptography.hazmat.backends import default_backend
-from cryptography.hazmat.primitives import hashes
-from cryptography.hazmat.primitives.asymmetric import padding, rsa
-
-tests_dir = dirname(realpath(__file__))
-
-
-def _support(filename):
- return join(tests_dir, filename)
-
-
-def _config(name):
- return join(tests_dir, "configs", name)
-
-
-needs_gssapi = pytest.mark.skipif(
- not GSS_AUTH_AVAILABLE, reason="No GSSAPI to test"
-)
-
-
-def needs_builtin(name):
- """
- Skip decorated test if builtin name does not exist.
- """
- reason = "Test requires a builtin '{}'".format(name)
- return pytest.mark.skipif(not hasattr(builtins, name), reason=reason)
-
-
-slow = pytest.mark.slow
-
-# GSSAPI / Kerberos related tests need a working Kerberos environment.
-# The class `KerberosTestCase` provides such an environment or skips all tests.
-# There are 3 distinct cases:
-#
-# - A Kerberos environment has already been created and the environment
-# contains the required information.
-#
-# - We can use the package 'k5test' to setup an working kerberos environment on
-# the fly.
-#
-# - We skip all tests.
-#
-# ToDo: add a Windows specific implementation?
-
-if (
- os.environ.get("K5TEST_USER_PRINC", None)
- and os.environ.get("K5TEST_HOSTNAME", None)
- and os.environ.get("KRB5_KTNAME", None)
-): # add other vars as needed
-
- # The environment provides the required information
- class DummyK5Realm:
- def __init__(self):
- for k in os.environ:
- if not k.startswith("K5TEST_"):
- continue
- setattr(self, k[7:].lower(), os.environ[k])
- self.env = {}
-
- class KerberosTestCase(unittest.TestCase):
- @classmethod
- def setUpClass(cls):
- cls.realm = DummyK5Realm()
-
- @classmethod
- def tearDownClass(cls):
- del cls.realm
-
-else:
- try:
- # Try to setup a kerberos environment
- from k5test import KerberosTestCase
- except Exception:
- # Use a dummy, that skips all tests
- class KerberosTestCase(unittest.TestCase):
- @classmethod
- def setUpClass(cls):
- raise unittest.SkipTest(
- "Missing extension package k5test. "
- 'Please run "pip install k5test" '
- "to install it."
- )
-
-
-def update_env(testcase, mapping, env=os.environ):
- """Modify os.environ during a test case and restore during cleanup."""
- saved_env = env.copy()
-
- def replace(target, source):
- target.update(source)
- for k in list(target):
- if k not in source:
- target.pop(k, None)
-
- testcase.addCleanup(replace, env, saved_env)
- env.update(mapping)
- return testcase
-
-
-def k5shell(args=None):
- """Create a shell with an kerberos environment
-
- This can be used to debug paramiko or to test the old GSSAPI.
- To test a different GSSAPI, simply activate a suitable venv
- within the shell.
- """
- import k5test
- import atexit
- import subprocess
-
- k5 = k5test.K5Realm()
- atexit.register(k5.stop)
- os.environ.update(k5.env)
- for n in ("realm", "user_princ", "hostname"):
- os.environ["K5TEST_" + n.upper()] = getattr(k5, n)
-
- if not args:
- args = sys.argv[1:]
- if not args:
- args = [os.environ.get("SHELL", "bash")]
- sys.exit(subprocess.call(args))
-
-
-def is_low_entropy():
- """
- Attempts to detect whether running interpreter is low-entropy.
-
- "low-entropy" is defined as being in 32-bit mode and with the hash seed set
- to zero.
- """
- is_32bit = struct.calcsize("P") == 32 / 8
- # I don't see a way to tell internally if the hash seed was set this
- # way, but env should be plenty sufficient, this is only for testing.
- return is_32bit and os.environ.get("PYTHONHASHSEED", None) == "0"
-
-
-def sha1_signing_unsupported():
- """
- This is used to skip tests in environments where SHA-1 signing is
- not supported by the backend.
- """
- private_key = rsa.generate_private_key(
- public_exponent=65537, key_size=2048, backend=default_backend()
- )
- message = b"Some dummy text"
- try:
- private_key.sign(
- message,
- padding.PSS(
- mgf=padding.MGF1(hashes.SHA1()),
- salt_length=padding.PSS.MAX_LENGTH,
- ),
- hashes.SHA1(),
- )
- return False
- except UnsupportedAlgorithm as e:
- return e._reason is _Reasons.UNSUPPORTED_HASH
-
-
-requires_sha1_signing = unittest.skipIf(
- sha1_signing_unsupported(), "SHA-1 signing not supported"
-)