From 32ca2efa57d3408eae07ac52c6db4873a1c7fd0a Mon Sep 17 00:00:00 2001 From: Mikael Magnusson Date: Mon, 24 Jul 2023 20:51:23 +0200 Subject: WIP: clean up u2f --- paramiko/auth_handler.py | 9 ---- paramiko/ecdsaskkey.py | 137 ++++++----------------------------------------- paramiko/pkey.py | 2 - paramiko/transport.py | 4 -- 4 files changed, 15 insertions(+), 137 deletions(-) diff --git a/paramiko/auth_handler.py b/paramiko/auth_handler.py index 6ad86204..bc7f298f 100644 --- a/paramiko/auth_handler.py +++ b/paramiko/auth_handler.py @@ -25,8 +25,6 @@ import threading import time import re -from hashlib import sha256 - from paramiko.common import ( cMSG_SERVICE_REQUEST, cMSG_DISCONNECT, @@ -233,7 +231,6 @@ class AuthHandler: _, bits = self._get_key_type_and_bits(key) m.add_string(algorithm) m.add_string(bits) - return m.asbytes() def wait_for_response(self, event): @@ -388,7 +385,6 @@ class AuthHandler: m.add_string(self.username) m.add_string("ssh-connection") m.add_string(self.auth_method) - self._log(DEBUG, "auth method {}".format(self.auth_method)) if self.auth_method == "password": m.add_boolean(False) password = b(self.password) @@ -619,7 +615,6 @@ Error Message: {} # telling us directly. No need for _finalize_pubkey_algorithm # anywhere in this flow. algorithm = m.get_text() - self._log(DEBUG, "publickey {}".format(algorithm)) keyblob = m.get_binary() try: key = self._generate_key_from_request(algorithm, keyblob) @@ -637,12 +632,9 @@ Error Message: {} result = self.transport.server_object.check_auth_publickey( username, key ) - self._log(DEBUG, "publickey res {} {}".format(type(key), result)) if result != AUTH_FAILED: - self._log(DEBUG, "publickey not auth_failed") # key is okay, verify it if not sig_attached: - self._log(DEBUG, "publickey not sig_attached") # client wants to know if this key is acceptable, before it # signs anything... send special "ok" message m = Message() @@ -655,7 +647,6 @@ Error Message: {} blob = self._get_session_blob( key, service, username, algorithm ) - print("blob len {}".format(len(blob))) if not key.verify_ssh_sig(blob, sig): self._log(INFO, "Auth rejected: invalid signature") result = AUTH_FAILED diff --git a/paramiko/ecdsaskkey.py b/paramiko/ecdsaskkey.py index 4459fd6a..75a61e67 100644 --- a/paramiko/ecdsaskkey.py +++ b/paramiko/ecdsaskkey.py @@ -20,12 +20,9 @@ ECDSA keys """ -from cryptography.exceptions import InvalidSignature, UnsupportedAlgorithm -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes, serialization +from cryptography.exceptions import InvalidSignature from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric.utils import ( - decode_dss_signature, encode_dss_signature, ) @@ -33,24 +30,23 @@ from hashlib import sha256 from paramiko.common import four_byte from paramiko.message import Message -from paramiko.pkey import PKey from paramiko.ecdsakey import ( _ECDSACurveSet, - _ECDSACurve + _ECDSACurve, + ECDSAKey, ) from paramiko.ssh_exception import SSHException from paramiko.util import deflate_long -from fido2.server import ( U2FFido2Server ) -from fido2.webauthn import ( PublicKeyCredentialRpEntity ) -from pyu2f import model -from pyu2f.convenience import authenticator -import base64 +#from fido2.server import ( U2FFido2Server ) +#from fido2.webauthn import ( PublicKeyCredentialRpEntity ) +#from pyu2f import model +#from pyu2f.convenience import authenticator class _ECDSASkCurve(_ECDSACurve): IDENTIFIER_PREFIX = "sk-ecdsa-sha2-" -class ECDSASkKey(PKey): +class ECDSASkKey(ECDSAKey): """ Representation of an ECDSA security key which can be used to sign and verify SSH2 data. @@ -137,19 +133,10 @@ class ECDSASkKey(PKey): except ValueError: raise SSHException("Invalid public key") - @classmethod - def X_identifiers(cls): - return cls._ECDSA_CURVES.get_key_format_identifier_list() - - # TODO 4.0: deprecate/remove - @classmethod - def X_supported_key_format_identifiers(cls): - return cls.identifiers() - def asbytes(self): key = self.verifying_key m = Message() - m.add_string(self._get_name()) + m.add_string(self.get_name()) m.add_string(self.ecdsa_curve.nist_name) numbers = key.public_numbers() @@ -167,46 +154,20 @@ class ECDSASkKey(PKey): m.add_string(self.sk_app_id) return m.asbytes() - def __str__(self): - return self.asbytes() - - @property - def _fields(self): - return ( - self.get_name(), - self.verifying_key.public_numbers().x, - self.verifying_key.public_numbers().y, - ) - - def _get_name(self): + def get_name(self): return self.ecdsa_curve.key_format_identifier + "@openssh.com" - def X_get_bits(self): - return self.ecdsa_curve.key_length - - def X_can_sign(self): - return self.signing_key is not None - - def X_sign_ssh_data(self, data, algorithm=None): - ecdsa = ec.ECDSA(self.ecdsa_curve.hash_object()) - sig = self.signing_key.sign(data, ecdsa) - r, s = decode_dss_signature(sig) - - m = Message() - m.add_string(self.ecdsa_curve.key_format_identifier) - m.add_string(self._sigencode(r, s)) - return m - def verify_ssh_sig(self, data, msg): - print("verify_ssh_sig {} {}".format(len(data), data)) - if msg.get_text() != self._get_name(): + if msg.get_text() != self.get_name(): print("bad key format identifier") return False + sig = msg.get_binary() sigR, sigS = self._sigdecode(sig) signature = encode_dss_signature(sigR, sigS) flags = msg.get_byte() counter = msg.get_int() + if(len(msg.get_remainder()) != 0): print("bad remainder != 0") return False @@ -216,83 +177,15 @@ class ECDSASkKey(PKey): m.add_bytes(sha256(self.sk_app_id.encode('ascii')).digest()) m.add_byte(flags) m.add_int(counter) -# m.add_bytes(b'') # extensions + m.add_bytes(b'') # extensions m.add_bytes(sha256(data).digest()) - m_data = m.asbytes() -# sk_data = sha256(m_data).digest() - sk_data = m_data - - print("u2f sign {} {} {} {} {} {} <{}> <{}>".format(self.sk_app_id, len(sk_data), flags, counter, len(data), len(m_data), data, m_data)) - print("sk_data {} {}".format(sk_data, self.ecdsa_curve.hash_object())) + sk_data = m.asbytes() self.verifying_key.verify( signature, sk_data, ec.ECDSA(self.ecdsa_curve.hash_object()) ) except InvalidSignature: - print("sk sig fail") return False else: - print("sk sig ok") return True - # ...internals... - - def _from_private_key_file(self, filename, password): - data = self._read_private_key_file("EC", filename, password) - self._decode_key(data) - - def _from_private_key(self, file_obj, password): - data = self._read_private_key("EC", file_obj, password) - self._decode_key(data) - - def _decode_key(self, data): - return - pkformat, data = data - if pkformat == self._PRIVATE_KEY_FORMAT_ORIGINAL: - try: - key = serialization.load_der_private_key( - data, password=None, backend=default_backend() - ) - except ( - ValueError, - AssertionError, - TypeError, - UnsupportedAlgorithm, - ) as e: - raise SSHException(str(e)) - elif pkformat == self._PRIVATE_KEY_FORMAT_OPENSSH: - try: - msg = Message(data) - curve_name = msg.get_text() - verkey = msg.get_binary() # noqa: F841 - sigkey = msg.get_mpint() - name = "ecdsa-sha2-" + curve_name - curve = self._ECDSA_CURVES.get_by_key_format_identifier(name) - if not curve: - raise SSHException("Invalid key curve identifier") - key = ec.derive_private_key( - sigkey, curve.curve_class(), default_backend() - ) - except Exception as e: - # PKey._read_private_key_openssh() should check or return - # keytype - parsing could fail for any reason due to wrong type - raise SSHException(str(e)) - else: - self._got_bad_key_format_id(pkformat) - - self.signing_key = key - self.verifying_key = key.public_key() - curve_class = key.curve.__class__ - self.ecdsa_curve = self._ECDSA_CURVES.get_by_curve_class(curve_class) - - def _sigencode(self, r, s): - msg = Message() - msg.add_mpint(r) - msg.add_mpint(s) - return msg.asbytes() - - def _sigdecode(self, sig): - msg = Message(sig) - r = msg.get_mpint() - s = msg.get_mpint() - return r, s diff --git a/paramiko/pkey.py b/paramiko/pkey.py index 78ba9424..ef371002 100644 --- a/paramiko/pkey.py +++ b/paramiko/pkey.py @@ -793,7 +793,6 @@ class PKey: type_ = msg.get_text() # Regular public key - nothing special to do besides the implicit # type check. - print("type {} {} {}".format(type_, key_types, cert_types)) if type_ in key_types: pass # OpenSSH-compatible certificate - store full copy as .public_blob @@ -840,7 +839,6 @@ class PKey: else: constructor = "from_string" blob = getattr(PublicBlob, constructor)(value) - print("load_certificate {}".format(blob)) if not blob.key_type.startswith(self.get_name()): err = "PublicBlob type {} incompatible with key type {}" raise ValueError(err.format(blob.key_type, self.get_name())) diff --git a/paramiko/transport.py b/paramiko/transport.py index a7e87259..b8bf93b0 100644 --- a/paramiko/transport.py +++ b/paramiko/transport.py @@ -182,7 +182,6 @@ class Transport(threading.Thread, ClosingContextManager): "ecdsa-sha2-nistp256", "ecdsa-sha2-nistp384", "ecdsa-sha2-nistp521", - "sk-ecdsa-sha2-nistp256", "sk-ecdsa-sha2-nistp256@openssh.com", "rsa-sha2-512", "rsa-sha2-256", @@ -195,7 +194,6 @@ class Transport(threading.Thread, ClosingContextManager): "ecdsa-sha2-nistp256", "ecdsa-sha2-nistp384", "ecdsa-sha2-nistp521", - "sk-ecdsa-sha2-nistp256", "sk-ecdsa-sha2-nistp256@openssh.com", "rsa-sha2-512", "rsa-sha2-256", @@ -297,9 +295,7 @@ class Transport(threading.Thread, ClosingContextManager): "ecdsa-sha2-nistp384-cert-v01@openssh.com": ECDSAKey, "ecdsa-sha2-nistp521": ECDSAKey, "ecdsa-sha2-nistp521-cert-v01@openssh.com": ECDSAKey, - "sk-ecdsa-sha2-nistp256": ECDSASkKey, "sk-ecdsa-sha2-nistp256@openssh.com": ECDSASkKey, - "sk-ecdsa-sha2-nistp256-cert-v01@openssh.com": ECDSASkKey, "ssh-ed25519": Ed25519Key, "ssh-ed25519-cert-v01@openssh.com": Ed25519Key, } -- cgit v1.2.3