diff options
Diffstat (limited to 'tests/test_client.py')
-rw-r--r-- | tests/test_client.py | 93 |
1 files changed, 53 insertions, 40 deletions
diff --git a/tests/test_client.py b/tests/test_client.py index dae5b13a..1c0c6c84 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -41,7 +41,7 @@ from paramiko import SSHClient from paramiko.pkey import PublicBlob from paramiko.ssh_exception import SSHException, AuthenticationException -from .util import _support, requires_sha1_signing, slow +from ._util import _support, requires_sha1_signing, slow requires_gss_auth = unittest.skipUnless( @@ -171,10 +171,10 @@ class ClientTest(unittest.TestCase): self.ts = paramiko.Transport(self.socks) if server_name is not None: self.ts.local_version = server_name - keypath = _support("test_rsa.key") + keypath = _support("rsa.key") host_key = paramiko.RSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) - keypath = _support("test_ecdsa_256.key") + keypath = _support("ecdsa-256.key") host_key = paramiko.ECDSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) server = NullServer(allowed_keys=allowed_keys, public_blob=public_blob) @@ -194,9 +194,7 @@ class ClientTest(unittest.TestCase): run_kwargs[key] = kwargs.pop(key, None) # Server setup threading.Thread(target=self._run, kwargs=run_kwargs).start() - host_key = paramiko.RSAKey.from_private_key_file( - _support("test_rsa.key") - ) + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) # Client setup @@ -256,25 +254,25 @@ class SSHClientTest(ClientTest): """ verify that SSHClient works with a DSA key. """ - self._test_connection(key_filename=_support("test_dss.key")) + self._test_connection(key_filename=_support("dss.key")) @requires_sha1_signing def test_client_rsa(self): """ verify that SSHClient works with an RSA key. """ - self._test_connection(key_filename=_support("test_rsa.key")) + self._test_connection(key_filename=_support("rsa.key")) @requires_sha1_signing def test_client_ecdsa(self): """ verify that SSHClient works with an ECDSA key. """ - self._test_connection(key_filename=_support("test_ecdsa_256.key")) + self._test_connection(key_filename=_support("ecdsa-256.key")) @requires_sha1_signing def test_client_ed25519(self): - self._test_connection(key_filename=_support("test_ed25519.key")) + self._test_connection(key_filename=_support("ed25519.key")) @requires_sha1_signing def test_multiple_key_files(self): @@ -289,16 +287,17 @@ class SSHClientTest(ClientTest): } # Various combos of attempted & valid keys # TODO: try every possible combo using itertools functions + # TODO: use new key(s) fixture(s) for attempt, accept in ( (["rsa", "dss"], ["dss"]), # Original test #3 (["dss", "rsa"], ["dss"]), # Ordering matters sometimes, sadly - (["dss", "rsa", "ecdsa_256"], ["dss"]), # Try ECDSA but fail - (["rsa", "ecdsa_256"], ["ecdsa"]), # ECDSA success + (["dss", "rsa", "ecdsa-256"], ["dss"]), # Try ECDSA but fail + (["rsa", "ecdsa-256"], ["ecdsa"]), # ECDSA success ): try: self._test_connection( key_filename=[ - _support("test_{}.key".format(x)) for x in attempt + _support("{}.key".format(x)) for x in attempt ], allowed_keys=[types_[x] for x in accept], ) @@ -318,7 +317,7 @@ class SSHClientTest(ClientTest): self.assertRaises( SSHException, self._test_connection, - key_filename=[_support("test_rsa.key")], + key_filename=[_support("rsa.key")], allowed_keys=["ecdsa-sha2-nistp256"], ) @@ -328,30 +327,26 @@ class SSHClientTest(ClientTest): # They're similar except for which path is given; the expected auth and # server-side behavior is 100% identical.) # NOTE: only bothered whipping up one cert per overall class/family. - for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"): - cert_name = "test_{}.key-cert.pub".format(type_) - cert_path = _support(os.path.join("cert_support", cert_name)) + for type_ in ("rsa", "dss", "ecdsa-256", "ed25519"): + key_path = _support(f"{type_}.key") self._test_connection( - key_filename=cert_path, - public_blob=PublicBlob.from_file(cert_path), + key_filename=key_path, + public_blob=PublicBlob.from_file(f"{key_path}-cert.pub"), ) @requires_sha1_signing def test_certs_implicitly_loaded_alongside_key_filename_keys(self): - # NOTE: a regular test_connection() w/ test_rsa.key would incidentally + # NOTE: a regular test_connection() w/ rsa.key would incidentally # test this (because test_xxx.key-cert.pub exists) but incidental tests # stink, so NullServer and friends were updated to allow assertions # about the server-side key object's public blob. Thus, we can prove # that a specific cert was found, along with regular authorization # succeeding proving that the overall flow works. - for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"): - key_name = "test_{}.key".format(type_) - key_path = _support(os.path.join("cert_support", key_name)) + for type_ in ("rsa", "dss", "ecdsa-256", "ed25519"): + key_path = _support(f"{type_}.key") self._test_connection( key_filename=key_path, - public_blob=PublicBlob.from_file( - "{}-cert.pub".format(key_path) - ), + public_blob=PublicBlob.from_file(f"{key_path}-cert.pub"), ) def _cert_algo_test(self, ver, alg): @@ -360,9 +355,7 @@ class SSHClientTest(ClientTest): self._test_connection( # NOTE: SSHClient is able to take either the key or the cert & will # set up its internals as needed - key_filename=_support( - os.path.join("cert_support", "test_rsa.key-cert.pub") - ), + key_filename=_support("rsa.key-cert.pub"), server_name="SSH-2.0-OpenSSH_{}".format(ver), ) assert ( @@ -391,7 +384,7 @@ class SSHClientTest(ClientTest): """ threading.Thread(target=self._run).start() hostname = f"[{self.addr}]:{self.port}" - key_file = _support("test_ecdsa_256.key") + key_file = _support("ecdsa-256.key") public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file) self.tc = SSHClient() @@ -414,9 +407,7 @@ class SSHClientTest(ClientTest): """ warnings.filterwarnings("ignore", "tempnam.*") - host_key = paramiko.RSAKey.from_private_key_file( - _support("test_rsa.key") - ) + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) fd, localname = mkstemp() os.close(fd) @@ -516,9 +507,7 @@ class SSHClientTest(ClientTest): """ # Start the thread with a 1 second wait. threading.Thread(target=self._run, kwargs={"delay": 1}).start() - host_key = paramiko.RSAKey.from_private_key_file( - _support("test_rsa.key") - ) + host_key = paramiko.RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = SSHClient() @@ -564,12 +553,36 @@ class SSHClientTest(ClientTest): auth_timeout=0.5, ) + @patch.object( + paramiko.Channel, + "_set_remote_channel", + lambda *args, **kwargs: time.sleep(100), + ) + def test_channel_timeout(self): + """ + verify that the SSHClient has a configurable channel timeout + """ + threading.Thread(target=self._run).start() + # Client setup + self.tc = SSHClient() + self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + + # Actual connection + self.tc.connect( + **dict( + self.connect_kwargs, password="pygmalion", channel_timeout=0.5 + ) + ) + self.event.wait(1.0) + + self.assertRaises(paramiko.SSHException, self.tc.open_sftp) + @requires_gss_auth def test_auth_trickledown_gsskex(self): """ Failed gssapi-keyex doesn't prevent subsequent key from succeeding """ - kwargs = dict(gss_kex=True, key_filename=[_support("test_rsa.key")]) + kwargs = dict(gss_kex=True, key_filename=[_support("rsa.key")]) self._test_connection(**kwargs) @requires_gss_auth @@ -577,7 +590,7 @@ class SSHClientTest(ClientTest): """ Failed gssapi-with-mic doesn't prevent subsequent key from succeeding """ - kwargs = dict(gss_auth=True, key_filename=[_support("test_rsa.key")]) + kwargs = dict(gss_auth=True, key_filename=[_support("rsa.key")]) self._test_connection(**kwargs) def test_reject_policy(self): @@ -659,11 +672,11 @@ class SSHClientTest(ClientTest): self._client_host_key_bad(host_key) def test_host_key_negotiation_3(self): - self._client_host_key_good(paramiko.ECDSAKey, "test_ecdsa_256.key") + self._client_host_key_good(paramiko.ECDSAKey, "ecdsa-256.key") @requires_sha1_signing def test_host_key_negotiation_4(self): - self._client_host_key_good(paramiko.RSAKey, "test_rsa.key") + self._client_host_key_good(paramiko.RSAKey, "rsa.key") def _setup_for_env(self): threading.Thread(target=self._run).start() |