summaryrefslogtreecommitdiffhomepage
path: root/tests/test_kex.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_kex.py')
-rw-r--r--tests/test_kex.py86
1 files changed, 80 insertions, 6 deletions
diff --git a/tests/test_kex.py b/tests/test_kex.py
index 69492ee2..456a213f 100644
--- a/tests/test_kex.py
+++ b/tests/test_kex.py
@@ -24,15 +24,24 @@ from binascii import hexlify, unhexlify
import os
import unittest
+from mock import Mock, patch
+import pytest
+
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import ec
+try:
+ from cryptography.hazmat.primitives.asymmetric import x25519
+except ImportError:
+ x25519 = None
+
import paramiko.util
from paramiko.kex_group1 import KexGroup1
from paramiko.kex_gex import KexGex, KexGexSHA256
from paramiko import Message
from paramiko.common import byte_chr
from paramiko.kex_ecdh_nist import KexNistp256
+from paramiko.kex_curve25519 import KexCurve25519
def dummy_urandom(n):
@@ -42,20 +51,20 @@ def dummy_urandom(n):
def dummy_generate_key_pair(obj):
private_key_value = 94761803665136558137557783047955027733968423115106677159790289642479432803037 # noqa
public_key_numbers = "042bdab212fa8ba1b7c843301682a4db424d307246c7e1e6083c41d9ca7b098bf30b3d63e2ec6278488c135360456cc054b3444ecc45998c08894cbc1370f5f989" # noqa
- public_key_numbers_obj = ec.EllipticCurvePublicNumbers.from_encoded_point(
+ public_key_numbers_obj = ec.EllipticCurvePublicKey.from_encoded_point(
ec.SECP256R1(), unhexlify(public_key_numbers)
- )
+ ).public_numbers()
obj.P = ec.EllipticCurvePrivateNumbers(
private_value=private_key_value, public_numbers=public_key_numbers_obj
).private_key(default_backend())
if obj.transport.server_mode:
- obj.Q_S = ec.EllipticCurvePublicNumbers.from_encoded_point(
+ obj.Q_S = ec.EllipticCurvePublicKey.from_encoded_point(
ec.SECP256R1(), unhexlify(public_key_numbers)
- ).public_key(default_backend())
+ )
return
- obj.Q_C = ec.EllipticCurvePublicNumbers.from_encoded_point(
+ obj.Q_C = ec.EllipticCurvePublicKey.from_encoded_point(
ec.SECP256R1(), unhexlify(public_key_numbers)
- ).public_key(default_backend())
+ )
class FakeKey(object):
@@ -119,9 +128,23 @@ class KexTest(unittest.TestCase):
self._original_generate_key_pair = KexNistp256._generate_key_pair
KexNistp256._generate_key_pair = dummy_generate_key_pair
+ static_x25519_key = x25519.X25519PrivateKey.from_private_bytes(
+ unhexlify(
+ b"2184abc7eb3e656d2349d2470ee695b570c227340c2b2863b6c9ff427af1f040" # noqa
+ )
+ )
+ mock_x25519 = Mock()
+ mock_x25519.generate.return_value = static_x25519_key
+ patcher = patch(
+ "paramiko.kex_curve25519.X25519PrivateKey", mock_x25519
+ )
+ patcher.start()
+ self.x25519_patcher = patcher
+
def tearDown(self):
os.urandom = self._original_urandom
KexNistp256._generate_key_pair = self._original_generate_key_pair
+ self.x25519_patcher.stop()
def test_group1_client(self):
transport = FakeTransport()
@@ -495,3 +518,54 @@ class KexTest(unittest.TestCase):
self.assertEqual(K, transport._K)
self.assertTrue(transport._activated)
self.assertEqual(H, hexlify(transport._H).upper())
+
+ @pytest.mark.skipif("not KexCurve25519.is_available()")
+ def test_kex_c25519_client(self):
+ K = 71294722834835117201316639182051104803802881348227506835068888449366462300724 # noqa
+ transport = FakeTransport()
+ transport.server_mode = False
+ kex = KexCurve25519(transport)
+ kex.start_kex()
+ self.assertEqual(
+ (paramiko.kex_curve25519._MSG_KEXECDH_REPLY,), transport._expect
+ )
+
+ # fake reply
+ msg = Message()
+ msg.add_string("fake-host-key")
+ Q_S = unhexlify(
+ "8d13a119452382a1ada8eea4c979f3e63ad3f0c7366786d6c5b54b87219bae49"
+ )
+ msg.add_string(Q_S)
+ msg.add_string("fake-sig")
+ msg.rewind()
+ kex.parse_next(paramiko.kex_curve25519._MSG_KEXECDH_REPLY, msg)
+ H = b"05B6F6437C0CF38D1A6C5A6F6E2558DEB54E7FC62447EBFB1E5D7407326A5475"
+ self.assertEqual(K, kex.transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify)
+ self.assertTrue(transport._activated)
+
+ @pytest.mark.skipif("not KexCurve25519.is_available()")
+ def test_kex_c25519_server(self):
+ K = 71294722834835117201316639182051104803802881348227506835068888449366462300724 # noqa
+ transport = FakeTransport()
+ transport.server_mode = True
+ kex = KexCurve25519(transport)
+ kex.start_kex()
+ self.assertEqual(
+ (paramiko.kex_curve25519._MSG_KEXECDH_INIT,), transport._expect
+ )
+
+ # fake init
+ msg = Message()
+ Q_C = unhexlify(
+ "8d13a119452382a1ada8eea4c979f3e63ad3f0c7366786d6c5b54b87219bae49"
+ )
+ H = b"DF08FCFCF31560FEE639D9B6D56D760BC3455B5ADA148E4514181023E7A9B042"
+ msg.add_string(Q_C)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_curve25519._MSG_KEXECDH_INIT, msg)
+ self.assertEqual(K, transport._K)
+ self.assertTrue(transport._activated)
+ self.assertEqual(H, hexlify(transport._H).upper())