diff options
Diffstat (limited to 'tests/test_kex.py')
-rw-r--r-- | tests/test_kex.py | 86 |
1 files changed, 80 insertions, 6 deletions
diff --git a/tests/test_kex.py b/tests/test_kex.py index 69492ee2..456a213f 100644 --- a/tests/test_kex.py +++ b/tests/test_kex.py @@ -24,15 +24,24 @@ from binascii import hexlify, unhexlify import os import unittest +from mock import Mock, patch +import pytest + from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import ec +try: + from cryptography.hazmat.primitives.asymmetric import x25519 +except ImportError: + x25519 = None + import paramiko.util from paramiko.kex_group1 import KexGroup1 from paramiko.kex_gex import KexGex, KexGexSHA256 from paramiko import Message from paramiko.common import byte_chr from paramiko.kex_ecdh_nist import KexNistp256 +from paramiko.kex_curve25519 import KexCurve25519 def dummy_urandom(n): @@ -42,20 +51,20 @@ def dummy_urandom(n): def dummy_generate_key_pair(obj): private_key_value = 94761803665136558137557783047955027733968423115106677159790289642479432803037 # noqa public_key_numbers = "042bdab212fa8ba1b7c843301682a4db424d307246c7e1e6083c41d9ca7b098bf30b3d63e2ec6278488c135360456cc054b3444ecc45998c08894cbc1370f5f989" # noqa - public_key_numbers_obj = ec.EllipticCurvePublicNumbers.from_encoded_point( + public_key_numbers_obj = ec.EllipticCurvePublicKey.from_encoded_point( ec.SECP256R1(), unhexlify(public_key_numbers) - ) + ).public_numbers() obj.P = ec.EllipticCurvePrivateNumbers( private_value=private_key_value, public_numbers=public_key_numbers_obj ).private_key(default_backend()) if obj.transport.server_mode: - obj.Q_S = ec.EllipticCurvePublicNumbers.from_encoded_point( + obj.Q_S = ec.EllipticCurvePublicKey.from_encoded_point( ec.SECP256R1(), unhexlify(public_key_numbers) - ).public_key(default_backend()) + ) return - obj.Q_C = ec.EllipticCurvePublicNumbers.from_encoded_point( + obj.Q_C = ec.EllipticCurvePublicKey.from_encoded_point( ec.SECP256R1(), unhexlify(public_key_numbers) - ).public_key(default_backend()) + ) class FakeKey(object): @@ -119,9 +128,23 @@ class KexTest(unittest.TestCase): self._original_generate_key_pair = KexNistp256._generate_key_pair KexNistp256._generate_key_pair = dummy_generate_key_pair + static_x25519_key = x25519.X25519PrivateKey.from_private_bytes( + unhexlify( + b"2184abc7eb3e656d2349d2470ee695b570c227340c2b2863b6c9ff427af1f040" # noqa + ) + ) + mock_x25519 = Mock() + mock_x25519.generate.return_value = static_x25519_key + patcher = patch( + "paramiko.kex_curve25519.X25519PrivateKey", mock_x25519 + ) + patcher.start() + self.x25519_patcher = patcher + def tearDown(self): os.urandom = self._original_urandom KexNistp256._generate_key_pair = self._original_generate_key_pair + self.x25519_patcher.stop() def test_group1_client(self): transport = FakeTransport() @@ -495,3 +518,54 @@ class KexTest(unittest.TestCase): self.assertEqual(K, transport._K) self.assertTrue(transport._activated) self.assertEqual(H, hexlify(transport._H).upper()) + + @pytest.mark.skipif("not KexCurve25519.is_available()") + def test_kex_c25519_client(self): + K = 71294722834835117201316639182051104803802881348227506835068888449366462300724 # noqa + transport = FakeTransport() + transport.server_mode = False + kex = KexCurve25519(transport) + kex.start_kex() + self.assertEqual( + (paramiko.kex_curve25519._MSG_KEXECDH_REPLY,), transport._expect + ) + + # fake reply + msg = Message() + msg.add_string("fake-host-key") + Q_S = unhexlify( + "8d13a119452382a1ada8eea4c979f3e63ad3f0c7366786d6c5b54b87219bae49" + ) + msg.add_string(Q_S) + msg.add_string("fake-sig") + msg.rewind() + kex.parse_next(paramiko.kex_curve25519._MSG_KEXECDH_REPLY, msg) + H = b"05B6F6437C0CF38D1A6C5A6F6E2558DEB54E7FC62447EBFB1E5D7407326A5475" + self.assertEqual(K, kex.transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) + self.assertTrue(transport._activated) + + @pytest.mark.skipif("not KexCurve25519.is_available()") + def test_kex_c25519_server(self): + K = 71294722834835117201316639182051104803802881348227506835068888449366462300724 # noqa + transport = FakeTransport() + transport.server_mode = True + kex = KexCurve25519(transport) + kex.start_kex() + self.assertEqual( + (paramiko.kex_curve25519._MSG_KEXECDH_INIT,), transport._expect + ) + + # fake init + msg = Message() + Q_C = unhexlify( + "8d13a119452382a1ada8eea4c979f3e63ad3f0c7366786d6c5b54b87219bae49" + ) + H = b"DF08FCFCF31560FEE639D9B6D56D760BC3455B5ADA148E4514181023E7A9B042" + msg.add_string(Q_C) + msg.rewind() + kex.parse_next(paramiko.kex_curve25519._MSG_KEXECDH_INIT, msg) + self.assertEqual(K, transport._K) + self.assertTrue(transport._activated) + self.assertEqual(H, hexlify(transport._H).upper()) |