summaryrefslogtreecommitdiffhomepage
path: root/tests/test_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_client.py')
-rw-r--r--tests/test_client.py44
1 files changed, 31 insertions, 13 deletions
diff --git a/tests/test_client.py b/tests/test_client.py
index 26de2d37..ad5c36ad 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -34,8 +34,10 @@ import weakref
from tempfile import mkstemp
from pytest_relaxed import raises
+from mock import patch, Mock
import paramiko
+from paramiko import SSHClient
from paramiko.pkey import PublicBlob
from paramiko.ssh_exception import SSHException, AuthenticationException
@@ -191,7 +193,7 @@ class ClientTest(unittest.TestCase):
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
# Client setup
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.get_host_keys().add(
"[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key
)
@@ -213,7 +215,7 @@ class ClientTest(unittest.TestCase):
# Nobody else tests the API of exec_command so let's do it here for
# now. :weary:
- assert isinstance(stdin, paramiko.ChannelFile)
+ assert isinstance(stdin, paramiko.ChannelStdinFile)
assert isinstance(stdout, paramiko.ChannelFile)
assert isinstance(stderr, paramiko.ChannelStderrFile)
@@ -349,7 +351,7 @@ class SSHClientTest(ClientTest):
key_file = _support("test_ecdsa_256.key")
public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file)
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
self.tc.connect(password="pygmalion", **self.connect_kwargs)
@@ -376,7 +378,7 @@ class SSHClientTest(ClientTest):
fd, localname = mkstemp()
os.close(fd)
- client = paramiko.SSHClient()
+ client = SSHClient()
assert len(client.get_host_keys()) == 0
host_id = "[%s]:%d" % (self.addr, self.port)
@@ -403,7 +405,7 @@ class SSHClientTest(ClientTest):
threading.Thread(target=self._run).start()
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
self.tc.connect(**dict(self.connect_kwargs, password="pygmalion"))
@@ -431,7 +433,7 @@ class SSHClientTest(ClientTest):
"""
threading.Thread(target=self._run).start()
- with paramiko.SSHClient() as tc:
+ with SSHClient() as tc:
self.tc = tc
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
assert len(self.tc.get_host_keys()) == 0
@@ -456,7 +458,7 @@ class SSHClientTest(ClientTest):
)
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.get_host_keys().add(
"[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key
)
@@ -519,7 +521,7 @@ class SSHClientTest(ClientTest):
"""
threading.Thread(target=self._run).start()
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
self.assertRaises(
@@ -539,7 +541,7 @@ class SSHClientTest(ClientTest):
# 2017-08-01
threading.Thread(target=self._run).start()
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
self.assertRaises(
@@ -554,7 +556,7 @@ class SSHClientTest(ClientTest):
threading.Thread(target=self._run).start()
hostname = "[%s]:%d" % (self.addr, self.port)
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.WarningPolicy())
known_hosts = self.tc.get_host_keys()
known_hosts.add(hostname, host_key.get_name(), host_key)
@@ -570,7 +572,7 @@ class SSHClientTest(ClientTest):
threading.Thread(target=self._run).start()
hostname = "[%s]:%d" % (self.addr, self.port)
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
host_key = ktype.from_private_key_file(_support(kfile))
known_hosts = self.tc.get_host_keys()
@@ -599,7 +601,7 @@ class SSHClientTest(ClientTest):
def _setup_for_env(self):
threading.Thread(target=self._run).start()
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
self.tc.connect(
@@ -643,7 +645,7 @@ class SSHClientTest(ClientTest):
"""
# AN ACTUAL UNIT TEST?! GOOD LORD
# (But then we have to test a private API...meh.)
- client = paramiko.SSHClient()
+ client = SSHClient()
# Default
assert isinstance(client._policy, paramiko.RejectPolicy)
# Hand in an instance (classic behavior)
@@ -653,6 +655,22 @@ class SSHClientTest(ClientTest):
client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
assert isinstance(client._policy, paramiko.AutoAddPolicy)
+ @patch("paramiko.client.Transport")
+ def test_disabled_algorithms_defaults_to_None(self, Transport):
+ SSHClient().connect("host", sock=Mock(), password="no")
+ assert Transport.call_args[1]["disabled_algorithms"] is None
+
+ @patch("paramiko.client.Transport")
+ def test_disabled_algorithms_passed_directly_if_given(self, Transport):
+ SSHClient().connect(
+ "host",
+ sock=Mock(),
+ password="no",
+ disabled_algorithms={"keys": ["ssh-dss"]},
+ )
+ call_arg = Transport.call_args[1]["disabled_algorithms"]
+ assert call_arg == {"keys": ["ssh-dss"]}
+
class PasswordPassphraseTests(ClientTest):
# TODO: most of these could reasonably be set up to use mocks/assertions