diff options
Diffstat (limited to 'tests/test_client.py')
-rw-r--r-- | tests/test_client.py | 100 |
1 files changed, 53 insertions, 47 deletions
diff --git a/tests/test_client.py b/tests/test_client.py index 7058f394..0483cf6f 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -23,22 +23,27 @@ Some unit tests for SSHClient. from __future__ import with_statement import gc +import os import platform import socket -from tempfile import mkstemp import threading +import time import unittest -import weakref import warnings -import os -import time -from tests.util import test_path +import weakref +from tempfile import mkstemp import paramiko from paramiko.pkey import PublicBlob from paramiko.common import PY2 from paramiko.ssh_exception import SSHException, AuthenticationException +from .util import _support, slow + + +requires_gss_auth = unittest.skipUnless( + paramiko.GSS_AUTH_AVAILABLE, "GSS auth not available" +) FINGERPRINTS = { 'ssh-dss': b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c', @@ -131,10 +136,10 @@ class SSHClientTest (unittest.TestCase): allowed_keys = FINGERPRINTS.keys() self.socks, addr = self.sockl.accept() self.ts = paramiko.Transport(self.socks) - keypath = test_path('test_rsa.key') + keypath = _support('test_rsa.key') host_key = paramiko.RSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) - keypath = test_path('test_ecdsa_256.key') + keypath = _support('test_ecdsa_256.key') host_key = paramiko.ECDSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) server = NullServer(allowed_keys=allowed_keys, public_blob=public_blob) @@ -154,7 +159,7 @@ class SSHClientTest (unittest.TestCase): run_kwargs[key] = kwargs.pop(key, None) # Server setup threading.Thread(target=self._run, kwargs=run_kwargs).start() - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key')) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) # Client setup @@ -200,22 +205,22 @@ class SSHClientTest (unittest.TestCase): """ verify that SSHClient works with a DSA key. """ - self._test_connection(key_filename=test_path('test_dss.key')) + self._test_connection(key_filename=_support('test_dss.key')) def test_client_rsa(self): """ verify that SSHClient works with an RSA key. """ - self._test_connection(key_filename=test_path('test_rsa.key')) + self._test_connection(key_filename=_support('test_rsa.key')) def test_2_5_client_ecdsa(self): """ verify that SSHClient works with an ECDSA key. """ - self._test_connection(key_filename=test_path('test_ecdsa_256.key')) + self._test_connection(key_filename=_support('test_ecdsa_256.key')) def test_client_ed25519(self): - self._test_connection(key_filename=test_path('test_ed25519.key')) + self._test_connection(key_filename=_support('test_ed25519.key')) def test_3_multiple_key_files(self): """ @@ -238,7 +243,7 @@ class SSHClientTest (unittest.TestCase): try: self._test_connection( key_filename=[ - test_path('test_{0}.key'.format(x)) for x in attempt + _support('test_{}.key'.format(x)) for x in attempt ], allowed_keys=[types_[x] for x in accept], ) @@ -256,7 +261,7 @@ class SSHClientTest (unittest.TestCase): # various platforms trigger different errors here >_< self.assertRaises(SSHException, self._test_connection, - key_filename=[test_path('test_rsa.key')], + key_filename=[_support('test_rsa.key')], allowed_keys=['ecdsa-sha2-nistp256'], ) @@ -266,8 +271,8 @@ class SSHClientTest (unittest.TestCase): # server-side behavior is 100% identical.) # NOTE: only bothered whipping up one cert per overall class/family. for type_ in ('rsa', 'dss', 'ecdsa_256', 'ed25519'): - cert_name = 'test_{0}.key-cert.pub'.format(type_) - cert_path = test_path(os.path.join('cert_support', cert_name)) + cert_name = 'test_{}.key-cert.pub'.format(type_) + cert_path = _support(os.path.join('cert_support', cert_name)) self._test_connection( key_filename=cert_path, public_blob=PublicBlob.from_file(cert_path), @@ -281,12 +286,12 @@ class SSHClientTest (unittest.TestCase): # that a specific cert was found, along with regular authorization # succeeding proving that the overall flow works. for type_ in ('rsa', 'dss', 'ecdsa_256', 'ed25519'): - key_name = 'test_{0}.key'.format(type_) - key_path = test_path(os.path.join('cert_support', key_name)) + key_name = 'test_{}.key'.format(type_) + key_path = _support(os.path.join('cert_support', key_name)) self._test_connection( key_filename=key_path, public_blob=PublicBlob.from_file( - '{0}-cert.pub'.format(key_path) + '{}-cert.pub'.format(key_path) ), ) @@ -302,7 +307,7 @@ class SSHClientTest (unittest.TestCase): """ threading.Thread(target=self._run).start() hostname = '[%s]:%d' % (self.addr, self.port) - key_file = test_path('test_ecdsa_256.key') + key_file = _support('test_ecdsa_256.key') public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file) self.tc = paramiko.SSHClient() @@ -325,7 +330,7 @@ class SSHClientTest (unittest.TestCase): """ warnings.filterwarnings('ignore', 'tempnam.*') - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key')) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) fd, localname = mkstemp() os.close(fd) @@ -405,7 +410,7 @@ class SSHClientTest (unittest.TestCase): """ # Start the thread with a 1 second wait. threading.Thread(target=self._run, kwargs={'delay': 1}).start() - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key')) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = paramiko.SSHClient() @@ -432,12 +437,13 @@ class SSHClientTest (unittest.TestCase): # 'television' as per tests/test_pkey.py). NOTE: must use # key_filename, loading the actual key here with PKey will except # immediately; we're testing the try/except crap within Client. - key_filename=[test_path('test_rsa_password.key')], + key_filename=[_support('test_rsa_password.key')], # Actual password for default 'slowdive' user password='pygmalion', ) self._test_connection(**kwargs) + @slow def test_9_auth_timeout(self): """ verify that the SSHClient has a configurable auth timeout @@ -450,27 +456,25 @@ class SSHClientTest (unittest.TestCase): auth_timeout=0.5, ) + @requires_gss_auth def test_10_auth_trickledown_gsskex(self): """ Failed gssapi-keyex auth doesn't prevent subsequent key auth from succeeding """ - if not paramiko.GSS_AUTH_AVAILABLE: - return # for python 2.6 lacks skipTest kwargs = dict( gss_kex=True, - key_filename=[test_path('test_rsa.key')], + key_filename=[_support('test_rsa.key')], ) self._test_connection(**kwargs) + @requires_gss_auth def test_11_auth_trickledown_gssauth(self): """ Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding """ - if not paramiko.GSS_AUTH_AVAILABLE: - return # for python 2.6 lacks skipTest kwargs = dict( gss_auth=True, - key_filename=[test_path('test_rsa.key')], + key_filename=[_support('test_rsa.key')], ) self._test_connection(**kwargs) @@ -489,14 +493,13 @@ class SSHClientTest (unittest.TestCase): password='pygmalion', **self.connect_kwargs ) + @requires_gss_auth def test_13_reject_policy_gsskex(self): """ verify that SSHClient's RejectPolicy works, even if gssapi-keyex was enabled but not used. """ # Test for a bug present in paramiko versions released before 2017-08-01 - if not paramiko.GSS_AUTH_AVAILABLE: - return # for python 2.6 lacks skipTest threading.Thread(target=self._run).start() self.tc = paramiko.SSHClient() @@ -532,7 +535,7 @@ class SSHClientTest (unittest.TestCase): self.tc = paramiko.SSHClient() self.tc.set_missing_host_key_policy(paramiko.RejectPolicy()) - host_key = ktype.from_private_key_file(test_path(kfile)) + host_key = ktype.from_private_key_file(_support(kfile)) known_hosts = self.tc.get_host_keys() known_hosts.add(hostname, host_key.get_name(), host_key) @@ -556,10 +559,7 @@ class SSHClientTest (unittest.TestCase): def test_host_key_negotiation_4(self): self._client_host_key_good(paramiko.RSAKey, 'test_rsa.key') - def test_update_environment(self): - """ - Verify that environment variables can be set by the client. - """ + def _setup_for_env(self): threading.Thread(target=self._run).start() self.tc = paramiko.SSHClient() @@ -571,6 +571,11 @@ class SSHClientTest (unittest.TestCase): self.assertTrue(self.event.isSet()) self.assertTrue(self.ts.is_active()) + def test_update_environment(self): + """ + Verify that environment variables can be set by the client. + """ + self._setup_for_env() target_env = {b'A': b'B', b'C': b'd'} self.tc.exec_command('yes', environment=target_env) @@ -578,19 +583,20 @@ class SSHClientTest (unittest.TestCase): self.assertEqual(target_env, getattr(schan, 'env', {})) schan.close() - # Cannot use assertRaises in context manager mode as it is not supported - # in Python 2.6. - try: + @unittest.skip("Clients normally fail silently, thus so do we, for now") + def test_env_update_failures(self): + self._setup_for_env() + with self.assertRaises(SSHException) as manager: # Verify that a rejection by the server can be detected self.tc.exec_command('yes', environment={b'INVALID_ENV': b''}) - except SSHException as e: - self.assertTrue('INVALID_ENV' in str(e), - 'Expected variable name in error message') - self.assertTrue(isinstance(e.args[1], SSHException), - 'Expected original SSHException in exception') - else: - self.assertFalse(False, 'SSHException was not thrown.') - + self.assertTrue( + 'INVALID_ENV' in str(manager.exception), + 'Expected variable name in error message' + ) + self.assertTrue( + isinstance(manager.exception.args[1], SSHException), + 'Expected original SSHException in exception' + ) def test_missing_key_policy_accepts_classes_or_instances(self): """ |