diff options
-rw-r--r-- | paramiko/kex_ecdh_nist.py | 114 | ||||
-rw-r--r-- | paramiko/transport.py | 9 | ||||
-rw-r--r-- | tests/test_kex.py | 57 |
3 files changed, 178 insertions, 2 deletions
diff --git a/paramiko/kex_ecdh_nist.py b/paramiko/kex_ecdh_nist.py new file mode 100644 index 00000000..8961dd14 --- /dev/null +++ b/paramiko/kex_ecdh_nist.py @@ -0,0 +1,114 @@ +""" +Ephemeral Elliptic Curve Diffie-Hellman (ECDH) key exchange +RFC 5656, Section 4 +""" + +from hashlib import sha256, sha384, sha512 +from paramiko.message import Message +from paramiko.py3compat import byte_chr, long +from paramiko.ssh_exception import SSHException +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric import ec +from binascii import hexlify + +_MSG_KEXECDH_INIT, _MSG_KEXECDH_REPLY = range(30, 32) +c_MSG_KEXECDH_INIT, c_MSG_KEXECDH_REPLY = [byte_chr(c) for c in range(30, 32)] + + +class KexNistp256(): + + name = "ecdh-sha2-nistp256" + hash_algo = sha256 + curve = ec.SECP256R1() + + def __init__(self, transport): + self.transport = transport + #private key, client public and server public keys + self.P = long(0) + self.Q_C = None + self.Q_S = None + + def start_kex(self): + self._generate_key_pair() + if self.transport.server_mode: + self.transport._expect_packet(_MSG_KEXECDH_INIT) + return + m = Message() + m.add_byte(c_MSG_KEXECDH_INIT) + #SEC1: V2.0 2.3.3 Elliptic-Curve-Point-to-Octet-String Conversion + m.add_string(self.Q_C.public_numbers().encode_point()) + self.transport._send_message(m) + self.transport._expect_packet(_MSG_KEXECDH_REPLY) + + def parse_next(self, ptype, m): + if self.transport.server_mode and (ptype == _MSG_KEXECDH_INIT): + return self._parse_kexecdh_init(m) + elif not self.transport.server_mode and (ptype == _MSG_KEXECDH_REPLY): + return self._parse_kexecdh_reply(m) + raise SSHException('KexECDH asked to handle packet type %d' % ptype) + + def _generate_key_pair(self): + self.P = ec.generate_private_key(self.curve, default_backend()) + if self.transport.server_mode: + self.Q_S = self.P.public_key() + return + self.Q_C = self.P.public_key() + + def _parse_kexecdh_init(self, m): + Q_C_bytes = m.get_string() + self.Q_C = ec.EllipticCurvePublicNumbers.from_encoded_point(self.curve, Q_C_bytes) + K_S = self.transport.get_server_key().asbytes() + K = self.P.exchange(ec.ECDH(), self.Q_C.public_key(default_backend())) + K = long(hexlify(K), 16) + #compute exchange hash + hm = Message() + hm.add(self.transport.remote_version, self.transport.local_version, + self.transport.remote_kex_init, self.transport.local_kex_init) + hm.add_string(K_S) + hm.add_string(Q_C_bytes) + #SEC1: V2.0 2.3.3 Elliptic-Curve-Point-to-Octet-String Conversion + hm.add_string(self.Q_S.public_numbers().encode_point()) + hm.add_mpint(long(K)) + H = self.hash_algo(hm.asbytes()).digest() + self.transport._set_K_H(K, H) + sig = self.transport.get_server_key().sign_ssh_data(H) + #construct reply + m = Message() + m.add_byte(c_MSG_KEXECDH_REPLY) + m.add_string(K_S) + m.add_string(self.Q_S.public_numbers().encode_point()) + m.add_string(sig) + self.transport._send_message(m) + self.transport._activate_outbound() + + def _parse_kexecdh_reply(self, m): + K_S = m.get_string() + Q_S_bytes = m.get_string() + self.Q_S = ec.EllipticCurvePublicNumbers.from_encoded_point(self.curve, Q_S_bytes) + sig = m.get_binary() + K = self.P.exchange(ec.ECDH(), self.Q_S.public_key(default_backend())) + K = long(hexlify(K), 16) + #compute exchange hash and verify signature + hm = Message() + hm.add(self.transport.local_version, self.transport.remote_version, + self.transport.local_kex_init, self.transport.remote_kex_init) + hm.add_string(K_S) + #SEC1: V2.0 2.3.3 Elliptic-Curve-Point-to-Octet-String Conversion + hm.add_string(self.Q_C.public_numbers().encode_point()) + hm.add_string(Q_S_bytes) + hm.add_mpint(K) + self.transport._set_K_H(K, self.hash_algo(hm.asbytes()).digest()) + self.transport._verify_key(K_S, sig) + self.transport._activate_outbound() + + +class KexNistp384(KexNistp256): + name = "ecdh-sha2-nistp384" + hash_algo = sha384 + curve = ec.SECP384R1() + + +class KexNistp521(KexNistp256): + name = "ecdh-sha2-nistp521" + hash_algo = sha512 + curve = ec.SECP521R1() diff --git a/paramiko/transport.py b/paramiko/transport.py index 6071a8bc..4d064751 100644 --- a/paramiko/transport.py +++ b/paramiko/transport.py @@ -58,6 +58,7 @@ from paramiko.ed25519key import Ed25519Key from paramiko.kex_gex import KexGex, KexGexSHA256 from paramiko.kex_group1 import KexGroup1 from paramiko.kex_group14 import KexGroup14 +from paramiko.kex_ecdh_nist import KexNistp256, KexNistp384, KexNistp521 from paramiko.kex_gss import KexGSSGex, KexGSSGroup1, KexGSSGroup14 from paramiko.message import Message from paramiko.packet import Packetizer, NeedRekeyException @@ -137,6 +138,9 @@ class Transport(threading.Thread, ClosingContextManager): 'diffie-hellman-group14-sha1', 'diffie-hellman-group-exchange-sha1', 'diffie-hellman-group-exchange-sha256', + 'ecdh-sha2-nistp256', + 'ecdh-sha2-nistp384', + 'ecdh-sha2-nistp521', ) _preferred_compression = ('none',) @@ -229,7 +233,10 @@ class Transport(threading.Thread, ClosingContextManager): 'diffie-hellman-group-exchange-sha256': KexGexSHA256, 'gss-group1-sha1-toWM5Slw5Ew8Mqkay+al2g==': KexGSSGroup1, 'gss-group14-sha1-toWM5Slw5Ew8Mqkay+al2g==': KexGSSGroup14, - 'gss-gex-sha1-toWM5Slw5Ew8Mqkay+al2g==': KexGSSGex + 'gss-gex-sha1-toWM5Slw5Ew8Mqkay+al2g==': KexGSSGex, + 'ecdh-sha2-nistp256': KexNistp256, + 'ecdh-sha2-nistp384': KexNistp384, + 'ecdh-sha2-nistp521': KexNistp521 } _compression_info = { diff --git a/tests/test_kex.py b/tests/test_kex.py index 19804fbf..b7f588f7 100644 --- a/tests/test_kex.py +++ b/tests/test_kex.py @@ -20,7 +20,7 @@ Some unit tests for the key exchange protocols. """ -from binascii import hexlify +from binascii import hexlify, unhexlify import os import unittest @@ -29,11 +29,24 @@ from paramiko.kex_group1 import KexGroup1 from paramiko.kex_gex import KexGex, KexGexSHA256 from paramiko import Message from paramiko.common import byte_chr +from paramiko.kex_ecdh_nist import KexNistp256 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric import ec def dummy_urandom(n): return byte_chr(0xcc) * n +def dummy_generate_key_pair(obj): + private_key_value = 94761803665136558137557783047955027733968423115106677159790289642479432803037 + public_key_numbers = "042bdab212fa8ba1b7c843301682a4db424d307246c7e1e6083c41d9ca7b098bf30b3d63e2ec6278488c135360456cc054b3444ecc45998c08894cbc1370f5f989" + public_key_numbers_obj = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers)) + obj.P = ec.EllipticCurvePrivateNumbers(private_value=private_key_value, public_numbers=public_key_numbers_obj).private_key(default_backend()) + if obj.transport.server_mode: + obj.Q_S = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers)).public_key(default_backend()) + return + obj.Q_C = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers)).public_key(default_backend()) + class FakeKey (object): def __str__(self): @@ -93,9 +106,12 @@ class KexTest (unittest.TestCase): def setUp(self): self._original_urandom = os.urandom os.urandom = dummy_urandom + self._original_generate_key_pair = KexNistp256._generate_key_pair + KexNistp256._generate_key_pair = dummy_generate_key_pair def tearDown(self): os.urandom = self._original_urandom + KexNistp256._generate_key_pair = self._original_generate_key_pair def test_1_group1_client(self): transport = FakeTransport() @@ -369,4 +385,43 @@ class KexTest (unittest.TestCase): self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) + def test_11_kex_nistp256_client(self): + K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 + transport = FakeTransport() + transport.server_mode = False + kex = KexNistp256(transport) + kex.start_kex() + self.assertEqual((paramiko.kex_ecdh_nist._MSG_KEXECDH_REPLY,), transport._expect) + + #fake reply + msg = Message() + msg.add_string('fake-host-key') + Q_S = unhexlify("043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210") + msg.add_string(Q_S) + msg.add_string('fake-sig') + msg.rewind() + kex.parse_next(paramiko.kex_ecdh_nist._MSG_KEXECDH_REPLY, msg) + H = b'BAF7CE243A836037EB5D2221420F35C02B9AB6C957FE3BDE3369307B9612570A' + self.assertEqual(K, kex.transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual((b'fake-host-key', b'fake-sig'), transport._verify) + self.assertTrue(transport._activated) + + def test_12_kex_nistp256_server(self): + K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 + transport = FakeTransport() + transport.server_mode = True + kex = KexNistp256(transport) + kex.start_kex() + self.assertEqual((paramiko.kex_ecdh_nist._MSG_KEXECDH_INIT,), transport._expect) + #fake init + msg=Message() + Q_C = unhexlify("043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210") + H = b'2EF4957AFD530DD3F05DBEABF68D724FACC060974DA9704F2AEE4C3DE861E7CA' + msg.add_string(Q_C) + msg.rewind() + kex.parse_next(paramiko.kex_ecdh_nist._MSG_KEXECDH_INIT, msg) + self.assertEqual(K, transport._K) + self.assertTrue(transport._activated) + self.assertEqual(H, hexlify(transport._H).upper()) |