summaryrefslogtreecommitdiffhomepage
path: root/tests/test_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_client.py')
-rw-r--r--tests/test_client.py93
1 files changed, 53 insertions, 40 deletions
diff --git a/tests/test_client.py b/tests/test_client.py
index dae5b13a..1c0c6c84 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -41,7 +41,7 @@ from paramiko import SSHClient
from paramiko.pkey import PublicBlob
from paramiko.ssh_exception import SSHException, AuthenticationException
-from .util import _support, requires_sha1_signing, slow
+from ._util import _support, requires_sha1_signing, slow
requires_gss_auth = unittest.skipUnless(
@@ -171,10 +171,10 @@ class ClientTest(unittest.TestCase):
self.ts = paramiko.Transport(self.socks)
if server_name is not None:
self.ts.local_version = server_name
- keypath = _support("test_rsa.key")
+ keypath = _support("rsa.key")
host_key = paramiko.RSAKey.from_private_key_file(keypath)
self.ts.add_server_key(host_key)
- keypath = _support("test_ecdsa_256.key")
+ keypath = _support("ecdsa-256.key")
host_key = paramiko.ECDSAKey.from_private_key_file(keypath)
self.ts.add_server_key(host_key)
server = NullServer(allowed_keys=allowed_keys, public_blob=public_blob)
@@ -194,9 +194,7 @@ class ClientTest(unittest.TestCase):
run_kwargs[key] = kwargs.pop(key, None)
# Server setup
threading.Thread(target=self._run, kwargs=run_kwargs).start()
- host_key = paramiko.RSAKey.from_private_key_file(
- _support("test_rsa.key")
- )
+ host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key"))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
# Client setup
@@ -256,25 +254,25 @@ class SSHClientTest(ClientTest):
"""
verify that SSHClient works with a DSA key.
"""
- self._test_connection(key_filename=_support("test_dss.key"))
+ self._test_connection(key_filename=_support("dss.key"))
@requires_sha1_signing
def test_client_rsa(self):
"""
verify that SSHClient works with an RSA key.
"""
- self._test_connection(key_filename=_support("test_rsa.key"))
+ self._test_connection(key_filename=_support("rsa.key"))
@requires_sha1_signing
def test_client_ecdsa(self):
"""
verify that SSHClient works with an ECDSA key.
"""
- self._test_connection(key_filename=_support("test_ecdsa_256.key"))
+ self._test_connection(key_filename=_support("ecdsa-256.key"))
@requires_sha1_signing
def test_client_ed25519(self):
- self._test_connection(key_filename=_support("test_ed25519.key"))
+ self._test_connection(key_filename=_support("ed25519.key"))
@requires_sha1_signing
def test_multiple_key_files(self):
@@ -289,16 +287,17 @@ class SSHClientTest(ClientTest):
}
# Various combos of attempted & valid keys
# TODO: try every possible combo using itertools functions
+ # TODO: use new key(s) fixture(s)
for attempt, accept in (
(["rsa", "dss"], ["dss"]), # Original test #3
(["dss", "rsa"], ["dss"]), # Ordering matters sometimes, sadly
- (["dss", "rsa", "ecdsa_256"], ["dss"]), # Try ECDSA but fail
- (["rsa", "ecdsa_256"], ["ecdsa"]), # ECDSA success
+ (["dss", "rsa", "ecdsa-256"], ["dss"]), # Try ECDSA but fail
+ (["rsa", "ecdsa-256"], ["ecdsa"]), # ECDSA success
):
try:
self._test_connection(
key_filename=[
- _support("test_{}.key".format(x)) for x in attempt
+ _support("{}.key".format(x)) for x in attempt
],
allowed_keys=[types_[x] for x in accept],
)
@@ -318,7 +317,7 @@ class SSHClientTest(ClientTest):
self.assertRaises(
SSHException,
self._test_connection,
- key_filename=[_support("test_rsa.key")],
+ key_filename=[_support("rsa.key")],
allowed_keys=["ecdsa-sha2-nistp256"],
)
@@ -328,30 +327,26 @@ class SSHClientTest(ClientTest):
# They're similar except for which path is given; the expected auth and
# server-side behavior is 100% identical.)
# NOTE: only bothered whipping up one cert per overall class/family.
- for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"):
- cert_name = "test_{}.key-cert.pub".format(type_)
- cert_path = _support(os.path.join("cert_support", cert_name))
+ for type_ in ("rsa", "dss", "ecdsa-256", "ed25519"):
+ key_path = _support(f"{type_}.key")
self._test_connection(
- key_filename=cert_path,
- public_blob=PublicBlob.from_file(cert_path),
+ key_filename=key_path,
+ public_blob=PublicBlob.from_file(f"{key_path}-cert.pub"),
)
@requires_sha1_signing
def test_certs_implicitly_loaded_alongside_key_filename_keys(self):
- # NOTE: a regular test_connection() w/ test_rsa.key would incidentally
+ # NOTE: a regular test_connection() w/ rsa.key would incidentally
# test this (because test_xxx.key-cert.pub exists) but incidental tests
# stink, so NullServer and friends were updated to allow assertions
# about the server-side key object's public blob. Thus, we can prove
# that a specific cert was found, along with regular authorization
# succeeding proving that the overall flow works.
- for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"):
- key_name = "test_{}.key".format(type_)
- key_path = _support(os.path.join("cert_support", key_name))
+ for type_ in ("rsa", "dss", "ecdsa-256", "ed25519"):
+ key_path = _support(f"{type_}.key")
self._test_connection(
key_filename=key_path,
- public_blob=PublicBlob.from_file(
- "{}-cert.pub".format(key_path)
- ),
+ public_blob=PublicBlob.from_file(f"{key_path}-cert.pub"),
)
def _cert_algo_test(self, ver, alg):
@@ -360,9 +355,7 @@ class SSHClientTest(ClientTest):
self._test_connection(
# NOTE: SSHClient is able to take either the key or the cert & will
# set up its internals as needed
- key_filename=_support(
- os.path.join("cert_support", "test_rsa.key-cert.pub")
- ),
+ key_filename=_support("rsa.key-cert.pub"),
server_name="SSH-2.0-OpenSSH_{}".format(ver),
)
assert (
@@ -391,7 +384,7 @@ class SSHClientTest(ClientTest):
"""
threading.Thread(target=self._run).start()
hostname = f"[{self.addr}]:{self.port}"
- key_file = _support("test_ecdsa_256.key")
+ key_file = _support("ecdsa-256.key")
public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file)
self.tc = SSHClient()
@@ -414,9 +407,7 @@ class SSHClientTest(ClientTest):
"""
warnings.filterwarnings("ignore", "tempnam.*")
- host_key = paramiko.RSAKey.from_private_key_file(
- _support("test_rsa.key")
- )
+ host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key"))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
fd, localname = mkstemp()
os.close(fd)
@@ -516,9 +507,7 @@ class SSHClientTest(ClientTest):
"""
# Start the thread with a 1 second wait.
threading.Thread(target=self._run, kwargs={"delay": 1}).start()
- host_key = paramiko.RSAKey.from_private_key_file(
- _support("test_rsa.key")
- )
+ host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key"))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
self.tc = SSHClient()
@@ -564,12 +553,36 @@ class SSHClientTest(ClientTest):
auth_timeout=0.5,
)
+ @patch.object(
+ paramiko.Channel,
+ "_set_remote_channel",
+ lambda *args, **kwargs: time.sleep(100),
+ )
+ def test_channel_timeout(self):
+ """
+ verify that the SSHClient has a configurable channel timeout
+ """
+ threading.Thread(target=self._run).start()
+ # Client setup
+ self.tc = SSHClient()
+ self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+
+ # Actual connection
+ self.tc.connect(
+ **dict(
+ self.connect_kwargs, password="pygmalion", channel_timeout=0.5
+ )
+ )
+ self.event.wait(1.0)
+
+ self.assertRaises(paramiko.SSHException, self.tc.open_sftp)
+
@requires_gss_auth
def test_auth_trickledown_gsskex(self):
"""
Failed gssapi-keyex doesn't prevent subsequent key from succeeding
"""
- kwargs = dict(gss_kex=True, key_filename=[_support("test_rsa.key")])
+ kwargs = dict(gss_kex=True, key_filename=[_support("rsa.key")])
self._test_connection(**kwargs)
@requires_gss_auth
@@ -577,7 +590,7 @@ class SSHClientTest(ClientTest):
"""
Failed gssapi-with-mic doesn't prevent subsequent key from succeeding
"""
- kwargs = dict(gss_auth=True, key_filename=[_support("test_rsa.key")])
+ kwargs = dict(gss_auth=True, key_filename=[_support("rsa.key")])
self._test_connection(**kwargs)
def test_reject_policy(self):
@@ -659,11 +672,11 @@ class SSHClientTest(ClientTest):
self._client_host_key_bad(host_key)
def test_host_key_negotiation_3(self):
- self._client_host_key_good(paramiko.ECDSAKey, "test_ecdsa_256.key")
+ self._client_host_key_good(paramiko.ECDSAKey, "ecdsa-256.key")
@requires_sha1_signing
def test_host_key_negotiation_4(self):
- self._client_host_key_good(paramiko.RSAKey, "test_rsa.key")
+ self._client_host_key_good(paramiko.RSAKey, "rsa.key")
def _setup_for_env(self):
threading.Thread(target=self._run).start()