summaryrefslogtreecommitdiffhomepage
path: root/tests/test_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_client.py')
-rw-r--r--tests/test_client.py107
1 files changed, 94 insertions, 13 deletions
diff --git a/tests/test_client.py b/tests/test_client.py
index a340be00..e912d5b2 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -36,7 +36,7 @@ from tests.util import test_path
import paramiko
from paramiko.common import PY2
-from paramiko.ssh_exception import SSHException
+from paramiko.ssh_exception import SSHException, AuthenticationException
FINGERPRINTS = {
@@ -61,6 +61,9 @@ class NullServer (paramiko.ServerInterface):
def check_auth_password(self, username, password):
if (username == 'slowdive') and (password == 'pygmalion'):
return paramiko.AUTH_SUCCESSFUL
+ if (username == 'slowdive') and (password == 'unresponsive-server'):
+ time.sleep(5)
+ return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED
def check_auth_publickey(self, username, key):
@@ -119,7 +122,11 @@ class SSHClientTest (unittest.TestCase):
allowed_keys = FINGERPRINTS.keys()
self.socks, addr = self.sockl.accept()
self.ts = paramiko.Transport(self.socks)
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ keypath = test_path('test_rsa.key')
+ host_key = paramiko.RSAKey.from_private_key_file(keypath)
+ self.ts.add_server_key(host_key)
+ keypath = test_path('test_ecdsa_256.key')
+ host_key = paramiko.ECDSAKey.from_private_key_file(keypath)
self.ts.add_server_key(host_key)
server = NullServer(allowed_keys=allowed_keys)
if delay:
@@ -246,8 +253,9 @@ class SSHClientTest (unittest.TestCase):
verify that SSHClient's AutoAddPolicy works.
"""
threading.Thread(target=self._run).start()
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
- public_host_key = paramiko.RSAKey(data=host_key.asbytes())
+ hostname = '[%s]:%d' % (self.addr, self.port)
+ key_file = test_path('test_ecdsa_256.key')
+ public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file)
self.tc = paramiko.SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
@@ -260,7 +268,8 @@ class SSHClientTest (unittest.TestCase):
self.assertEqual('slowdive', self.ts.get_username())
self.assertEqual(True, self.ts.is_authenticated())
self.assertEqual(1, len(self.tc.get_host_keys()))
- self.assertEqual(public_host_key, self.tc.get_host_keys()['[%s]:%d' % (self.addr, self.port)]['ssh-rsa'])
+ new_host_key = list(self.tc.get_host_keys()[hostname].values())[0]
+ self.assertEqual(public_host_key, new_host_key)
def test_5_save_host_keys(self):
"""
@@ -294,13 +303,10 @@ class SSHClientTest (unittest.TestCase):
verify that when an SSHClient is collected, its transport (and the
transport's packetizer) is closed.
"""
- # Unclear why this is borked on Py3, but it is, and does not seem worth
- # pursuing at the moment. Skipped on PyPy because it fails on travis
- # for unknown reasons, works fine locally.
- # XXX: It's the release of the references to e.g packetizer that fails
- # in py3...
- if not PY2 or platform.python_implementation() == "PyPy":
+ # Skipped on PyPy because it fails on travis for unknown reasons
+ if platform.python_implementation() == "PyPy":
return
+
threading.Thread(target=self._run).start()
self.tc = paramiko.SSHClient()
@@ -318,8 +324,8 @@ class SSHClientTest (unittest.TestCase):
del self.tc
# force a collection to see whether the SSHClient object is deallocated
- # correctly. 2 GCs are needed to make sure it's really collected on
- # PyPy
+ # 2 GCs are needed on PyPy, time is needed for Python 3
+ time.sleep(0.3)
gc.collect()
gc.collect()
@@ -384,6 +390,64 @@ class SSHClientTest (unittest.TestCase):
)
self._test_connection(**kwargs)
+ def test_9_auth_timeout(self):
+ """
+ verify that the SSHClient has a configurable auth timeout
+ """
+ # Connect with a half second auth timeout
+ self.assertRaises(
+ AuthenticationException,
+ self._test_connection,
+ password='unresponsive-server',
+ auth_timeout=0.5,
+ )
+
+ def _client_host_key_bad(self, host_key):
+ threading.Thread(target=self._run).start()
+ hostname = '[%s]:%d' % (self.addr, self.port)
+
+ self.tc = paramiko.SSHClient()
+ self.tc.set_missing_host_key_policy(paramiko.WarningPolicy())
+ known_hosts = self.tc.get_host_keys()
+ known_hosts.add(hostname, host_key.get_name(), host_key)
+
+ self.assertRaises(
+ paramiko.BadHostKeyException,
+ self.tc.connect,
+ password='pygmalion',
+ **self.connect_kwargs
+ )
+
+ def _client_host_key_good(self, ktype, kfile):
+ threading.Thread(target=self._run).start()
+ hostname = '[%s]:%d' % (self.addr, self.port)
+
+ self.tc = paramiko.SSHClient()
+ self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
+ host_key = ktype.from_private_key_file(test_path(kfile))
+ known_hosts = self.tc.get_host_keys()
+ known_hosts.add(hostname, host_key.get_name(), host_key)
+
+ self.tc.connect(password='pygmalion', **self.connect_kwargs)
+ self.event.wait(1.0)
+ self.assertTrue(self.event.is_set())
+ self.assertTrue(self.ts.is_active())
+ self.assertEqual(True, self.ts.is_authenticated())
+
+ def test_host_key_negotiation_1(self):
+ host_key = paramiko.ECDSAKey.generate()
+ self._client_host_key_bad(host_key)
+
+ def test_host_key_negotiation_2(self):
+ host_key = paramiko.RSAKey.generate(2048)
+ self._client_host_key_bad(host_key)
+
+ def test_host_key_negotiation_3(self):
+ self._client_host_key_good(paramiko.ECDSAKey, 'test_ecdsa_256.key')
+
+ def test_host_key_negotiation_4(self):
+ self._client_host_key_good(paramiko.RSAKey, 'test_rsa.key')
+
def test_update_environment(self):
"""
Verify that environment variables can be set by the client.
@@ -418,3 +482,20 @@ class SSHClientTest (unittest.TestCase):
'Expected original SSHException in exception')
else:
self.assertFalse(False, 'SSHException was not thrown.')
+
+
+ def test_missing_key_policy_accepts_classes_or_instances(self):
+ """
+ Client.missing_host_key_policy() can take classes or instances.
+ """
+ # AN ACTUAL UNIT TEST?! GOOD LORD
+ # (But then we have to test a private API...meh.)
+ client = paramiko.SSHClient()
+ # Default
+ assert isinstance(client._policy, paramiko.RejectPolicy)
+ # Hand in an instance (classic behavior)
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ assert isinstance(client._policy, paramiko.AutoAddPolicy)
+ # Hand in just the class (new behavior)
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
+ assert isinstance(client._policy, paramiko.AutoAddPolicy)