# Copyright (C) 2003-2009 Robey Pointer # # This file is part of paramiko. # # Paramiko is free software; you can redistribute it and/or modify it under the # terms of the GNU Lesser General Public License as published by the Free # Software Foundation; either version 2.1 of the License, or (at your option) # any later version. # # Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. """ Some unit tests for the key exchange protocols. """ from binascii import hexlify, unhexlify import os import unittest from mock import Mock, patch import pytest from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import ec try: from cryptography.hazmat.primitives.asymmetric import x25519 except ImportError: x25519 = None import paramiko.util from paramiko.kex_group1 import KexGroup1 from paramiko.kex_gex import KexGex, KexGexSHA256 from paramiko import Message from paramiko.common import byte_chr from paramiko.kex_ecdh_nist import KexNistp256 from paramiko.kex_curve25519 import KexCurve25519 def dummy_urandom(n): return byte_chr(0xcc) * n def dummy_generate_key_pair(obj): private_key_value = 94761803665136558137557783047955027733968423115106677159790289642479432803037 # noqa public_key_numbers = "042bdab212fa8ba1b7c843301682a4db424d307246c7e1e6083c41d9ca7b098bf30b3d63e2ec6278488c135360456cc054b3444ecc45998c08894cbc1370f5f989" # noqa public_key_numbers_obj = ec.EllipticCurvePublicKey.from_encoded_point( ec.SECP256R1(), unhexlify(public_key_numbers) ).public_numbers() obj.P = ec.EllipticCurvePrivateNumbers( private_value=private_key_value, public_numbers=public_key_numbers_obj ).private_key(default_backend()) if obj.transport.server_mode: obj.Q_S = ec.EllipticCurvePublicKey.from_encoded_point( ec.SECP256R1(), unhexlify(public_key_numbers) ) return obj.Q_C = ec.EllipticCurvePublicKey.from_encoded_point( ec.SECP256R1(), unhexlify(public_key_numbers) ) class FakeKey(object): def __str__(self): return "fake-key" def asbytes(self): return b"fake-key" def sign_ssh_data(self, H): return b"fake-sig" class FakeModulusPack(object): P = 0xFFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF # noqa G = 2 def get_modulus(self, min, ask, max): return self.G, self.P class FakeTransport(object): local_version = "SSH-2.0-paramiko_1.0" remote_version = "SSH-2.0-lame" local_kex_init = "local-kex-init" remote_kex_init = "remote-kex-init" def _send_message(self, m): self._message = m def _expect_packet(self, *t): self._expect = t def _set_K_H(self, K, H): self._K = K self._H = H def _verify_key(self, host_key, sig): self._verify = (host_key, sig) def _activate_outbound(self): self._activated = True def _log(self, level, s): pass def get_server_key(self): return FakeKey() def _get_modulus_pack(self): return FakeModulusPack() class KexTest(unittest.TestCase): K = 14730343317708716439807310032871972459448364195094179797249681733965528989482751523943515690110179031004049109375612685505881911274101441415545039654102474376472240501616988799699744135291070488314748284283496055223852115360852283821334858541043710301057312858051901453919067023103730011648890038847384890504 # noqa def setUp(self): self._original_urandom = os.urandom os.urandom = dummy_urandom self._original_generate_key_pair = KexNistp256._generate_key_pair KexNistp256._generate_key_pair = dummy_generate_key_pair static_x25519_key = x25519.X25519PrivateKey.from_private_bytes( unhexlify( b"2184abc7eb3e656d2349d2470ee695b570c227340c2b2863b6c9ff427af1f040" # noqa ) ) mock_x25519 = Mock() mock_x25519.generate.return_value = static_x25519_key patcher = patch( "paramiko.kex_curve25519.X25519PrivateKey", mock_x25519 ) patcher.start() self.x25519_patcher = patcher def tearDown(self): os.urandom = self._original_urandom KexNistp256._generate_key_pair = self._original_generate_key_pair self.x25519_patcher.stop() def test_group1_client(self): transport = FakeTransport() transport.server_mode = False kex = KexGroup1(transport) kex.start_kex() x = b"1E000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_group1._MSG_KEXDH_REPLY,), transport._expect ) # fake "reply" msg = Message() msg.add_string("fake-host-key") msg.add_mpint(69) msg.add_string("fake-sig") msg.rewind() kex.parse_next(paramiko.kex_group1._MSG_KEXDH_REPLY, msg) H = b"03079780F3D3AD0B3C6DB30C8D21685F367A86D2" self.assertEqual(self.K, transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) self.assertTrue(transport._activated) def test_group1_server(self): transport = FakeTransport() transport.server_mode = True kex = KexGroup1(transport) kex.start_kex() self.assertEqual( (paramiko.kex_group1._MSG_KEXDH_INIT,), transport._expect ) msg = Message() msg.add_mpint(69) msg.rewind() kex.parse_next(paramiko.kex_group1._MSG_KEXDH_INIT, msg) H = b"B16BF34DD10945EDE84E9C1EF24A14BFDC843389" x = b"1F0000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa self.assertEqual(self.K, transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) def test_gex_client(self): transport = FakeTransport() transport.server_mode = False kex = KexGex(transport) kex.start_kex() x = b"22000004000000080000002000" self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect ) msg = Message() msg.add_mpint(FakeModulusPack.P) msg.add_mpint(FakeModulusPack.G) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg) x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect ) msg = Message() msg.add_string("fake-host-key") msg.add_mpint(69) msg.add_string("fake-sig") msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg) H = b"A265563F2FA87F1A89BF007EE90D58BE2E4A4BD0" self.assertEqual(self.K, transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) self.assertTrue(transport._activated) def test_gex_old_client(self): transport = FakeTransport() transport.server_mode = False kex = KexGex(transport) kex.start_kex(_test_old_style=True) x = b"1E00000800" self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect ) msg = Message() msg.add_mpint(FakeModulusPack.P) msg.add_mpint(FakeModulusPack.G) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg) x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect ) msg = Message() msg.add_string("fake-host-key") msg.add_mpint(69) msg.add_string("fake-sig") msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg) H = b"807F87B269EF7AC5EC7E75676808776A27D5864C" self.assertEqual(self.K, transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) self.assertTrue(transport._activated) def test_gex_server(self): transport = FakeTransport() transport.server_mode = True kex = KexGex(transport) kex.start_kex() self.assertEqual( ( paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, ), transport._expect, ) msg = Message() msg.add_int(1024) msg.add_int(2048) msg.add_int(4096) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg) x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect ) msg = Message() msg.add_mpint(12345) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg) K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 # noqa H = b"CE754197C21BF3452863B4F44D0B3951F12516EF" x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa self.assertEqual(K, transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) def test_gex_server_with_old_client(self): transport = FakeTransport() transport.server_mode = True kex = KexGex(transport) kex.start_kex() self.assertEqual( ( paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, ), transport._expect, ) msg = Message() msg.add_int(2048) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, msg) x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect ) msg = Message() msg.add_mpint(12345) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg) K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 # noqa H = b"B41A06B2E59043CEFC1AE16EC31F1E2D12EC455B" x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa self.assertEqual(K, transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) def test_gex_sha256_client(self): transport = FakeTransport() transport.server_mode = False kex = KexGexSHA256(transport) kex.start_kex() x = b"22000004000000080000002000" self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect ) msg = Message() msg.add_mpint(FakeModulusPack.P) msg.add_mpint(FakeModulusPack.G) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg) x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect ) msg = Message() msg.add_string("fake-host-key") msg.add_mpint(69) msg.add_string("fake-sig") msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg) H = b"AD1A9365A67B4496F05594AD1BF656E3CDA0851289A4C1AFF549FEAE50896DF4" self.assertEqual(self.K, transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) self.assertTrue(transport._activated) def test_gex_sha256_old_client(self): transport = FakeTransport() transport.server_mode = False kex = KexGexSHA256(transport) kex.start_kex(_test_old_style=True) x = b"1E00000800" self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect ) msg = Message() msg.add_mpint(FakeModulusPack.P) msg.add_mpint(FakeModulusPack.G) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg) x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect ) msg = Message() msg.add_string("fake-host-key") msg.add_mpint(69) msg.add_string("fake-sig") msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg) H = b"518386608B15891AE5237DEE08DCADDE76A0BCEFCE7F6DB3AD66BC41D256DFE5" self.assertEqual(self.K, transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) self.assertTrue(transport._activated) def test_gex_sha256_server(self): transport = FakeTransport() transport.server_mode = True kex = KexGexSHA256(transport) kex.start_kex() self.assertEqual( ( paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, ), transport._expect, ) msg = Message() msg.add_int(1024) msg.add_int(2048) msg.add_int(4096) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg) x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect ) msg = Message() msg.add_mpint(12345) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg) K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 # noqa H = b"CCAC0497CF0ABA1DBF55E1A3995D17F4CC31824B0E8D95CDF8A06F169D050D80" x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa self.assertEqual(K, transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) def test_gex_sha256_server_with_old_client(self): transport = FakeTransport() transport.server_mode = True kex = KexGexSHA256(transport) kex.start_kex() self.assertEqual( ( paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, ), transport._expect, ) msg = Message() msg.add_int(2048) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, msg) x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect ) msg = Message() msg.add_mpint(12345) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg) K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 # noqa H = b"3DDD2AD840AD095E397BA4D0573972DC60F6461FD38A187CACA6615A5BC8ADBB" x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa self.assertEqual(K, transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) def test_kex_nistp256_client(self): K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 # noqa transport = FakeTransport() transport.server_mode = False kex = KexNistp256(transport) kex.start_kex() self.assertEqual( (paramiko.kex_ecdh_nist._MSG_KEXECDH_REPLY,), transport._expect ) # fake reply msg = Message() msg.add_string("fake-host-key") Q_S = unhexlify( "043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210" # noqa ) msg.add_string(Q_S) msg.add_string("fake-sig") msg.rewind() kex.parse_next(paramiko.kex_ecdh_nist._MSG_KEXECDH_REPLY, msg) H = b"BAF7CE243A836037EB5D2221420F35C02B9AB6C957FE3BDE3369307B9612570A" self.assertEqual(K, kex.transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) self.assertTrue(transport._activated) def test_kex_nistp256_server(self): K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 # noqa transport = FakeTransport() transport.server_mode = True kex = KexNistp256(transport) kex.start_kex() self.assertEqual( (paramiko.kex_ecdh_nist._MSG_KEXECDH_INIT,), transport._expect ) # fake init msg = Message() Q_C = unhexlify( "043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210" # noqa ) H = b"2EF4957AFD530DD3F05DBEABF68D724FACC060974DA9704F2AEE4C3DE861E7CA" msg.add_string(Q_C) msg.rewind() kex.parse_next(paramiko.kex_ecdh_nist._MSG_KEXECDH_INIT, msg) self.assertEqual(K, transport._K) self.assertTrue(transport._activated) self.assertEqual(H, hexlify(transport._H).upper()) @pytest.mark.skipif("not KexCurve25519.is_available()") def test_kex_c25519_client(self): K = 71294722834835117201316639182051104803802881348227506835068888449366462300724 # noqa transport = FakeTransport() transport.server_mode = False kex = KexCurve25519(transport) kex.start_kex() self.assertEqual( (paramiko.kex_curve25519._MSG_KEXECDH_REPLY,), transport._expect ) # fake reply msg = Message() msg.add_string("fake-host-key") Q_S = unhexlify( "8d13a119452382a1ada8eea4c979f3e63ad3f0c7366786d6c5b54b87219bae49" ) msg.add_string(Q_S) msg.add_string("fake-sig") msg.rewind() kex.parse_next(paramiko.kex_curve25519._MSG_KEXECDH_REPLY, msg) H = b"05B6F6437C0CF38D1A6C5A6F6E2558DEB54E7FC62447EBFB1E5D7407326A5475" self.assertEqual(K, kex.transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) self.assertTrue(transport._activated) @pytest.mark.skipif("not KexCurve25519.is_available()") def test_kex_c25519_server(self): K = 71294722834835117201316639182051104803802881348227506835068888449366462300724 # noqa transport = FakeTransport() transport.server_mode = True kex = KexCurve25519(transport) kex.start_kex() self.assertEqual( (paramiko.kex_curve25519._MSG_KEXECDH_INIT,), transport._expect ) # fake init msg = Message() Q_C = unhexlify( "8d13a119452382a1ada8eea4c979f3e63ad3f0c7366786d6c5b54b87219bae49" ) H = b"DF08FCFCF31560FEE639D9B6D56D760BC3455B5ADA148E4514181023E7A9B042" msg.add_string(Q_C) msg.rewind() kex.parse_next(paramiko.kex_curve25519._MSG_KEXECDH_INIT, msg) self.assertEqual(K, transport._K) self.assertTrue(transport._activated) self.assertEqual(H, hexlify(transport._H).upper())