diff options
Diffstat (limited to 'tests/test_client.py')
-rw-r--r-- | tests/test_client.py | 107 |
1 files changed, 94 insertions, 13 deletions
diff --git a/tests/test_client.py b/tests/test_client.py index a340be00..e912d5b2 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -36,7 +36,7 @@ from tests.util import test_path import paramiko from paramiko.common import PY2 -from paramiko.ssh_exception import SSHException +from paramiko.ssh_exception import SSHException, AuthenticationException FINGERPRINTS = { @@ -61,6 +61,9 @@ class NullServer (paramiko.ServerInterface): def check_auth_password(self, username, password): if (username == 'slowdive') and (password == 'pygmalion'): return paramiko.AUTH_SUCCESSFUL + if (username == 'slowdive') and (password == 'unresponsive-server'): + time.sleep(5) + return paramiko.AUTH_SUCCESSFUL return paramiko.AUTH_FAILED def check_auth_publickey(self, username, key): @@ -119,7 +122,11 @@ class SSHClientTest (unittest.TestCase): allowed_keys = FINGERPRINTS.keys() self.socks, addr = self.sockl.accept() self.ts = paramiko.Transport(self.socks) - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) + keypath = test_path('test_rsa.key') + host_key = paramiko.RSAKey.from_private_key_file(keypath) + self.ts.add_server_key(host_key) + keypath = test_path('test_ecdsa_256.key') + host_key = paramiko.ECDSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) server = NullServer(allowed_keys=allowed_keys) if delay: @@ -246,8 +253,9 @@ class SSHClientTest (unittest.TestCase): verify that SSHClient's AutoAddPolicy works. """ threading.Thread(target=self._run).start() - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) - public_host_key = paramiko.RSAKey(data=host_key.asbytes()) + hostname = '[%s]:%d' % (self.addr, self.port) + key_file = test_path('test_ecdsa_256.key') + public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file) self.tc = paramiko.SSHClient() self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) @@ -260,7 +268,8 @@ class SSHClientTest (unittest.TestCase): self.assertEqual('slowdive', self.ts.get_username()) self.assertEqual(True, self.ts.is_authenticated()) self.assertEqual(1, len(self.tc.get_host_keys())) - self.assertEqual(public_host_key, self.tc.get_host_keys()['[%s]:%d' % (self.addr, self.port)]['ssh-rsa']) + new_host_key = list(self.tc.get_host_keys()[hostname].values())[0] + self.assertEqual(public_host_key, new_host_key) def test_5_save_host_keys(self): """ @@ -294,13 +303,10 @@ class SSHClientTest (unittest.TestCase): verify that when an SSHClient is collected, its transport (and the transport's packetizer) is closed. """ - # Unclear why this is borked on Py3, but it is, and does not seem worth - # pursuing at the moment. Skipped on PyPy because it fails on travis - # for unknown reasons, works fine locally. - # XXX: It's the release of the references to e.g packetizer that fails - # in py3... - if not PY2 or platform.python_implementation() == "PyPy": + # Skipped on PyPy because it fails on travis for unknown reasons + if platform.python_implementation() == "PyPy": return + threading.Thread(target=self._run).start() self.tc = paramiko.SSHClient() @@ -318,8 +324,8 @@ class SSHClientTest (unittest.TestCase): del self.tc # force a collection to see whether the SSHClient object is deallocated - # correctly. 2 GCs are needed to make sure it's really collected on - # PyPy + # 2 GCs are needed on PyPy, time is needed for Python 3 + time.sleep(0.3) gc.collect() gc.collect() @@ -384,6 +390,64 @@ class SSHClientTest (unittest.TestCase): ) self._test_connection(**kwargs) + def test_9_auth_timeout(self): + """ + verify that the SSHClient has a configurable auth timeout + """ + # Connect with a half second auth timeout + self.assertRaises( + AuthenticationException, + self._test_connection, + password='unresponsive-server', + auth_timeout=0.5, + ) + + def _client_host_key_bad(self, host_key): + threading.Thread(target=self._run).start() + hostname = '[%s]:%d' % (self.addr, self.port) + + self.tc = paramiko.SSHClient() + self.tc.set_missing_host_key_policy(paramiko.WarningPolicy()) + known_hosts = self.tc.get_host_keys() + known_hosts.add(hostname, host_key.get_name(), host_key) + + self.assertRaises( + paramiko.BadHostKeyException, + self.tc.connect, + password='pygmalion', + **self.connect_kwargs + ) + + def _client_host_key_good(self, ktype, kfile): + threading.Thread(target=self._run).start() + hostname = '[%s]:%d' % (self.addr, self.port) + + self.tc = paramiko.SSHClient() + self.tc.set_missing_host_key_policy(paramiko.RejectPolicy()) + host_key = ktype.from_private_key_file(test_path(kfile)) + known_hosts = self.tc.get_host_keys() + known_hosts.add(hostname, host_key.get_name(), host_key) + + self.tc.connect(password='pygmalion', **self.connect_kwargs) + self.event.wait(1.0) + self.assertTrue(self.event.is_set()) + self.assertTrue(self.ts.is_active()) + self.assertEqual(True, self.ts.is_authenticated()) + + def test_host_key_negotiation_1(self): + host_key = paramiko.ECDSAKey.generate() + self._client_host_key_bad(host_key) + + def test_host_key_negotiation_2(self): + host_key = paramiko.RSAKey.generate(2048) + self._client_host_key_bad(host_key) + + def test_host_key_negotiation_3(self): + self._client_host_key_good(paramiko.ECDSAKey, 'test_ecdsa_256.key') + + def test_host_key_negotiation_4(self): + self._client_host_key_good(paramiko.RSAKey, 'test_rsa.key') + def test_update_environment(self): """ Verify that environment variables can be set by the client. @@ -418,3 +482,20 @@ class SSHClientTest (unittest.TestCase): 'Expected original SSHException in exception') else: self.assertFalse(False, 'SSHException was not thrown.') + + + def test_missing_key_policy_accepts_classes_or_instances(self): + """ + Client.missing_host_key_policy() can take classes or instances. + """ + # AN ACTUAL UNIT TEST?! GOOD LORD + # (But then we have to test a private API...meh.) + client = paramiko.SSHClient() + # Default + assert isinstance(client._policy, paramiko.RejectPolicy) + # Hand in an instance (classic behavior) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + assert isinstance(client._policy, paramiko.AutoAddPolicy) + # Hand in just the class (new behavior) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy) + assert isinstance(client._policy, paramiko.AutoAddPolicy) |