diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_auth.py | 25 | ||||
-rw-r--r-- | tests/test_client.py | 21 |
2 files changed, 46 insertions, 0 deletions
diff --git a/tests/test_auth.py b/tests/test_auth.py index 96f7611c..58b2f44f 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -23,6 +23,7 @@ Some unit tests for authenticating over a Transport. import sys import threading import unittest +from time import sleep from paramiko import ( Transport, ServerInterface, RSAKey, DSSKey, BadAuthenticationType, @@ -74,6 +75,9 @@ class NullServer (ServerInterface): return AUTH_SUCCESSFUL if username == 'bad-server': raise Exception("Ack!") + if username == 'unresponsive-server': + sleep(5) + return AUTH_SUCCESSFUL return AUTH_FAILED def check_auth_publickey(self, username, key): @@ -233,3 +237,24 @@ class AuthTest (unittest.TestCase): except: etype, evalue, etb = sys.exc_info() self.assertTrue(issubclass(etype, AuthenticationException)) + + def test_9_auth_non_responsive(self): + """ + verify that authentication times out if server takes to long to + respond (or never responds). + """ + auth_timeout = self.tc.auth_timeout + self.tc.auth_timeout = 2 # Reduce to 2 seconds to speed up test + + try: + self.start_server() + self.tc.connect() + try: + remain = self.tc.auth_password('unresponsive-server', 'hello') + except: + etype, evalue, etb = sys.exc_info() + self.assertTrue(issubclass(etype, AuthenticationException)) + self.assertTrue('Authentication timeout' in str(evalue)) + finally: + # Restore value + self.tc.auth_timeout = auth_timeout diff --git a/tests/test_client.py b/tests/test_client.py index 3a9001e2..aa3ff59b 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -61,6 +61,9 @@ class NullServer (paramiko.ServerInterface): def check_auth_password(self, username, password): if (username == 'slowdive') and (password == 'pygmalion'): return paramiko.AUTH_SUCCESSFUL + if (username == 'slowdive') and (password == 'unresponsive-server'): + time.sleep(5) + return paramiko.AUTH_SUCCESSFUL return paramiko.AUTH_FAILED def check_auth_publickey(self, username, key): @@ -384,6 +387,24 @@ class SSHClientTest (unittest.TestCase): ) self._test_connection(**kwargs) + def test_9_auth_timeout(self): + """ + verify that the SSHClient has a configurable auth timeout + """ + threading.Thread(target=self._run).start() + host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) + public_host_key = paramiko.RSAKey(data=host_key.asbytes()) + + self.tc = paramiko.SSHClient() + self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key) + # Connect with a half second auth timeout + kwargs = dict(self.connect_kwargs, password='unresponsive-server', auth_timeout=0.5) + self.assertRaises( + paramiko.AuthenticationException, + self.tc.connect, + **kwargs + ) + def test_update_environment(self): """ Verify that environment variables can be set by the client. |