diff options
Diffstat (limited to 'tests/test_client.py')
-rw-r--r-- | tests/test_client.py | 189 |
1 files changed, 133 insertions, 56 deletions
diff --git a/tests/test_client.py b/tests/test_client.py index 7058f394..7163fdcf 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -20,25 +20,32 @@ Some unit tests for SSHClient. """ -from __future__ import with_statement +from __future__ import with_statement, print_function import gc +import os import platform import socket -from tempfile import mkstemp import threading +import time import unittest -import weakref import warnings -import os -import time -from tests.util import test_path +import weakref +from tempfile import mkstemp + +from pytest_relaxed import raises import paramiko from paramiko.pkey import PublicBlob from paramiko.common import PY2 from paramiko.ssh_exception import SSHException, AuthenticationException +from .util import _support, slow + + +requires_gss_auth = unittest.skipUnless( + paramiko.GSS_AUTH_AVAILABLE, "GSS auth not available" +) FINGERPRINTS = { 'ssh-dss': b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c', @@ -106,8 +113,7 @@ class NullServer(paramiko.ServerInterface): return True -class SSHClientTest (unittest.TestCase): - +class ClientTest(unittest.TestCase): def setUp(self): self.sockl = socket.socket() self.sockl.bind(('localhost', 0)) @@ -120,21 +126,47 @@ class SSHClientTest (unittest.TestCase): look_for_keys=False, ) self.event = threading.Event() + self.kill_event = threading.Event() def tearDown(self): - for attr in "tc ts socks sockl".split(): - if hasattr(self, attr): - getattr(self, attr).close() - - def _run(self, allowed_keys=None, delay=0, public_blob=None): + # Shut down client Transport + if hasattr(self, 'tc'): + self.tc.close() + # Shut down shared socket + if hasattr(self, 'sockl'): + # Signal to server thread that it should shut down early; it checks + # this immediately after accept(). (In scenarios where connection + # actually succeeded during the test, this becomes a no-op.) + self.kill_event.set() + # Forcibly connect to server sock in case the server thread is + # hanging out in its accept() (e.g. if the client side of the test + # fails before it even gets to connecting); there's no other good + # way to force an accept() to exit. + put_a_sock_in_it = socket.socket() + put_a_sock_in_it.connect((self.addr, self.port)) + put_a_sock_in_it.close() + # Then close "our" end of the socket (which _should_ cause the + # accept() to bail out, but does not, for some reason. I blame + # threading.) + self.sockl.close() + + def _run( + self, allowed_keys=None, delay=0, public_blob=None, kill_event=None, + ): if allowed_keys is None: allowed_keys = FINGERPRINTS.keys() self.socks, addr = self.sockl.accept() + # If the kill event was set at this point, it indicates an early + # shutdown, so bail out now and don't even try setting up a Transport + # (which will just verbosely die.) + if kill_event and kill_event.is_set(): + self.socks.close() + return self.ts = paramiko.Transport(self.socks) - keypath = test_path('test_rsa.key') + keypath = _support('test_rsa.key') host_key = paramiko.RSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) - keypath = test_path('test_ecdsa_256.key') + keypath = _support('test_ecdsa_256.key') host_key = paramiko.ECDSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) server = NullServer(allowed_keys=allowed_keys, public_blob=public_blob) @@ -149,12 +181,12 @@ class SSHClientTest (unittest.TestCase): The exception is ``allowed_keys`` which is stripped and handed to the ``NullServer`` used for testing. """ - run_kwargs = {} + run_kwargs = {'kill_event': self.kill_event} for key in ('allowed_keys', 'public_blob'): run_kwargs[key] = kwargs.pop(key, None) # Server setup threading.Thread(target=self._run, kwargs=run_kwargs).start() - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key')) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) # Client setup @@ -190,6 +222,8 @@ class SSHClientTest (unittest.TestCase): stdout.close() stderr.close() + +class SSHClientTest(ClientTest): def test_1_client(self): """ verify that the SSHClient stuff works too. @@ -200,22 +234,22 @@ class SSHClientTest (unittest.TestCase): """ verify that SSHClient works with a DSA key. """ - self._test_connection(key_filename=test_path('test_dss.key')) + self._test_connection(key_filename=_support('test_dss.key')) def test_client_rsa(self): """ verify that SSHClient works with an RSA key. """ - self._test_connection(key_filename=test_path('test_rsa.key')) + self._test_connection(key_filename=_support('test_rsa.key')) def test_2_5_client_ecdsa(self): """ verify that SSHClient works with an ECDSA key. """ - self._test_connection(key_filename=test_path('test_ecdsa_256.key')) + self._test_connection(key_filename=_support('test_ecdsa_256.key')) def test_client_ed25519(self): - self._test_connection(key_filename=test_path('test_ed25519.key')) + self._test_connection(key_filename=_support('test_ed25519.key')) def test_3_multiple_key_files(self): """ @@ -238,7 +272,7 @@ class SSHClientTest (unittest.TestCase): try: self._test_connection( key_filename=[ - test_path('test_{0}.key'.format(x)) for x in attempt + _support('test_{}.key'.format(x)) for x in attempt ], allowed_keys=[types_[x] for x in accept], ) @@ -256,7 +290,7 @@ class SSHClientTest (unittest.TestCase): # various platforms trigger different errors here >_< self.assertRaises(SSHException, self._test_connection, - key_filename=[test_path('test_rsa.key')], + key_filename=[_support('test_rsa.key')], allowed_keys=['ecdsa-sha2-nistp256'], ) @@ -266,8 +300,8 @@ class SSHClientTest (unittest.TestCase): # server-side behavior is 100% identical.) # NOTE: only bothered whipping up one cert per overall class/family. for type_ in ('rsa', 'dss', 'ecdsa_256', 'ed25519'): - cert_name = 'test_{0}.key-cert.pub'.format(type_) - cert_path = test_path(os.path.join('cert_support', cert_name)) + cert_name = 'test_{}.key-cert.pub'.format(type_) + cert_path = _support(os.path.join('cert_support', cert_name)) self._test_connection( key_filename=cert_path, public_blob=PublicBlob.from_file(cert_path), @@ -281,12 +315,12 @@ class SSHClientTest (unittest.TestCase): # that a specific cert was found, along with regular authorization # succeeding proving that the overall flow works. for type_ in ('rsa', 'dss', 'ecdsa_256', 'ed25519'): - key_name = 'test_{0}.key'.format(type_) - key_path = test_path(os.path.join('cert_support', key_name)) + key_name = 'test_{}.key'.format(type_) + key_path = _support(os.path.join('cert_support', key_name)) self._test_connection( key_filename=key_path, public_blob=PublicBlob.from_file( - '{0}-cert.pub'.format(key_path) + '{}-cert.pub'.format(key_path) ), ) @@ -302,7 +336,7 @@ class SSHClientTest (unittest.TestCase): """ threading.Thread(target=self._run).start() hostname = '[%s]:%d' % (self.addr, self.port) - key_file = test_path('test_ecdsa_256.key') + key_file = _support('test_ecdsa_256.key') public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file) self.tc = paramiko.SSHClient() @@ -325,7 +359,7 @@ class SSHClientTest (unittest.TestCase): """ warnings.filterwarnings('ignore', 'tempnam.*') - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key')) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) fd, localname = mkstemp() os.close(fd) @@ -405,7 +439,7 @@ class SSHClientTest (unittest.TestCase): """ # Start the thread with a 1 second wait. threading.Thread(target=self._run, kwargs={'delay': 1}).start() - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key')) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = paramiko.SSHClient() @@ -432,12 +466,13 @@ class SSHClientTest (unittest.TestCase): # 'television' as per tests/test_pkey.py). NOTE: must use # key_filename, loading the actual key here with PKey will except # immediately; we're testing the try/except crap within Client. - key_filename=[test_path('test_rsa_password.key')], + key_filename=[_support('test_rsa_password.key')], # Actual password for default 'slowdive' user password='pygmalion', ) self._test_connection(**kwargs) + @slow def test_9_auth_timeout(self): """ verify that the SSHClient has a configurable auth timeout @@ -450,27 +485,25 @@ class SSHClientTest (unittest.TestCase): auth_timeout=0.5, ) + @requires_gss_auth def test_10_auth_trickledown_gsskex(self): """ Failed gssapi-keyex auth doesn't prevent subsequent key auth from succeeding """ - if not paramiko.GSS_AUTH_AVAILABLE: - return # for python 2.6 lacks skipTest kwargs = dict( gss_kex=True, - key_filename=[test_path('test_rsa.key')], + key_filename=[_support('test_rsa.key')], ) self._test_connection(**kwargs) + @requires_gss_auth def test_11_auth_trickledown_gssauth(self): """ Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding """ - if not paramiko.GSS_AUTH_AVAILABLE: - return # for python 2.6 lacks skipTest kwargs = dict( gss_auth=True, - key_filename=[test_path('test_rsa.key')], + key_filename=[_support('test_rsa.key')], ) self._test_connection(**kwargs) @@ -489,14 +522,13 @@ class SSHClientTest (unittest.TestCase): password='pygmalion', **self.connect_kwargs ) + @requires_gss_auth def test_13_reject_policy_gsskex(self): """ verify that SSHClient's RejectPolicy works, even if gssapi-keyex was enabled but not used. """ # Test for a bug present in paramiko versions released before 2017-08-01 - if not paramiko.GSS_AUTH_AVAILABLE: - return # for python 2.6 lacks skipTest threading.Thread(target=self._run).start() self.tc = paramiko.SSHClient() @@ -532,7 +564,7 @@ class SSHClientTest (unittest.TestCase): self.tc = paramiko.SSHClient() self.tc.set_missing_host_key_policy(paramiko.RejectPolicy()) - host_key = ktype.from_private_key_file(test_path(kfile)) + host_key = ktype.from_private_key_file(_support(kfile)) known_hosts = self.tc.get_host_keys() known_hosts.add(hostname, host_key.get_name(), host_key) @@ -556,10 +588,7 @@ class SSHClientTest (unittest.TestCase): def test_host_key_negotiation_4(self): self._client_host_key_good(paramiko.RSAKey, 'test_rsa.key') - def test_update_environment(self): - """ - Verify that environment variables can be set by the client. - """ + def _setup_for_env(self): threading.Thread(target=self._run).start() self.tc = paramiko.SSHClient() @@ -571,6 +600,11 @@ class SSHClientTest (unittest.TestCase): self.assertTrue(self.event.isSet()) self.assertTrue(self.ts.is_active()) + def test_update_environment(self): + """ + Verify that environment variables can be set by the client. + """ + self._setup_for_env() target_env = {b'A': b'B', b'C': b'd'} self.tc.exec_command('yes', environment=target_env) @@ -578,19 +612,20 @@ class SSHClientTest (unittest.TestCase): self.assertEqual(target_env, getattr(schan, 'env', {})) schan.close() - # Cannot use assertRaises in context manager mode as it is not supported - # in Python 2.6. - try: + @unittest.skip("Clients normally fail silently, thus so do we, for now") + def test_env_update_failures(self): + self._setup_for_env() + with self.assertRaises(SSHException) as manager: # Verify that a rejection by the server can be detected self.tc.exec_command('yes', environment={b'INVALID_ENV': b''}) - except SSHException as e: - self.assertTrue('INVALID_ENV' in str(e), - 'Expected variable name in error message') - self.assertTrue(isinstance(e.args[1], SSHException), - 'Expected original SSHException in exception') - else: - self.assertFalse(False, 'SSHException was not thrown.') - + self.assertTrue( + 'INVALID_ENV' in str(manager.exception), + 'Expected variable name in error message' + ) + self.assertTrue( + isinstance(manager.exception.args[1], SSHException), + 'Expected original SSHException in exception' + ) def test_missing_key_policy_accepts_classes_or_instances(self): """ @@ -607,3 +642,45 @@ class SSHClientTest (unittest.TestCase): # Hand in just the class (new behavior) client.set_missing_host_key_policy(paramiko.AutoAddPolicy) assert isinstance(client._policy, paramiko.AutoAddPolicy) + + +class PasswordPassphraseTests(ClientTest): + # TODO: most of these could reasonably be set up to use mocks/assertions + # (e.g. "gave passphrase -> expect PKey was given it as the passphrase") + # instead of suffering a real connection cycle. + # TODO: in that case, move the below to be part of an integration suite? + + def test_password_kwarg_works_for_password_auth(self): + # Straightforward / duplicate of earlier basic password test. + self._test_connection(password='pygmalion') + + # TODO: more granular exception pending #387; should be signaling "no auth + # methods available" because no key and no password + @raises(SSHException) + def test_passphrase_kwarg_not_used_for_password_auth(self): + # Using the "right" password in the "wrong" field shouldn't work. + self._test_connection(passphrase='pygmalion') + + def test_passphrase_kwarg_used_for_key_passphrase(self): + # Straightforward again, with new passphrase kwarg. + self._test_connection( + key_filename=_support('test_rsa_password.key'), + passphrase='television', + ) + + def test_password_kwarg_used_for_passphrase_when_no_passphrase_kwarg_given(self): # noqa + # Backwards compatibility: passphrase in the password field. + self._test_connection( + key_filename=_support('test_rsa_password.key'), + password='television', + ) + + @raises(AuthenticationException) # TODO: more granular + def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given(self): # noqa + # Sanity: if we're given both fields, the password field is NOT used as + # a passphrase. + self._test_connection( + key_filename=_support('test_rsa_password.key'), + password='television', + passphrase='wat? lol no', + ) |