summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorPierce Lopez <pierce.lopez@gmail.com>2017-06-02 02:56:23 -0400
committerPierce Lopez <pierce.lopez@gmail.com>2017-06-07 01:58:12 -0400
commit991c39bde018d26cd24d8be2d08c507c8dd7834c (patch)
treec8657be7578afc0032a9b0cc83777fd5afd751cd
parent04a70b9bf61684bacac1e82910ac949a96cbf070 (diff)
tests for host key negotiation when there are multiple
-rw-r--r--tests/test_client.py60
1 files changed, 56 insertions, 4 deletions
diff --git a/tests/test_client.py b/tests/test_client.py
index 5f4f0dd5..3adb9665 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -118,7 +118,11 @@ class SSHClientTest (unittest.TestCase):
allowed_keys = FINGERPRINTS.keys()
self.socks, addr = self.sockl.accept()
self.ts = paramiko.Transport(self.socks)
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ keypath = test_path('test_rsa.key')
+ host_key = paramiko.RSAKey.from_private_key_file(keypath)
+ self.ts.add_server_key(host_key)
+ keypath = test_path('test_ecdsa_256.key')
+ host_key = paramiko.ECDSAKey.from_private_key_file(keypath)
self.ts.add_server_key(host_key)
server = NullServer(allowed_keys=allowed_keys)
if delay:
@@ -242,8 +246,9 @@ class SSHClientTest (unittest.TestCase):
verify that SSHClient's AutoAddPolicy works.
"""
threading.Thread(target=self._run).start()
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
- public_host_key = paramiko.RSAKey(data=host_key.asbytes())
+ hostname = '[%s]:%d' % (self.addr, self.port)
+ key_file = test_path('test_ecdsa_256.key')
+ public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file)
self.tc = paramiko.SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
@@ -256,7 +261,8 @@ class SSHClientTest (unittest.TestCase):
self.assertEqual('slowdive', self.ts.get_username())
self.assertEqual(True, self.ts.is_authenticated())
self.assertEqual(1, len(self.tc.get_host_keys()))
- self.assertEqual(public_host_key, self.tc.get_host_keys()['[%s]:%d' % (self.addr, self.port)]['ssh-rsa'])
+ new_host_key = list(self.tc.get_host_keys()[hostname].values())[0]
+ self.assertEqual(public_host_key, new_host_key)
def test_5_save_host_keys(self):
"""
@@ -380,6 +386,52 @@ class SSHClientTest (unittest.TestCase):
)
self._test_connection(**kwargs)
+ def _client_host_key_bad(self, host_key):
+ threading.Thread(target=self._run).start()
+ hostname = '[%s]:%d' % (self.addr, self.port)
+
+ self.tc = paramiko.SSHClient()
+ self.tc.set_missing_host_key_policy(paramiko.WarningPolicy())
+ known_hosts = self.tc.get_host_keys()
+ known_hosts.add(hostname, host_key.get_name(), host_key)
+
+ self.assertRaises(
+ paramiko.BadHostKeyException,
+ self.tc.connect,
+ password='pygmalion',
+ **self.connect_kwargs
+ )
+
+ def _client_host_key_good(self, ktype, kfile):
+ threading.Thread(target=self._run).start()
+ hostname = '[%s]:%d' % (self.addr, self.port)
+
+ self.tc = paramiko.SSHClient()
+ self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
+ host_key = ktype.from_private_key_file(test_path(kfile))
+ known_hosts = self.tc.get_host_keys()
+ known_hosts.add(hostname, host_key.get_name(), host_key)
+
+ self.tc.connect(password='pygmalion', **self.connect_kwargs)
+ self.event.wait(1.0)
+ self.assertTrue(self.event.is_set())
+ self.assertTrue(self.ts.is_active())
+ self.assertEqual(True, self.ts.is_authenticated())
+
+ def test_host_key_negotiation_1(self):
+ host_key = paramiko.ECDSAKey.generate()
+ self._client_host_key_bad(host_key)
+
+ def test_host_key_negotiation_2(self):
+ host_key = paramiko.RSAKey.generate(2048)
+ self._client_host_key_bad(host_key)
+
+ def test_host_key_negotiation_3(self):
+ self._client_host_key_good(paramiko.ECDSAKey, 'test_ecdsa_256.key')
+
+ def test_host_key_negotiation_4(self):
+ self._client_host_key_good(paramiko.RSAKey, 'test_rsa.key')
+
def test_update_environment(self):
"""
Verify that environment variables can be set by the client.