diff options
author | Jeff Forcier <jeff@bitprophet.org> | 2017-06-06 15:15:40 -0700 |
---|---|---|
committer | Jeff Forcier <jeff@bitprophet.org> | 2017-06-06 15:15:40 -0700 |
commit | 996fb6fd8ffb6df4f56c81e2ff199b9a600ecfc6 (patch) | |
tree | 4571659cb5f9320275cfedccc3ae897b0f425343 /tests/test_auth.py | |
parent | 57394f5199ff75abc87b0373e18be2102540d50d (diff) | |
parent | ddb277d4e4989e914b67ff26c14c7c298e7fab9f (diff) |
Merge branch 'master' into 471-int
Diffstat (limited to 'tests/test_auth.py')
-rw-r--r-- | tests/test_auth.py | 44 |
1 files changed, 35 insertions, 9 deletions
diff --git a/tests/test_auth.py b/tests/test_auth.py index ec78e3ce..58b2f44f 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -23,10 +23,12 @@ Some unit tests for authenticating over a Transport. import sys import threading import unittest +from time import sleep -from paramiko import Transport, ServerInterface, RSAKey, DSSKey, \ - BadAuthenticationType, InteractiveQuery, \ - AuthenticationException +from paramiko import ( + Transport, ServerInterface, RSAKey, DSSKey, BadAuthenticationType, + InteractiveQuery, AuthenticationException, +) from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL from paramiko.py3compat import u from tests.loop import LoopSocket @@ -73,6 +75,9 @@ class NullServer (ServerInterface): return AUTH_SUCCESSFUL if username == 'bad-server': raise Exception("Ack!") + if username == 'unresponsive-server': + sleep(5) + return AUTH_SUCCESSFUL return AUTH_FAILED def check_auth_publickey(self, username, key): @@ -83,13 +88,13 @@ class NullServer (ServerInterface): return AUTH_SUCCESSFUL return AUTH_PARTIALLY_SUCCESSFUL return AUTH_FAILED - + def check_auth_interactive(self, username, submethods): if username == 'commie': self.username = username return InteractiveQuery('password', 'Please enter a password.', ('Password', False)) return AUTH_FAILED - + def check_auth_interactive_response(self, responses): if self.username == 'commie': if (len(responses) == 1) and (responses[0] == 'cat'): @@ -111,7 +116,7 @@ class AuthTest (unittest.TestCase): self.ts.close() self.socks.close() self.sockc.close() - + def start_server(self): host_key = RSAKey.from_private_key_file(test_path('test_rsa.key')) self.public_host_key = RSAKey(data=host_key.asbytes()) @@ -120,7 +125,7 @@ class AuthTest (unittest.TestCase): self.server = NullServer() self.assertTrue(not self.event.is_set()) self.ts.start_server(self.event, self.server) - + def verify_finished(self): self.event.wait(1.0) self.assertTrue(self.event.is_set()) @@ -156,7 +161,7 @@ class AuthTest (unittest.TestCase): self.assertTrue(issubclass(etype, AuthenticationException)) self.tc.auth_password(username='slowdive', password='pygmalion') self.verify_finished() - + def test_3_multipart_auth(self): """ verify that multipart auth works. @@ -187,7 +192,7 @@ class AuthTest (unittest.TestCase): self.assertEqual(self.got_prompts, [('Password', False)]) self.assertEqual([], remain) self.verify_finished() - + def test_5_interactive_auth_fallback(self): """ verify that a password auth attempt will fallback to "interactive" @@ -232,3 +237,24 @@ class AuthTest (unittest.TestCase): except: etype, evalue, etb = sys.exc_info() self.assertTrue(issubclass(etype, AuthenticationException)) + + def test_9_auth_non_responsive(self): + """ + verify that authentication times out if server takes to long to + respond (or never responds). + """ + auth_timeout = self.tc.auth_timeout + self.tc.auth_timeout = 2 # Reduce to 2 seconds to speed up test + + try: + self.start_server() + self.tc.connect() + try: + remain = self.tc.auth_password('unresponsive-server', 'hello') + except: + etype, evalue, etb = sys.exc_info() + self.assertTrue(issubclass(etype, AuthenticationException)) + self.assertTrue('Authentication timeout' in str(evalue)) + finally: + # Restore value + self.tc.auth_timeout = auth_timeout |