diff options
Diffstat (limited to 'tests/test_client.py')
-rw-r--r-- | tests/test_client.py | 192 |
1 files changed, 135 insertions, 57 deletions
diff --git a/tests/test_client.py b/tests/test_client.py index bfbd395f..9191fc01 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -20,7 +20,7 @@ Some unit tests for SSHClient. """ -from __future__ import with_statement +from __future__ import with_statement, print_function import gc import os @@ -33,18 +33,23 @@ import warnings import weakref from tempfile import mkstemp +from pytest_relaxed import raises + import paramiko from paramiko.pkey import PublicBlob -from paramiko.common import PY2 from paramiko.ssh_exception import SSHException, AuthenticationException from .util import _support, slow +requires_gss_auth = unittest.skipUnless( + paramiko.GSS_AUTH_AVAILABLE, "GSS auth not available" +) + FINGERPRINTS = { - "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c", - "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5", - "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60", + "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c", # noqa + "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5", # noqa + "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60", # noqa "ssh-ed25519": b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80', } @@ -107,7 +112,7 @@ class NullServer(paramiko.ServerInterface): return True -class SSHClientTest(unittest.TestCase): +class ClientTest(unittest.TestCase): def setUp(self): self.sockl = socket.socket() self.sockl.bind(("localhost", 0)) @@ -120,16 +125,42 @@ class SSHClientTest(unittest.TestCase): look_for_keys=False, ) self.event = threading.Event() + self.kill_event = threading.Event() def tearDown(self): - for attr in "tc ts socks sockl".split(): - if hasattr(self, attr): - getattr(self, attr).close() - - def _run(self, allowed_keys=None, delay=0, public_blob=None): + # Shut down client Transport + if hasattr(self, "tc"): + self.tc.close() + # Shut down shared socket + if hasattr(self, "sockl"): + # Signal to server thread that it should shut down early; it checks + # this immediately after accept(). (In scenarios where connection + # actually succeeded during the test, this becomes a no-op.) + self.kill_event.set() + # Forcibly connect to server sock in case the server thread is + # hanging out in its accept() (e.g. if the client side of the test + # fails before it even gets to connecting); there's no other good + # way to force an accept() to exit. + put_a_sock_in_it = socket.socket() + put_a_sock_in_it.connect((self.addr, self.port)) + put_a_sock_in_it.close() + # Then close "our" end of the socket (which _should_ cause the + # accept() to bail out, but does not, for some reason. I blame + # threading.) + self.sockl.close() + + def _run( + self, allowed_keys=None, delay=0, public_blob=None, kill_event=None + ): if allowed_keys is None: allowed_keys = FINGERPRINTS.keys() self.socks, addr = self.sockl.accept() + # If the kill event was set at this point, it indicates an early + # shutdown, so bail out now and don't even try setting up a Transport + # (which will just verbosely die.) + if kill_event and kill_event.is_set(): + self.socks.close() + return self.ts = paramiko.Transport(self.socks) keypath = _support("test_rsa.key") host_key = paramiko.RSAKey.from_private_key_file(keypath) @@ -149,7 +180,7 @@ class SSHClientTest(unittest.TestCase): The exception is ``allowed_keys`` which is stripped and handed to the ``NullServer`` used for testing. """ - run_kwargs = {} + run_kwargs = {"kill_event": self.kill_event} for key in ("allowed_keys", "public_blob"): run_kwargs[key] = kwargs.pop(key, None) # Server setup @@ -194,13 +225,15 @@ class SSHClientTest(unittest.TestCase): stdout.close() stderr.close() - def test_1_client(self): + +class SSHClientTest(ClientTest): + def test_client(self): """ verify that the SSHClient stuff works too. """ self._test_connection(password="pygmalion") - def test_2_client_dsa(self): + def test_client_dsa(self): """ verify that SSHClient works with a DSA key. """ @@ -212,7 +245,7 @@ class SSHClientTest(unittest.TestCase): """ self._test_connection(key_filename=_support("test_rsa.key")) - def test_2_5_client_ecdsa(self): + def test_client_ecdsa(self): """ verify that SSHClient works with an ECDSA key. """ @@ -221,7 +254,7 @@ class SSHClientTest(unittest.TestCase): def test_client_ed25519(self): self._test_connection(key_filename=_support("test_ed25519.key")) - def test_3_multiple_key_files(self): + def test_multiple_key_files(self): """ verify that SSHClient accepts and tries multiple key files. """ @@ -242,7 +275,7 @@ class SSHClientTest(unittest.TestCase): try: self._test_connection( key_filename=[ - _support("test_{0}.key".format(x)) for x in attempt + _support("test_{}.key".format(x)) for x in attempt ], allowed_keys=[types_[x] for x in accept], ) @@ -271,7 +304,7 @@ class SSHClientTest(unittest.TestCase): # server-side behavior is 100% identical.) # NOTE: only bothered whipping up one cert per overall class/family. for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"): - cert_name = "test_{0}.key-cert.pub".format(type_) + cert_name = "test_{}.key-cert.pub".format(type_) cert_path = _support(os.path.join("cert_support", cert_name)) self._test_connection( key_filename=cert_path, @@ -286,12 +319,12 @@ class SSHClientTest(unittest.TestCase): # that a specific cert was found, along with regular authorization # succeeding proving that the overall flow works. for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"): - key_name = "test_{0}.key".format(type_) + key_name = "test_{}.key".format(type_) key_path = _support(os.path.join("cert_support", key_name)) self._test_connection( key_filename=key_path, public_blob=PublicBlob.from_file( - "{0}-cert.pub".format(key_path) + "{}-cert.pub".format(key_path) ), ) @@ -301,7 +334,7 @@ class SSHClientTest(unittest.TestCase): # code path (!) so we're punting too, sob. pass - def test_4_auto_add_policy(self): + def test_auto_add_policy(self): """ verify that SSHClient's AutoAddPolicy works. """ @@ -324,7 +357,7 @@ class SSHClientTest(unittest.TestCase): new_host_key = list(self.tc.get_host_keys()[hostname].values())[0] self.assertEqual(public_host_key, new_host_key) - def test_5_save_host_keys(self): + def test_save_host_keys(self): """ verify that SSHClient correctly saves a known_hosts file. """ @@ -353,7 +386,7 @@ class SSHClientTest(unittest.TestCase): os.unlink(localname) - def test_6_cleanup(self): + def test_cleanup(self): """ verify that when an SSHClient is collected, its transport (and the transport's packetizer) is closed. @@ -406,7 +439,7 @@ class SSHClientTest(unittest.TestCase): self.assertTrue(self.tc._transport is None) - def test_7_banner_timeout(self): + def test_banner_timeout(self): """ verify that the SSHClient has a configurable banner timeout. """ @@ -425,7 +458,7 @@ class SSHClientTest(unittest.TestCase): kwargs = dict(self.connect_kwargs, banner_timeout=0.5) self.assertRaises(paramiko.SSHException, self.tc.connect, **kwargs) - def test_8_auth_trickledown(self): + def test_auth_trickledown(self): """ Failed key auth doesn't prevent subsequent pw auth from succeeding """ @@ -445,7 +478,8 @@ class SSHClientTest(unittest.TestCase): ) self._test_connection(**kwargs) - def test_9_auth_timeout(self): + @slow + def test_auth_timeout(self): """ verify that the SSHClient has a configurable auth timeout """ @@ -457,25 +491,23 @@ class SSHClientTest(unittest.TestCase): auth_timeout=0.5, ) - def test_10_auth_trickledown_gsskex(self): + @requires_gss_auth + def test_auth_trickledown_gsskex(self): """ - Failed gssapi-keyex auth doesn't prevent subsequent key auth from succeeding + Failed gssapi-keyex doesn't prevent subsequent key from succeeding """ - if not paramiko.GSS_AUTH_AVAILABLE: - return # for python 2.6 lacks skipTest kwargs = dict(gss_kex=True, key_filename=[_support("test_rsa.key")]) self._test_connection(**kwargs) - def test_11_auth_trickledown_gssauth(self): + @requires_gss_auth + def test_auth_trickledown_gssauth(self): """ - Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding + Failed gssapi-with-mic doesn't prevent subsequent key from succeeding """ - if not paramiko.GSS_AUTH_AVAILABLE: - return # for python 2.6 lacks skipTest kwargs = dict(gss_auth=True, key_filename=[_support("test_rsa.key")]) self._test_connection(**kwargs) - def test_12_reject_policy(self): + def test_reject_policy(self): """ verify that SSHClient's RejectPolicy works. """ @@ -491,14 +523,14 @@ class SSHClientTest(unittest.TestCase): **self.connect_kwargs ) - def test_13_reject_policy_gsskex(self): + @requires_gss_auth + def test_reject_policy_gsskex(self): """ verify that SSHClient's RejectPolicy works, even if gssapi-keyex was enabled but not used. """ - # Test for a bug present in paramiko versions released before 2017-08-01 - if not paramiko.GSS_AUTH_AVAILABLE: - return # for python 2.6 lacks skipTest + # Test for a bug present in paramiko versions released before + # 2017-08-01 threading.Thread(target=self._run).start() self.tc = paramiko.SSHClient() @@ -558,10 +590,7 @@ class SSHClientTest(unittest.TestCase): def test_host_key_negotiation_4(self): self._client_host_key_good(paramiko.RSAKey, "test_rsa.key") - def test_update_environment(self): - """ - Verify that environment variables can be set by the client. - """ + def _setup_for_env(self): threading.Thread(target=self._run).start() self.tc = paramiko.SSHClient() @@ -575,6 +604,11 @@ class SSHClientTest(unittest.TestCase): self.assertTrue(self.event.isSet()) self.assertTrue(self.ts.is_active()) + def test_update_environment(self): + """ + Verify that environment variables can be set by the client. + """ + self._setup_for_env() target_env = {b"A": b"B", b"C": b"d"} self.tc.exec_command("yes", environment=target_env) @@ -582,22 +616,20 @@ class SSHClientTest(unittest.TestCase): self.assertEqual(target_env, getattr(schan, "env", {})) schan.close() - # Cannot use assertRaises in context manager mode as it is not supported - # in Python 2.6. - try: + @unittest.skip("Clients normally fail silently, thus so do we, for now") + def test_env_update_failures(self): + self._setup_for_env() + with self.assertRaises(SSHException) as manager: # Verify that a rejection by the server can be detected self.tc.exec_command("yes", environment={b"INVALID_ENV": b""}) - except SSHException as e: - self.assertTrue( - "INVALID_ENV" in str(e), - "Expected variable name in error message", - ) - self.assertTrue( - isinstance(e.args[1], SSHException), - "Expected original SSHException in exception", - ) - else: - self.assertFalse(False, "SSHException was not thrown.") + self.assertTrue( + "INVALID_ENV" in str(manager.exception), + "Expected variable name in error message", + ) + self.assertTrue( + isinstance(manager.exception.args[1], SSHException), + "Expected original SSHException in exception", + ) def test_missing_key_policy_accepts_classes_or_instances(self): """ @@ -614,3 +646,49 @@ class SSHClientTest(unittest.TestCase): # Hand in just the class (new behavior) client.set_missing_host_key_policy(paramiko.AutoAddPolicy) assert isinstance(client._policy, paramiko.AutoAddPolicy) + + +class PasswordPassphraseTests(ClientTest): + # TODO: most of these could reasonably be set up to use mocks/assertions + # (e.g. "gave passphrase -> expect PKey was given it as the passphrase") + # instead of suffering a real connection cycle. + # TODO: in that case, move the below to be part of an integration suite? + + def test_password_kwarg_works_for_password_auth(self): + # Straightforward / duplicate of earlier basic password test. + self._test_connection(password="pygmalion") + + # TODO: more granular exception pending #387; should be signaling "no auth + # methods available" because no key and no password + @raises(SSHException) + def test_passphrase_kwarg_not_used_for_password_auth(self): + # Using the "right" password in the "wrong" field shouldn't work. + self._test_connection(passphrase="pygmalion") + + def test_passphrase_kwarg_used_for_key_passphrase(self): + # Straightforward again, with new passphrase kwarg. + self._test_connection( + key_filename=_support("test_rsa_password.key"), + passphrase="television", + ) + + def test_password_kwarg_used_for_passphrase_when_no_passphrase_kwarg_given( + self + ): # noqa + # Backwards compatibility: passphrase in the password field. + self._test_connection( + key_filename=_support("test_rsa_password.key"), + password="television", + ) + + @raises(AuthenticationException) # TODO: more granular + def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given( # noqa + self + ): + # Sanity: if we're given both fields, the password field is NOT used as + # a passphrase. + self._test_connection( + key_filename=_support("test_rsa_password.key"), + password="television", + passphrase="wat? lol no", + ) |