summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorMikael Magnusson <mikma@users.sourceforge.net>2023-07-24 20:51:23 +0200
committerMikael Magnusson <mikma@users.sourceforge.net>2023-08-02 22:07:45 +0200
commit32ca2efa57d3408eae07ac52c6db4873a1c7fd0a (patch)
tree7e1fee78c03554cc78078c2fcf7be6e4bdb3ef4c
parent6c85a310c33b953877092d938940ca7e799f843e (diff)
WIP: clean up u2fu2f
-rw-r--r--paramiko/auth_handler.py9
-rw-r--r--paramiko/ecdsaskkey.py137
-rw-r--r--paramiko/pkey.py2
-rw-r--r--paramiko/transport.py4
4 files changed, 15 insertions, 137 deletions
diff --git a/paramiko/auth_handler.py b/paramiko/auth_handler.py
index 6ad86204..bc7f298f 100644
--- a/paramiko/auth_handler.py
+++ b/paramiko/auth_handler.py
@@ -25,8 +25,6 @@ import threading
import time
import re
-from hashlib import sha256
-
from paramiko.common import (
cMSG_SERVICE_REQUEST,
cMSG_DISCONNECT,
@@ -233,7 +231,6 @@ class AuthHandler:
_, bits = self._get_key_type_and_bits(key)
m.add_string(algorithm)
m.add_string(bits)
-
return m.asbytes()
def wait_for_response(self, event):
@@ -388,7 +385,6 @@ class AuthHandler:
m.add_string(self.username)
m.add_string("ssh-connection")
m.add_string(self.auth_method)
- self._log(DEBUG, "auth method {}".format(self.auth_method))
if self.auth_method == "password":
m.add_boolean(False)
password = b(self.password)
@@ -619,7 +615,6 @@ Error Message: {}
# telling us directly. No need for _finalize_pubkey_algorithm
# anywhere in this flow.
algorithm = m.get_text()
- self._log(DEBUG, "publickey {}".format(algorithm))
keyblob = m.get_binary()
try:
key = self._generate_key_from_request(algorithm, keyblob)
@@ -637,12 +632,9 @@ Error Message: {}
result = self.transport.server_object.check_auth_publickey(
username, key
)
- self._log(DEBUG, "publickey res {} {}".format(type(key), result))
if result != AUTH_FAILED:
- self._log(DEBUG, "publickey not auth_failed")
# key is okay, verify it
if not sig_attached:
- self._log(DEBUG, "publickey not sig_attached")
# client wants to know if this key is acceptable, before it
# signs anything... send special "ok" message
m = Message()
@@ -655,7 +647,6 @@ Error Message: {}
blob = self._get_session_blob(
key, service, username, algorithm
)
- print("blob len {}".format(len(blob)))
if not key.verify_ssh_sig(blob, sig):
self._log(INFO, "Auth rejected: invalid signature")
result = AUTH_FAILED
diff --git a/paramiko/ecdsaskkey.py b/paramiko/ecdsaskkey.py
index 4459fd6a..75a61e67 100644
--- a/paramiko/ecdsaskkey.py
+++ b/paramiko/ecdsaskkey.py
@@ -20,12 +20,9 @@
ECDSA keys
"""
-from cryptography.exceptions import InvalidSignature, UnsupportedAlgorithm
-from cryptography.hazmat.backends import default_backend
-from cryptography.hazmat.primitives import hashes, serialization
+from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.utils import (
- decode_dss_signature,
encode_dss_signature,
)
@@ -33,24 +30,23 @@ from hashlib import sha256
from paramiko.common import four_byte
from paramiko.message import Message
-from paramiko.pkey import PKey
from paramiko.ecdsakey import (
_ECDSACurveSet,
- _ECDSACurve
+ _ECDSACurve,
+ ECDSAKey,
)
from paramiko.ssh_exception import SSHException
from paramiko.util import deflate_long
-from fido2.server import ( U2FFido2Server )
-from fido2.webauthn import ( PublicKeyCredentialRpEntity )
-from pyu2f import model
-from pyu2f.convenience import authenticator
-import base64
+#from fido2.server import ( U2FFido2Server )
+#from fido2.webauthn import ( PublicKeyCredentialRpEntity )
+#from pyu2f import model
+#from pyu2f.convenience import authenticator
class _ECDSASkCurve(_ECDSACurve):
IDENTIFIER_PREFIX = "sk-ecdsa-sha2-"
-class ECDSASkKey(PKey):
+class ECDSASkKey(ECDSAKey):
"""
Representation of an ECDSA security key which can be used to sign and verify SSH2
data.
@@ -137,19 +133,10 @@ class ECDSASkKey(PKey):
except ValueError:
raise SSHException("Invalid public key")
- @classmethod
- def X_identifiers(cls):
- return cls._ECDSA_CURVES.get_key_format_identifier_list()
-
- # TODO 4.0: deprecate/remove
- @classmethod
- def X_supported_key_format_identifiers(cls):
- return cls.identifiers()
-
def asbytes(self):
key = self.verifying_key
m = Message()
- m.add_string(self._get_name())
+ m.add_string(self.get_name())
m.add_string(self.ecdsa_curve.nist_name)
numbers = key.public_numbers()
@@ -167,46 +154,20 @@ class ECDSASkKey(PKey):
m.add_string(self.sk_app_id)
return m.asbytes()
- def __str__(self):
- return self.asbytes()
-
- @property
- def _fields(self):
- return (
- self.get_name(),
- self.verifying_key.public_numbers().x,
- self.verifying_key.public_numbers().y,
- )
-
- def _get_name(self):
+ def get_name(self):
return self.ecdsa_curve.key_format_identifier + "@openssh.com"
- def X_get_bits(self):
- return self.ecdsa_curve.key_length
-
- def X_can_sign(self):
- return self.signing_key is not None
-
- def X_sign_ssh_data(self, data, algorithm=None):
- ecdsa = ec.ECDSA(self.ecdsa_curve.hash_object())
- sig = self.signing_key.sign(data, ecdsa)
- r, s = decode_dss_signature(sig)
-
- m = Message()
- m.add_string(self.ecdsa_curve.key_format_identifier)
- m.add_string(self._sigencode(r, s))
- return m
-
def verify_ssh_sig(self, data, msg):
- print("verify_ssh_sig {} {}".format(len(data), data))
- if msg.get_text() != self._get_name():
+ if msg.get_text() != self.get_name():
print("bad key format identifier")
return False
+
sig = msg.get_binary()
sigR, sigS = self._sigdecode(sig)
signature = encode_dss_signature(sigR, sigS)
flags = msg.get_byte()
counter = msg.get_int()
+
if(len(msg.get_remainder()) != 0):
print("bad remainder != 0")
return False
@@ -216,83 +177,15 @@ class ECDSASkKey(PKey):
m.add_bytes(sha256(self.sk_app_id.encode('ascii')).digest())
m.add_byte(flags)
m.add_int(counter)
-# m.add_bytes(b'') # extensions
+ m.add_bytes(b'') # extensions
m.add_bytes(sha256(data).digest())
- m_data = m.asbytes()
-# sk_data = sha256(m_data).digest()
- sk_data = m_data
-
- print("u2f sign {} {} {} {} {} {} <{}> <{}>".format(self.sk_app_id, len(sk_data), flags, counter, len(data), len(m_data), data, m_data))
- print("sk_data {} {}".format(sk_data, self.ecdsa_curve.hash_object()))
+ sk_data = m.asbytes()
self.verifying_key.verify(
signature, sk_data, ec.ECDSA(self.ecdsa_curve.hash_object())
)
except InvalidSignature:
- print("sk sig fail")
return False
else:
- print("sk sig ok")
return True
- # ...internals...
-
- def _from_private_key_file(self, filename, password):
- data = self._read_private_key_file("EC", filename, password)
- self._decode_key(data)
-
- def _from_private_key(self, file_obj, password):
- data = self._read_private_key("EC", file_obj, password)
- self._decode_key(data)
-
- def _decode_key(self, data):
- return
- pkformat, data = data
- if pkformat == self._PRIVATE_KEY_FORMAT_ORIGINAL:
- try:
- key = serialization.load_der_private_key(
- data, password=None, backend=default_backend()
- )
- except (
- ValueError,
- AssertionError,
- TypeError,
- UnsupportedAlgorithm,
- ) as e:
- raise SSHException(str(e))
- elif pkformat == self._PRIVATE_KEY_FORMAT_OPENSSH:
- try:
- msg = Message(data)
- curve_name = msg.get_text()
- verkey = msg.get_binary() # noqa: F841
- sigkey = msg.get_mpint()
- name = "ecdsa-sha2-" + curve_name
- curve = self._ECDSA_CURVES.get_by_key_format_identifier(name)
- if not curve:
- raise SSHException("Invalid key curve identifier")
- key = ec.derive_private_key(
- sigkey, curve.curve_class(), default_backend()
- )
- except Exception as e:
- # PKey._read_private_key_openssh() should check or return
- # keytype - parsing could fail for any reason due to wrong type
- raise SSHException(str(e))
- else:
- self._got_bad_key_format_id(pkformat)
-
- self.signing_key = key
- self.verifying_key = key.public_key()
- curve_class = key.curve.__class__
- self.ecdsa_curve = self._ECDSA_CURVES.get_by_curve_class(curve_class)
-
- def _sigencode(self, r, s):
- msg = Message()
- msg.add_mpint(r)
- msg.add_mpint(s)
- return msg.asbytes()
-
- def _sigdecode(self, sig):
- msg = Message(sig)
- r = msg.get_mpint()
- s = msg.get_mpint()
- return r, s
diff --git a/paramiko/pkey.py b/paramiko/pkey.py
index 78ba9424..ef371002 100644
--- a/paramiko/pkey.py
+++ b/paramiko/pkey.py
@@ -793,7 +793,6 @@ class PKey:
type_ = msg.get_text()
# Regular public key - nothing special to do besides the implicit
# type check.
- print("type {} {} {}".format(type_, key_types, cert_types))
if type_ in key_types:
pass
# OpenSSH-compatible certificate - store full copy as .public_blob
@@ -840,7 +839,6 @@ class PKey:
else:
constructor = "from_string"
blob = getattr(PublicBlob, constructor)(value)
- print("load_certificate {}".format(blob))
if not blob.key_type.startswith(self.get_name()):
err = "PublicBlob type {} incompatible with key type {}"
raise ValueError(err.format(blob.key_type, self.get_name()))
diff --git a/paramiko/transport.py b/paramiko/transport.py
index a7e87259..b8bf93b0 100644
--- a/paramiko/transport.py
+++ b/paramiko/transport.py
@@ -182,7 +182,6 @@ class Transport(threading.Thread, ClosingContextManager):
"ecdsa-sha2-nistp256",
"ecdsa-sha2-nistp384",
"ecdsa-sha2-nistp521",
- "sk-ecdsa-sha2-nistp256",
"sk-ecdsa-sha2-nistp256@openssh.com",
"rsa-sha2-512",
"rsa-sha2-256",
@@ -195,7 +194,6 @@ class Transport(threading.Thread, ClosingContextManager):
"ecdsa-sha2-nistp256",
"ecdsa-sha2-nistp384",
"ecdsa-sha2-nistp521",
- "sk-ecdsa-sha2-nistp256",
"sk-ecdsa-sha2-nistp256@openssh.com",
"rsa-sha2-512",
"rsa-sha2-256",
@@ -297,9 +295,7 @@ class Transport(threading.Thread, ClosingContextManager):
"ecdsa-sha2-nistp384-cert-v01@openssh.com": ECDSAKey,
"ecdsa-sha2-nistp521": ECDSAKey,
"ecdsa-sha2-nistp521-cert-v01@openssh.com": ECDSAKey,
- "sk-ecdsa-sha2-nistp256": ECDSASkKey,
"sk-ecdsa-sha2-nistp256@openssh.com": ECDSASkKey,
- "sk-ecdsa-sha2-nistp256-cert-v01@openssh.com": ECDSASkKey,
"ssh-ed25519": Ed25519Key,
"ssh-ed25519-cert-v01@openssh.com": Ed25519Key,
}