summaryrefslogtreecommitdiffhomepage
path: root/tests/test_client.py
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2019-12-03 14:35:04 -0500
committerJeff Forcier <jeff@bitprophet.org>2019-12-03 14:35:04 -0500
commit9e6fc80397582838de8124344efcde6b17472b73 (patch)
tree50fd8121c695ad5f322ebb377b9815711098e531 /tests/test_client.py
parent25de1a7a02a2189614787718739efbde86d5bb8e (diff)
parent84fa355a253d30d6c39adaea8bb095ced0c3b751 (diff)
Merge branch 'master' into 1343-int
Diffstat (limited to 'tests/test_client.py')
-rw-r--r--tests/test_client.py114
1 files changed, 68 insertions, 46 deletions
diff --git a/tests/test_client.py b/tests/test_client.py
index 80b28adf..60ad310c 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -34,10 +34,11 @@ import weakref
from tempfile import mkstemp
from pytest_relaxed import raises
+from mock import patch, Mock
import paramiko
+from paramiko import SSHClient
from paramiko.pkey import PublicBlob
-from paramiko.common import PY2
from paramiko.ssh_exception import SSHException, AuthenticationException
from .util import _support, slow
@@ -48,9 +49,9 @@ requires_gss_auth = unittest.skipUnless(
)
FINGERPRINTS = {
- "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c",
- "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5",
- "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60",
+ "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c", # noqa
+ "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5", # noqa
+ "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60", # noqa
"ssh-ed25519": b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80',
}
@@ -192,7 +193,7 @@ class ClientTest(unittest.TestCase):
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
# Client setup
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.get_host_keys().add(
"[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key
)
@@ -212,6 +213,12 @@ class ClientTest(unittest.TestCase):
stdin, stdout, stderr = self.tc.exec_command("yes")
schan = self.ts.accept(1.0)
+ # Nobody else tests the API of exec_command so let's do it here for
+ # now. :weary:
+ assert isinstance(stdin, paramiko.ChannelStdinFile)
+ assert isinstance(stdout, paramiko.ChannelFile)
+ assert isinstance(stderr, paramiko.ChannelStderrFile)
+
schan.send("Hello there.\n")
schan.send_stderr("This is on stderr.\n")
schan.close()
@@ -228,13 +235,13 @@ class ClientTest(unittest.TestCase):
class SSHClientTest(ClientTest):
- def test_1_client(self):
+ def test_client(self):
"""
verify that the SSHClient stuff works too.
"""
self._test_connection(password="pygmalion")
- def test_2_client_dsa(self):
+ def test_client_dsa(self):
"""
verify that SSHClient works with a DSA key.
"""
@@ -246,7 +253,7 @@ class SSHClientTest(ClientTest):
"""
self._test_connection(key_filename=_support("test_rsa.key"))
- def test_2_5_client_ecdsa(self):
+ def test_client_ecdsa(self):
"""
verify that SSHClient works with an ECDSA key.
"""
@@ -255,7 +262,7 @@ class SSHClientTest(ClientTest):
def test_client_ed25519(self):
self._test_connection(key_filename=_support("test_ed25519.key"))
- def test_3_multiple_key_files(self):
+ def test_multiple_key_files(self):
"""
verify that SSHClient accepts and tries multiple key files.
"""
@@ -335,7 +342,7 @@ class SSHClientTest(ClientTest):
# code path (!) so we're punting too, sob.
pass
- def test_4_auto_add_policy(self):
+ def test_auto_add_policy(self):
"""
verify that SSHClient's AutoAddPolicy works.
"""
@@ -344,7 +351,7 @@ class SSHClientTest(ClientTest):
key_file = _support("test_ecdsa_256.key")
public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file)
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
self.tc.connect(password="pygmalion", **self.connect_kwargs)
@@ -358,7 +365,7 @@ class SSHClientTest(ClientTest):
new_host_key = list(self.tc.get_host_keys()[hostname].values())[0]
self.assertEqual(public_host_key, new_host_key)
- def test_5_save_host_keys(self):
+ def test_save_host_keys(self):
"""
verify that SSHClient correctly saves a known_hosts file.
"""
@@ -371,16 +378,14 @@ class SSHClientTest(ClientTest):
fd, localname = mkstemp()
os.close(fd)
- client = paramiko.SSHClient()
- self.assertEquals(0, len(client.get_host_keys()))
+ client = SSHClient()
+ assert len(client.get_host_keys()) == 0
host_id = "[%s]:%d" % (self.addr, self.port)
client.get_host_keys().add(host_id, "ssh-rsa", public_host_key)
- self.assertEquals(1, len(client.get_host_keys()))
- self.assertEquals(
- public_host_key, client.get_host_keys()[host_id]["ssh-rsa"]
- )
+ assert len(client.get_host_keys()) == 1
+ assert public_host_key == client.get_host_keys()[host_id]["ssh-rsa"]
client.save_host_keys(localname)
@@ -389,7 +394,7 @@ class SSHClientTest(ClientTest):
os.unlink(localname)
- def test_6_cleanup(self):
+ def test_cleanup(self):
"""
verify that when an SSHClient is collected, its transport (and the
transport's packetizer) is closed.
@@ -400,17 +405,17 @@ class SSHClientTest(ClientTest):
threading.Thread(target=self._run).start()
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- self.assertEqual(0, len(self.tc.get_host_keys()))
+ assert len(self.tc.get_host_keys()) == 0
self.tc.connect(**dict(self.connect_kwargs, password="pygmalion"))
self.event.wait(1.0)
- self.assertTrue(self.event.is_set())
- self.assertTrue(self.ts.is_active())
+ assert self.event.is_set()
+ assert self.ts.is_active()
p = weakref.ref(self.tc._transport.packetizer)
- self.assertTrue(p() is not None)
+ assert p() is not None
self.tc.close()
del self.tc
@@ -420,7 +425,7 @@ class SSHClientTest(ClientTest):
gc.collect()
gc.collect()
- self.assertTrue(p() is None)
+ assert p() is None
def test_client_can_be_used_as_context_manager(self):
"""
@@ -428,10 +433,10 @@ class SSHClientTest(ClientTest):
"""
threading.Thread(target=self._run).start()
- with paramiko.SSHClient() as tc:
+ with SSHClient() as tc:
self.tc = tc
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- self.assertEquals(0, len(self.tc.get_host_keys()))
+ assert len(self.tc.get_host_keys()) == 0
self.tc.connect(**dict(self.connect_kwargs, password="pygmalion"))
self.event.wait(1.0)
@@ -442,7 +447,7 @@ class SSHClientTest(ClientTest):
self.assertTrue(self.tc._transport is None)
- def test_7_banner_timeout(self):
+ def test_banner_timeout(self):
"""
verify that the SSHClient has a configurable banner timeout.
"""
@@ -453,7 +458,7 @@ class SSHClientTest(ClientTest):
)
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.get_host_keys().add(
"[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key
)
@@ -461,7 +466,7 @@ class SSHClientTest(ClientTest):
kwargs = dict(self.connect_kwargs, banner_timeout=0.5)
self.assertRaises(paramiko.SSHException, self.tc.connect, **kwargs)
- def test_8_auth_trickledown(self):
+ def test_auth_trickledown(self):
"""
Failed key auth doesn't prevent subsequent pw auth from succeeding
"""
@@ -482,7 +487,7 @@ class SSHClientTest(ClientTest):
self._test_connection(**kwargs)
@slow
- def test_9_auth_timeout(self):
+ def test_auth_timeout(self):
"""
verify that the SSHClient has a configurable auth timeout
"""
@@ -495,28 +500,28 @@ class SSHClientTest(ClientTest):
)
@requires_gss_auth
- def test_10_auth_trickledown_gsskex(self):
+ def test_auth_trickledown_gsskex(self):
"""
- Failed gssapi-keyex auth doesn't prevent subsequent key auth from succeeding
+ Failed gssapi-keyex doesn't prevent subsequent key from succeeding
"""
kwargs = dict(gss_kex=True, key_filename=[_support("test_rsa.key")])
self._test_connection(**kwargs)
@requires_gss_auth
- def test_11_auth_trickledown_gssauth(self):
+ def test_auth_trickledown_gssauth(self):
"""
- Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding
+ Failed gssapi-with-mic doesn't prevent subsequent key from succeeding
"""
kwargs = dict(gss_auth=True, key_filename=[_support("test_rsa.key")])
self._test_connection(**kwargs)
- def test_12_reject_policy(self):
+ def test_reject_policy(self):
"""
verify that SSHClient's RejectPolicy works.
"""
threading.Thread(target=self._run).start()
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
self.assertRaises(
@@ -527,15 +532,16 @@ class SSHClientTest(ClientTest):
)
@requires_gss_auth
- def test_13_reject_policy_gsskex(self):
+ def test_reject_policy_gsskex(self):
"""
verify that SSHClient's RejectPolicy works,
even if gssapi-keyex was enabled but not used.
"""
- # Test for a bug present in paramiko versions released before 2017-08-01
+ # Test for a bug present in paramiko versions released before
+ # 2017-08-01
threading.Thread(target=self._run).start()
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
self.assertRaises(
@@ -550,7 +556,7 @@ class SSHClientTest(ClientTest):
threading.Thread(target=self._run).start()
hostname = "[%s]:%d" % (self.addr, self.port)
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.WarningPolicy())
known_hosts = self.tc.get_host_keys()
known_hosts.add(hostname, host_key.get_name(), host_key)
@@ -566,7 +572,7 @@ class SSHClientTest(ClientTest):
threading.Thread(target=self._run).start()
hostname = "[%s]:%d" % (self.addr, self.port)
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
host_key = ktype.from_private_key_file(_support(kfile))
known_hosts = self.tc.get_host_keys()
@@ -595,7 +601,7 @@ class SSHClientTest(ClientTest):
def _setup_for_env(self):
threading.Thread(target=self._run).start()
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
self.tc.connect(
@@ -639,7 +645,7 @@ class SSHClientTest(ClientTest):
"""
# AN ACTUAL UNIT TEST?! GOOD LORD
# (But then we have to test a private API...meh.)
- client = paramiko.SSHClient()
+ client = SSHClient()
# Default
assert isinstance(client._policy, paramiko.RejectPolicy)
# Hand in an instance (classic behavior)
@@ -649,6 +655,22 @@ class SSHClientTest(ClientTest):
client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
assert isinstance(client._policy, paramiko.AutoAddPolicy)
+ @patch("paramiko.client.Transport")
+ def test_disabled_algorithms_defaults_to_None(self, Transport):
+ SSHClient().connect("host", sock=Mock(), password="no")
+ assert Transport.call_args[1]["disabled_algorithms"] is None
+
+ @patch("paramiko.client.Transport")
+ def test_disabled_algorithms_passed_directly_if_given(self, Transport):
+ SSHClient().connect(
+ "host",
+ sock=Mock(),
+ password="no",
+ disabled_algorithms={"keys": ["ssh-dss"]},
+ )
+ call_arg = Transport.call_args[1]["disabled_algorithms"]
+ assert call_arg == {"keys": ["ssh-dss"]}
+
class PasswordPassphraseTests(ClientTest):
# TODO: most of these could reasonably be set up to use mocks/assertions
@@ -684,9 +706,9 @@ class PasswordPassphraseTests(ClientTest):
)
@raises(AuthenticationException) # TODO: more granular
- def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given(
+ def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given( # noqa
self
- ): # noqa
+ ):
# Sanity: if we're given both fields, the password field is NOT used as
# a passphrase.
self._test_connection(