summaryrefslogtreecommitdiffhomepage
path: root/tests/test_auth.py
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2017-06-06 15:15:40 -0700
committerJeff Forcier <jeff@bitprophet.org>2017-06-06 15:15:40 -0700
commit996fb6fd8ffb6df4f56c81e2ff199b9a600ecfc6 (patch)
tree4571659cb5f9320275cfedccc3ae897b0f425343 /tests/test_auth.py
parent57394f5199ff75abc87b0373e18be2102540d50d (diff)
parentddb277d4e4989e914b67ff26c14c7c298e7fab9f (diff)
Merge branch 'master' into 471-int
Diffstat (limited to 'tests/test_auth.py')
-rw-r--r--tests/test_auth.py44
1 files changed, 35 insertions, 9 deletions
diff --git a/tests/test_auth.py b/tests/test_auth.py
index ec78e3ce..58b2f44f 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -23,10 +23,12 @@ Some unit tests for authenticating over a Transport.
import sys
import threading
import unittest
+from time import sleep
-from paramiko import Transport, ServerInterface, RSAKey, DSSKey, \
- BadAuthenticationType, InteractiveQuery, \
- AuthenticationException
+from paramiko import (
+ Transport, ServerInterface, RSAKey, DSSKey, BadAuthenticationType,
+ InteractiveQuery, AuthenticationException,
+)
from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL
from paramiko.py3compat import u
from tests.loop import LoopSocket
@@ -73,6 +75,9 @@ class NullServer (ServerInterface):
return AUTH_SUCCESSFUL
if username == 'bad-server':
raise Exception("Ack!")
+ if username == 'unresponsive-server':
+ sleep(5)
+ return AUTH_SUCCESSFUL
return AUTH_FAILED
def check_auth_publickey(self, username, key):
@@ -83,13 +88,13 @@ class NullServer (ServerInterface):
return AUTH_SUCCESSFUL
return AUTH_PARTIALLY_SUCCESSFUL
return AUTH_FAILED
-
+
def check_auth_interactive(self, username, submethods):
if username == 'commie':
self.username = username
return InteractiveQuery('password', 'Please enter a password.', ('Password', False))
return AUTH_FAILED
-
+
def check_auth_interactive_response(self, responses):
if self.username == 'commie':
if (len(responses) == 1) and (responses[0] == 'cat'):
@@ -111,7 +116,7 @@ class AuthTest (unittest.TestCase):
self.ts.close()
self.socks.close()
self.sockc.close()
-
+
def start_server(self):
host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
self.public_host_key = RSAKey(data=host_key.asbytes())
@@ -120,7 +125,7 @@ class AuthTest (unittest.TestCase):
self.server = NullServer()
self.assertTrue(not self.event.is_set())
self.ts.start_server(self.event, self.server)
-
+
def verify_finished(self):
self.event.wait(1.0)
self.assertTrue(self.event.is_set())
@@ -156,7 +161,7 @@ class AuthTest (unittest.TestCase):
self.assertTrue(issubclass(etype, AuthenticationException))
self.tc.auth_password(username='slowdive', password='pygmalion')
self.verify_finished()
-
+
def test_3_multipart_auth(self):
"""
verify that multipart auth works.
@@ -187,7 +192,7 @@ class AuthTest (unittest.TestCase):
self.assertEqual(self.got_prompts, [('Password', False)])
self.assertEqual([], remain)
self.verify_finished()
-
+
def test_5_interactive_auth_fallback(self):
"""
verify that a password auth attempt will fallback to "interactive"
@@ -232,3 +237,24 @@ class AuthTest (unittest.TestCase):
except:
etype, evalue, etb = sys.exc_info()
self.assertTrue(issubclass(etype, AuthenticationException))
+
+ def test_9_auth_non_responsive(self):
+ """
+ verify that authentication times out if server takes to long to
+ respond (or never responds).
+ """
+ auth_timeout = self.tc.auth_timeout
+ self.tc.auth_timeout = 2 # Reduce to 2 seconds to speed up test
+
+ try:
+ self.start_server()
+ self.tc.connect()
+ try:
+ remain = self.tc.auth_password('unresponsive-server', 'hello')
+ except:
+ etype, evalue, etb = sys.exc_info()
+ self.assertTrue(issubclass(etype, AuthenticationException))
+ self.assertTrue('Authentication timeout' in str(evalue))
+ finally:
+ # Restore value
+ self.tc.auth_timeout = auth_timeout