diff options
-rw-r--r-- | paramiko/transport.py | 11 | ||||
-rw-r--r-- | tests/test_kex.py | 57 |
2 files changed, 66 insertions, 2 deletions
diff --git a/paramiko/transport.py b/paramiko/transport.py index c4f0ca7c..8775e434 100644 --- a/paramiko/transport.py +++ b/paramiko/transport.py @@ -55,6 +55,9 @@ from paramiko.dsskey import DSSKey from paramiko.kex_gex import KexGex, KexGexSHA256 from paramiko.kex_group1 import KexGroup1 from paramiko.kex_group14 import KexGroup14 +from paramiko.kex_nistp256 import KexNistp256 +from paramiko.kex_nistp384 import KexNistp384 +from paramiko.kex_nistp521 import KexNistp521 from paramiko.kex_gss import KexGSSGex, KexGSSGroup1, KexGSSGroup14, NullHostKey from paramiko.message import Message from paramiko.packet import Packetizer, NeedRekeyException @@ -128,6 +131,9 @@ class Transport (threading.Thread, ClosingContextManager): 'diffie-hellman-group14-sha1', 'diffie-hellman-group-exchange-sha1', 'diffie-hellman-group-exchange-sha256', + 'ecdh-sha2-nistp256', + 'ecdh-sha2-nistp384', + 'ecdh-sha2-nistp521', ) _preferred_compression = ('none',) @@ -217,7 +223,10 @@ class Transport (threading.Thread, ClosingContextManager): 'diffie-hellman-group-exchange-sha256': KexGexSHA256, 'gss-group1-sha1-toWM5Slw5Ew8Mqkay+al2g==': KexGSSGroup1, 'gss-group14-sha1-toWM5Slw5Ew8Mqkay+al2g==': KexGSSGroup14, - 'gss-gex-sha1-toWM5Slw5Ew8Mqkay+al2g==': KexGSSGex + 'gss-gex-sha1-toWM5Slw5Ew8Mqkay+al2g==': KexGSSGex, + 'ecdh-sha2-nistp256': KexNistp256, + 'ecdh-sha2-nistp384': KexNistp384, + 'ecdh-sha2-nistp521': KexNistp521 } _compression_info = { diff --git a/tests/test_kex.py b/tests/test_kex.py index 19804fbf..619b2722 100644 --- a/tests/test_kex.py +++ b/tests/test_kex.py @@ -20,7 +20,7 @@ Some unit tests for the key exchange protocols. """ -from binascii import hexlify +from binascii import hexlify, unhexlify import os import unittest @@ -29,11 +29,24 @@ from paramiko.kex_group1 import KexGroup1 from paramiko.kex_gex import KexGex, KexGexSHA256 from paramiko import Message from paramiko.common import byte_chr +from paramiko.kex_nistp256 import KexNistp256 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric import ec def dummy_urandom(n): return byte_chr(0xcc) * n +def dummy_generate_key_pair(obj): + private_key_value = 94761803665136558137557783047955027733968423115106677159790289642479432803037 + public_key_numbers = "042bdab212fa8ba1b7c843301682a4db424d307246c7e1e6083c41d9ca7b098bf30b3d63e2ec6278488c135360456cc054b3444ecc45998c08894cbc1370f5f989" + public_key_numbers_obj = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers)) + obj.P = ec.EllipticCurvePrivateNumbers(private_value=private_key_value, public_numbers=public_key_numbers_obj).private_key(default_backend()) + if obj.transport.server_mode: + obj.Q_S = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers)).public_key(default_backend()) + return + obj.Q_C = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers)).public_key(default_backend()) + class FakeKey (object): def __str__(self): @@ -93,9 +106,12 @@ class KexTest (unittest.TestCase): def setUp(self): self._original_urandom = os.urandom os.urandom = dummy_urandom + self._original_generate_key_pair = KexNistp256._generate_key_pair + KexNistp256._generate_key_pair = dummy_generate_key_pair def tearDown(self): os.urandom = self._original_urandom + KexNistp256._generate_key_pair = self._original_generate_key_pair def test_1_group1_client(self): transport = FakeTransport() @@ -369,4 +385,43 @@ class KexTest (unittest.TestCase): self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) + def test_11_kex_nistp256_client(self): + K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 + transport = FakeTransport() + transport.server_mode = False + kex = KexNistp256(transport) + kex.start_kex() + self.assertEqual((paramiko.kex_nistp256._MSG_KEXECDH_REPLY,), transport._expect) + + #fake reply + msg = Message() + msg.add_string('fake-host-key') + Q_S = unhexlify("043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210") + msg.add_string(Q_S) + msg.add_string('fake-sig') + msg.rewind() + kex.parse_next(paramiko.kex_nistp256._MSG_KEXECDH_REPLY, msg) + H = b'BAF7CE243A836037EB5D2221420F35C02B9AB6C957FE3BDE3369307B9612570A' + self.assertEqual(K, kex.transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual((b'fake-host-key', b'fake-sig'), transport._verify) + self.assertTrue(transport._activated) + + def test_12_kex_nistp256_server(self): + K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 + transport = FakeTransport() + transport.server_mode = True + kex = KexNistp256(transport) + kex.start_kex() + self.assertEqual((paramiko.kex_nistp256._MSG_KEXECDH_INIT,), transport._expect) + #fake init + msg=Message() + Q_C = unhexlify("043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210") + H = b'2EF4957AFD530DD3F05DBEABF68D724FACC060974DA9704F2AEE4C3DE861E7CA' + msg.add_string(Q_C) + msg.rewind() + kex.parse_next(paramiko.kex_nistp256._MSG_KEXECDH_INIT, msg) + self.assertEqual(K, transport._K) + self.assertTrue(transport._activated) + self.assertEqual(H, hexlify(transport._H).upper()) |