summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--paramiko/kex_ecdh_nist.py114
-rw-r--r--paramiko/transport.py9
-rw-r--r--tests/test_kex.py57
3 files changed, 178 insertions, 2 deletions
diff --git a/paramiko/kex_ecdh_nist.py b/paramiko/kex_ecdh_nist.py
new file mode 100644
index 00000000..8961dd14
--- /dev/null
+++ b/paramiko/kex_ecdh_nist.py
@@ -0,0 +1,114 @@
+"""
+Ephemeral Elliptic Curve Diffie-Hellman (ECDH) key exchange
+RFC 5656, Section 4
+"""
+
+from hashlib import sha256, sha384, sha512
+from paramiko.message import Message
+from paramiko.py3compat import byte_chr, long
+from paramiko.ssh_exception import SSHException
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives.asymmetric import ec
+from binascii import hexlify
+
+_MSG_KEXECDH_INIT, _MSG_KEXECDH_REPLY = range(30, 32)
+c_MSG_KEXECDH_INIT, c_MSG_KEXECDH_REPLY = [byte_chr(c) for c in range(30, 32)]
+
+
+class KexNistp256():
+
+ name = "ecdh-sha2-nistp256"
+ hash_algo = sha256
+ curve = ec.SECP256R1()
+
+ def __init__(self, transport):
+ self.transport = transport
+ #private key, client public and server public keys
+ self.P = long(0)
+ self.Q_C = None
+ self.Q_S = None
+
+ def start_kex(self):
+ self._generate_key_pair()
+ if self.transport.server_mode:
+ self.transport._expect_packet(_MSG_KEXECDH_INIT)
+ return
+ m = Message()
+ m.add_byte(c_MSG_KEXECDH_INIT)
+ #SEC1: V2.0 2.3.3 Elliptic-Curve-Point-to-Octet-String Conversion
+ m.add_string(self.Q_C.public_numbers().encode_point())
+ self.transport._send_message(m)
+ self.transport._expect_packet(_MSG_KEXECDH_REPLY)
+
+ def parse_next(self, ptype, m):
+ if self.transport.server_mode and (ptype == _MSG_KEXECDH_INIT):
+ return self._parse_kexecdh_init(m)
+ elif not self.transport.server_mode and (ptype == _MSG_KEXECDH_REPLY):
+ return self._parse_kexecdh_reply(m)
+ raise SSHException('KexECDH asked to handle packet type %d' % ptype)
+
+ def _generate_key_pair(self):
+ self.P = ec.generate_private_key(self.curve, default_backend())
+ if self.transport.server_mode:
+ self.Q_S = self.P.public_key()
+ return
+ self.Q_C = self.P.public_key()
+
+ def _parse_kexecdh_init(self, m):
+ Q_C_bytes = m.get_string()
+ self.Q_C = ec.EllipticCurvePublicNumbers.from_encoded_point(self.curve, Q_C_bytes)
+ K_S = self.transport.get_server_key().asbytes()
+ K = self.P.exchange(ec.ECDH(), self.Q_C.public_key(default_backend()))
+ K = long(hexlify(K), 16)
+ #compute exchange hash
+ hm = Message()
+ hm.add(self.transport.remote_version, self.transport.local_version,
+ self.transport.remote_kex_init, self.transport.local_kex_init)
+ hm.add_string(K_S)
+ hm.add_string(Q_C_bytes)
+ #SEC1: V2.0 2.3.3 Elliptic-Curve-Point-to-Octet-String Conversion
+ hm.add_string(self.Q_S.public_numbers().encode_point())
+ hm.add_mpint(long(K))
+ H = self.hash_algo(hm.asbytes()).digest()
+ self.transport._set_K_H(K, H)
+ sig = self.transport.get_server_key().sign_ssh_data(H)
+ #construct reply
+ m = Message()
+ m.add_byte(c_MSG_KEXECDH_REPLY)
+ m.add_string(K_S)
+ m.add_string(self.Q_S.public_numbers().encode_point())
+ m.add_string(sig)
+ self.transport._send_message(m)
+ self.transport._activate_outbound()
+
+ def _parse_kexecdh_reply(self, m):
+ K_S = m.get_string()
+ Q_S_bytes = m.get_string()
+ self.Q_S = ec.EllipticCurvePublicNumbers.from_encoded_point(self.curve, Q_S_bytes)
+ sig = m.get_binary()
+ K = self.P.exchange(ec.ECDH(), self.Q_S.public_key(default_backend()))
+ K = long(hexlify(K), 16)
+ #compute exchange hash and verify signature
+ hm = Message()
+ hm.add(self.transport.local_version, self.transport.remote_version,
+ self.transport.local_kex_init, self.transport.remote_kex_init)
+ hm.add_string(K_S)
+ #SEC1: V2.0 2.3.3 Elliptic-Curve-Point-to-Octet-String Conversion
+ hm.add_string(self.Q_C.public_numbers().encode_point())
+ hm.add_string(Q_S_bytes)
+ hm.add_mpint(K)
+ self.transport._set_K_H(K, self.hash_algo(hm.asbytes()).digest())
+ self.transport._verify_key(K_S, sig)
+ self.transport._activate_outbound()
+
+
+class KexNistp384(KexNistp256):
+ name = "ecdh-sha2-nistp384"
+ hash_algo = sha384
+ curve = ec.SECP384R1()
+
+
+class KexNistp521(KexNistp256):
+ name = "ecdh-sha2-nistp521"
+ hash_algo = sha512
+ curve = ec.SECP521R1()
diff --git a/paramiko/transport.py b/paramiko/transport.py
index 6071a8bc..4d064751 100644
--- a/paramiko/transport.py
+++ b/paramiko/transport.py
@@ -58,6 +58,7 @@ from paramiko.ed25519key import Ed25519Key
from paramiko.kex_gex import KexGex, KexGexSHA256
from paramiko.kex_group1 import KexGroup1
from paramiko.kex_group14 import KexGroup14
+from paramiko.kex_ecdh_nist import KexNistp256, KexNistp384, KexNistp521
from paramiko.kex_gss import KexGSSGex, KexGSSGroup1, KexGSSGroup14
from paramiko.message import Message
from paramiko.packet import Packetizer, NeedRekeyException
@@ -137,6 +138,9 @@ class Transport(threading.Thread, ClosingContextManager):
'diffie-hellman-group14-sha1',
'diffie-hellman-group-exchange-sha1',
'diffie-hellman-group-exchange-sha256',
+ 'ecdh-sha2-nistp256',
+ 'ecdh-sha2-nistp384',
+ 'ecdh-sha2-nistp521',
)
_preferred_compression = ('none',)
@@ -229,7 +233,10 @@ class Transport(threading.Thread, ClosingContextManager):
'diffie-hellman-group-exchange-sha256': KexGexSHA256,
'gss-group1-sha1-toWM5Slw5Ew8Mqkay+al2g==': KexGSSGroup1,
'gss-group14-sha1-toWM5Slw5Ew8Mqkay+al2g==': KexGSSGroup14,
- 'gss-gex-sha1-toWM5Slw5Ew8Mqkay+al2g==': KexGSSGex
+ 'gss-gex-sha1-toWM5Slw5Ew8Mqkay+al2g==': KexGSSGex,
+ 'ecdh-sha2-nistp256': KexNistp256,
+ 'ecdh-sha2-nistp384': KexNistp384,
+ 'ecdh-sha2-nistp521': KexNistp521
}
_compression_info = {
diff --git a/tests/test_kex.py b/tests/test_kex.py
index 19804fbf..b7f588f7 100644
--- a/tests/test_kex.py
+++ b/tests/test_kex.py
@@ -20,7 +20,7 @@
Some unit tests for the key exchange protocols.
"""
-from binascii import hexlify
+from binascii import hexlify, unhexlify
import os
import unittest
@@ -29,11 +29,24 @@ from paramiko.kex_group1 import KexGroup1
from paramiko.kex_gex import KexGex, KexGexSHA256
from paramiko import Message
from paramiko.common import byte_chr
+from paramiko.kex_ecdh_nist import KexNistp256
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives.asymmetric import ec
def dummy_urandom(n):
return byte_chr(0xcc) * n
+def dummy_generate_key_pair(obj):
+ private_key_value = 94761803665136558137557783047955027733968423115106677159790289642479432803037
+ public_key_numbers = "042bdab212fa8ba1b7c843301682a4db424d307246c7e1e6083c41d9ca7b098bf30b3d63e2ec6278488c135360456cc054b3444ecc45998c08894cbc1370f5f989"
+ public_key_numbers_obj = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers))
+ obj.P = ec.EllipticCurvePrivateNumbers(private_value=private_key_value, public_numbers=public_key_numbers_obj).private_key(default_backend())
+ if obj.transport.server_mode:
+ obj.Q_S = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers)).public_key(default_backend())
+ return
+ obj.Q_C = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers)).public_key(default_backend())
+
class FakeKey (object):
def __str__(self):
@@ -93,9 +106,12 @@ class KexTest (unittest.TestCase):
def setUp(self):
self._original_urandom = os.urandom
os.urandom = dummy_urandom
+ self._original_generate_key_pair = KexNistp256._generate_key_pair
+ KexNistp256._generate_key_pair = dummy_generate_key_pair
def tearDown(self):
os.urandom = self._original_urandom
+ KexNistp256._generate_key_pair = self._original_generate_key_pair
def test_1_group1_client(self):
transport = FakeTransport()
@@ -369,4 +385,43 @@ class KexTest (unittest.TestCase):
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertTrue(transport._activated)
+ def test_11_kex_nistp256_client(self):
+ K = 91610929826364598472338906427792435253694642563583721654249504912114314269754
+ transport = FakeTransport()
+ transport.server_mode = False
+ kex = KexNistp256(transport)
+ kex.start_kex()
+ self.assertEqual((paramiko.kex_ecdh_nist._MSG_KEXECDH_REPLY,), transport._expect)
+
+ #fake reply
+ msg = Message()
+ msg.add_string('fake-host-key')
+ Q_S = unhexlify("043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210")
+ msg.add_string(Q_S)
+ msg.add_string('fake-sig')
+ msg.rewind()
+ kex.parse_next(paramiko.kex_ecdh_nist._MSG_KEXECDH_REPLY, msg)
+ H = b'BAF7CE243A836037EB5D2221420F35C02B9AB6C957FE3BDE3369307B9612570A'
+ self.assertEqual(K, kex.transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual((b'fake-host-key', b'fake-sig'), transport._verify)
+ self.assertTrue(transport._activated)
+
+ def test_12_kex_nistp256_server(self):
+ K = 91610929826364598472338906427792435253694642563583721654249504912114314269754
+ transport = FakeTransport()
+ transport.server_mode = True
+ kex = KexNistp256(transport)
+ kex.start_kex()
+ self.assertEqual((paramiko.kex_ecdh_nist._MSG_KEXECDH_INIT,), transport._expect)
+ #fake init
+ msg=Message()
+ Q_C = unhexlify("043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210")
+ H = b'2EF4957AFD530DD3F05DBEABF68D724FACC060974DA9704F2AEE4C3DE861E7CA'
+ msg.add_string(Q_C)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_ecdh_nist._MSG_KEXECDH_INIT, msg)
+ self.assertEqual(K, transport._K)
+ self.assertTrue(transport._activated)
+ self.assertEqual(H, hexlify(transport._H).upper())