diff options
Diffstat (limited to 'tests/test_client.py')
-rw-r--r-- | tests/test_client.py | 62 |
1 files changed, 33 insertions, 29 deletions
diff --git a/tests/test_client.py b/tests/test_client.py index 80b28adf..c46c383b 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -37,7 +37,6 @@ from pytest_relaxed import raises import paramiko from paramiko.pkey import PublicBlob -from paramiko.common import PY2 from paramiko.ssh_exception import SSHException, AuthenticationException from .util import _support, slow @@ -48,9 +47,9 @@ requires_gss_auth = unittest.skipUnless( ) FINGERPRINTS = { - "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c", - "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5", - "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60", + "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c", # noqa + "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5", # noqa + "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60", # noqa "ssh-ed25519": b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80', } @@ -212,6 +211,12 @@ class ClientTest(unittest.TestCase): stdin, stdout, stderr = self.tc.exec_command("yes") schan = self.ts.accept(1.0) + # Nobody else tests the API of exec_command so let's do it here for + # now. :weary: + assert isinstance(stdin, paramiko.ChannelStdinFile) + assert isinstance(stdout, paramiko.ChannelFile) + assert isinstance(stderr, paramiko.ChannelStderrFile) + schan.send("Hello there.\n") schan.send_stderr("This is on stderr.\n") schan.close() @@ -228,13 +233,13 @@ class ClientTest(unittest.TestCase): class SSHClientTest(ClientTest): - def test_1_client(self): + def test_client(self): """ verify that the SSHClient stuff works too. """ self._test_connection(password="pygmalion") - def test_2_client_dsa(self): + def test_client_dsa(self): """ verify that SSHClient works with a DSA key. """ @@ -246,7 +251,7 @@ class SSHClientTest(ClientTest): """ self._test_connection(key_filename=_support("test_rsa.key")) - def test_2_5_client_ecdsa(self): + def test_client_ecdsa(self): """ verify that SSHClient works with an ECDSA key. """ @@ -255,7 +260,7 @@ class SSHClientTest(ClientTest): def test_client_ed25519(self): self._test_connection(key_filename=_support("test_ed25519.key")) - def test_3_multiple_key_files(self): + def test_multiple_key_files(self): """ verify that SSHClient accepts and tries multiple key files. """ @@ -335,7 +340,7 @@ class SSHClientTest(ClientTest): # code path (!) so we're punting too, sob. pass - def test_4_auto_add_policy(self): + def test_auto_add_policy(self): """ verify that SSHClient's AutoAddPolicy works. """ @@ -358,7 +363,7 @@ class SSHClientTest(ClientTest): new_host_key = list(self.tc.get_host_keys()[hostname].values())[0] self.assertEqual(public_host_key, new_host_key) - def test_5_save_host_keys(self): + def test_save_host_keys(self): """ verify that SSHClient correctly saves a known_hosts file. """ @@ -372,15 +377,13 @@ class SSHClientTest(ClientTest): os.close(fd) client = paramiko.SSHClient() - self.assertEquals(0, len(client.get_host_keys())) + assert len(client.get_host_keys()) == 0 host_id = "[%s]:%d" % (self.addr, self.port) client.get_host_keys().add(host_id, "ssh-rsa", public_host_key) - self.assertEquals(1, len(client.get_host_keys())) - self.assertEquals( - public_host_key, client.get_host_keys()[host_id]["ssh-rsa"] - ) + assert len(client.get_host_keys()) == 1 + assert public_host_key == client.get_host_keys()[host_id]["ssh-rsa"] client.save_host_keys(localname) @@ -389,7 +392,7 @@ class SSHClientTest(ClientTest): os.unlink(localname) - def test_6_cleanup(self): + def test_cleanup(self): """ verify that when an SSHClient is collected, its transport (and the transport's packetizer) is closed. @@ -431,7 +434,7 @@ class SSHClientTest(ClientTest): with paramiko.SSHClient() as tc: self.tc = tc self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - self.assertEquals(0, len(self.tc.get_host_keys())) + assert len(self.tc.get_host_keys()) == 0 self.tc.connect(**dict(self.connect_kwargs, password="pygmalion")) self.event.wait(1.0) @@ -442,7 +445,7 @@ class SSHClientTest(ClientTest): self.assertTrue(self.tc._transport is None) - def test_7_banner_timeout(self): + def test_banner_timeout(self): """ verify that the SSHClient has a configurable banner timeout. """ @@ -461,7 +464,7 @@ class SSHClientTest(ClientTest): kwargs = dict(self.connect_kwargs, banner_timeout=0.5) self.assertRaises(paramiko.SSHException, self.tc.connect, **kwargs) - def test_8_auth_trickledown(self): + def test_auth_trickledown(self): """ Failed key auth doesn't prevent subsequent pw auth from succeeding """ @@ -482,7 +485,7 @@ class SSHClientTest(ClientTest): self._test_connection(**kwargs) @slow - def test_9_auth_timeout(self): + def test_auth_timeout(self): """ verify that the SSHClient has a configurable auth timeout """ @@ -495,22 +498,22 @@ class SSHClientTest(ClientTest): ) @requires_gss_auth - def test_10_auth_trickledown_gsskex(self): + def test_auth_trickledown_gsskex(self): """ - Failed gssapi-keyex auth doesn't prevent subsequent key auth from succeeding + Failed gssapi-keyex doesn't prevent subsequent key from succeeding """ kwargs = dict(gss_kex=True, key_filename=[_support("test_rsa.key")]) self._test_connection(**kwargs) @requires_gss_auth - def test_11_auth_trickledown_gssauth(self): + def test_auth_trickledown_gssauth(self): """ - Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding + Failed gssapi-with-mic doesn't prevent subsequent key from succeeding """ kwargs = dict(gss_auth=True, key_filename=[_support("test_rsa.key")]) self._test_connection(**kwargs) - def test_12_reject_policy(self): + def test_reject_policy(self): """ verify that SSHClient's RejectPolicy works. """ @@ -527,12 +530,13 @@ class SSHClientTest(ClientTest): ) @requires_gss_auth - def test_13_reject_policy_gsskex(self): + def test_reject_policy_gsskex(self): """ verify that SSHClient's RejectPolicy works, even if gssapi-keyex was enabled but not used. """ - # Test for a bug present in paramiko versions released before 2017-08-01 + # Test for a bug present in paramiko versions released before + # 2017-08-01 threading.Thread(target=self._run).start() self.tc = paramiko.SSHClient() @@ -684,9 +688,9 @@ class PasswordPassphraseTests(ClientTest): ) @raises(AuthenticationException) # TODO: more granular - def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given( + def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given( # noqa self - ): # noqa + ): # Sanity: if we're given both fields, the password field is NOT used as # a passphrase. self._test_connection( |