summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2023-05-08 17:52:15 -0400
committerJeff Forcier <jeff@bitprophet.org>2023-05-18 13:57:19 -0400
commit992c9967330bf977e45e21079e6f2e974ac4f045 (patch)
treeafb9b753c7c88c354ff849723fb7b7c7f31f5381
parentebc96706233346fcfc3071a390037cf26129727b (diff)
Made PKey.from_path cert-aware & tilde-friendly
This was previously only done in SSHClient. It's not relevant for from_type_string which is aimed at ssh-agents, which tend to do their own cert loading where necessary
-rw-r--r--paramiko/auth_handler.py8
-rw-r--r--paramiko/client.py2
-rw-r--r--paramiko/pkey.py39
-rw-r--r--tests/_support/rsa-lonely.key15
-rw-r--r--tests/_support/rsa-missing.key-cert.pub1
-rw-r--r--tests/auth.py22
-rw-r--r--tests/pkey.py50
7 files changed, 127 insertions, 10 deletions
diff --git a/paramiko/auth_handler.py b/paramiko/auth_handler.py
index 539bf973..bc7f298f 100644
--- a/paramiko/auth_handler.py
+++ b/paramiko/auth_handler.py
@@ -1080,9 +1080,11 @@ class AuthOnlyHandler(AuthHandler):
def _choose_fallback_pubkey_algorithm(self, key_type, my_algos):
msg = "Server did not send a server-sig-algs list; defaulting to something in our preferred algorithms list" # noqa
self._log(DEBUG, msg)
- if key_type in my_algos:
- msg = f"Current key type, {key_type!r}, is in our preferred list; using that" # noqa
- algo = key_type
+ noncert_key_type = key_type.replace("-cert-v01@openssh.com", "")
+ if key_type in my_algos or noncert_key_type in my_algos:
+ actual = key_type if key_type in my_algos else noncert_key_type
+ msg = f"Current key type, {actual!r}, is in our preferred list; using that" # noqa
+ algo = actual
else:
algo = my_algos[0]
msg = f"{key_type!r} not in our list - trying first list item instead, {algo!r}" # noqa
diff --git a/paramiko/client.py b/paramiko/client.py
index 1fe14b07..3d1b801c 100644
--- a/paramiko/client.py
+++ b/paramiko/client.py
@@ -697,6 +697,8 @@ class SSHClient(ClosingContextManager):
if not two_factor:
for key_filename in key_filenames:
+ # TODO 4.0: leverage PKey.from_path() if we don't end up just
+ # killing SSHClient entirely
for pkey_class in (RSAKey, DSSKey, ECDSAKey, Ed25519Key):
try:
key = self._key_from_filepath(
diff --git a/paramiko/pkey.py b/paramiko/pkey.py
index 4eabac5d..26c11e95 100644
--- a/paramiko/pkey.py
+++ b/paramiko/pkey.py
@@ -115,7 +115,7 @@ class PKey:
"""
Attempt to instantiate appropriate key subclass from given file path.
- :param Path path: The path to load (may be a `str`).
+ :param Path path: The path to load (may also be a `str`).
:returns:
A `PKey` subclass instance.
@@ -126,11 +126,28 @@ class PKey:
.. versionadded:: 3.2
"""
# TODO: make sure sphinx is reading Path right in param list...
+
+ # Lazy import to avoid circular import issues
from paramiko import DSSKey, RSAKey, Ed25519Key, ECDSAKey
- if not isinstance(path, Path):
- path = Path(path)
- data = path.read_bytes()
+ # Normalize to string, as cert suffix isn't quite an extension, so
+ # pathlib isn't useful for this.
+ path = str(path)
+
+ # Sort out cert vs key, i.e. it is 'legal' to hand this kind of API
+ # /either/ the key /or/ the cert, when there is a key/cert pair.
+ cert_suffix = "-cert.pub"
+ if str(path).endswith(cert_suffix):
+ key_path = path[: -len(cert_suffix)]
+ cert_path = path
+ else:
+ key_path = path
+ cert_path = path + cert_suffix
+
+ key_path = Path(key_path).expanduser()
+ cert_path = Path(cert_path).expanduser()
+
+ data = key_path.read_bytes()
# Like OpenSSH, try modern/OpenSSH-specific key load first
try:
loaded = serialization.load_ssh_private_key(
@@ -160,8 +177,12 @@ class PKey:
key_class = ECDSAKey
else:
raise UnknownKeyType(key_bytes=data, key_type=loaded.__class__)
- with path.open() as fd:
- return key_class.from_private_key(fd, password=passphrase)
+ with key_path.open() as fd:
+ key = key_class.from_private_key(fd, password=passphrase)
+ if cert_path.exists():
+ # load_certificate can take Message, path-str, or value-str
+ key.load_certificate(str(cert_path))
+ return key
@staticmethod
def from_type_string(key_type, key_bytes):
@@ -204,6 +225,12 @@ class PKey:
"""
return [cls.name]
+ # TODO 4.0: make this and subclasses consistent, some of our own
+ # classmethods even assume kwargs we don't define!
+ # TODO 4.0: prob also raise NotImplementedError instead of pass'ing; the
+ # contract is pretty obviously that you need to handle msg/data/filename
+ # appropriately. (If 'pass' is a concession to testing, see about doing the
+ # work to fix the tests instead)
def __init__(self, msg=None, data=None):
"""
Create a new instance of this public key type. If ``msg`` is given,
diff --git a/tests/_support/rsa-lonely.key b/tests/_support/rsa-lonely.key
new file mode 100644
index 00000000..f50e9c53
--- /dev/null
+++ b/tests/_support/rsa-lonely.key
@@ -0,0 +1,15 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
+oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
+d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
+gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
+EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
+soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
+tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
+avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
+4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
+H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
+qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
+HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
+nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
+-----END RSA PRIVATE KEY-----
diff --git a/tests/_support/rsa-missing.key-cert.pub b/tests/_support/rsa-missing.key-cert.pub
new file mode 100644
index 00000000..7487ab66
--- /dev/null
+++ b/tests/_support/rsa-missing.key-cert.pub
@@ -0,0 +1 @@
+ssh-rsa-cert-v01@openssh.com AAAAHHNzaC1yc2EtY2VydC12MDFAb3BlbnNzaC5jb20AAAAgsZlXTd5NE4uzGAn6TyAqQj+IPbsTEFGap2x5pTRwQR8AAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4cAAAAAAAAE0gAAAAEAAAAmU2FtcGxlIHNlbGYtc2lnbmVkIE9wZW5TU0ggY2VydGlmaWNhdGUAAAASAAAABXVzZXIxAAAABXVzZXIyAAAAAAAAAAD//////////wAAAAAAAACCAAAAFXBlcm1pdC1YMTEtZm9yd2FyZGluZwAAAAAAAAAXcGVybWl0LWFnZW50LWZvcndhcmRpbmcAAAAAAAAAFnBlcm1pdC1wb3J0LWZvcndhcmRpbmcAAAAAAAAACnBlcm1pdC1wdHkAAAAAAAAADnBlcm1pdC11c2VyLXJjAAAAAAAAAAAAAACVAAAAB3NzaC1yc2EAAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4cAAACPAAAAB3NzaC1yc2EAAACATFHFsARDgQevc6YLxNnDNjsFtZ08KPMyYVx0w5xm95IVZHVWSOc5w+ccjqN9HRwxV3kP7IvL91qx0Uc3MJdB9g/O6HkAP+rpxTVoTb2EAMekwp5+i8nQJW4CN2BSsbQY1M6r7OBZ5nmF4hOW/5Pu4l22lXe2ydy8kEXOEuRpUeQ= test_rsa.key.pub
diff --git a/tests/auth.py b/tests/auth.py
index a12aa5fe..5d732218 100644
--- a/tests/auth.py
+++ b/tests/auth.py
@@ -9,6 +9,7 @@ from pytest import raises
from paramiko import (
RSAKey,
DSSKey,
+ PKey,
BadAuthenticationType,
AuthenticationException,
SSHException,
@@ -165,6 +166,27 @@ class AuthOnlyHandler_:
assert tc._agreed_pubkey_algorithm == "ssh-rsa"
@requires_sha1_signing
+ def key_type_algo_selection_is_cert_suffix_aware(self):
+ # This key has a cert next to it, which should trigger cert-aware
+ # loading within key classes.
+ privkey = PKey.from_path(_support("rsa.key"))
+ server_init = dict(_disable_sha2_pubkey, server_sig_algs=False)
+ with self._server(
+ pubkeys=[privkey],
+ connect=dict(pkey=privkey),
+ server_init=server_init,
+ catch_error=True,
+ ) as (tc, ts, err):
+ assert not err
+ # Auth did work
+ assert tc.is_authenticated()
+ # Selected expected cert type
+ assert (
+ tc._agreed_pubkey_algorithm
+ == "ssh-rsa-cert-v01@openssh.com"
+ )
+
+ @requires_sha1_signing
def uses_first_preferred_algo_if_key_type_not_in_list(self):
# This is functionally the same as legacy AuthHandler, just
# arriving at the same place in a different manner.
diff --git a/tests/pkey.py b/tests/pkey.py
index 98193165..25202a06 100644
--- a/tests/pkey.py
+++ b/tests/pkey.py
@@ -1,7 +1,14 @@
from pytest import raises
from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey
-from paramiko import PKey, Ed25519Key, RSAKey, UnknownKeyType, Message
+from paramiko import (
+ PKey,
+ Ed25519Key,
+ RSAKey,
+ UnknownKeyType,
+ Message,
+ PublicBlob,
+)
from ._util import _support
@@ -37,6 +44,47 @@ class PKey_:
with raises(ValueError):
PKey.from_path(__file__)
+ class automatically_loads_certificates:
+ def existing_cert_loaded_when_given_key_path(self):
+ key = PKey.from_path(_support("rsa.key"))
+ # Public blob exists despite no .load_certificate call
+ assert key.public_blob is not None
+ assert (
+ key.public_blob.key_type == "ssh-rsa-cert-v01@openssh.com"
+ )
+ # And it's definitely the one we expected
+ assert key.public_blob == PublicBlob.from_file(
+ _support("rsa.key-cert.pub")
+ )
+
+ def can_be_given_cert_path_instead(self):
+ key = PKey.from_path(_support("rsa.key-cert.pub"))
+ # It's still a key, not a PublicBlob
+ assert isinstance(key, RSAKey)
+ # Public blob exists despite no .load_certificate call
+ assert key.public_blob is not None
+ assert (
+ key.public_blob.key_type == "ssh-rsa-cert-v01@openssh.com"
+ )
+ # And it's definitely the one we expected
+ assert key.public_blob == PublicBlob.from_file(
+ _support("rsa.key-cert.pub")
+ )
+
+ def no_cert_load_if_no_cert(self):
+ # This key exists (it's a copy of the regular one) but has no
+ # matching -cert.pub
+ key = PKey.from_path(_support("rsa-lonely.key"))
+ assert key.public_blob is None
+
+ def excepts_usefully_if_no_key_only_cert(self):
+ # TODO: is that truly an error condition? the cert is ~the
+ # pubkey and we still require the privkey for signing, yea?
+ # This cert exists (it's a copy of the regular one) but there's
+ # no rsa-missing.key to load.
+ with raises(FileNotFoundError) as info:
+ PKey.from_path(_support("rsa-missing.key-cert.pub"))
+ assert info.value.filename.endswith("rsa-missing.key")
class load_certificate:
def rsa_public_cert_blobs(self):