summaryrefslogtreecommitdiffhomepage
path: root/tests/test_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_client.py')
-rw-r--r--tests/test_client.py192
1 files changed, 135 insertions, 57 deletions
diff --git a/tests/test_client.py b/tests/test_client.py
index bfbd395f..9191fc01 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -20,7 +20,7 @@
Some unit tests for SSHClient.
"""
-from __future__ import with_statement
+from __future__ import with_statement, print_function
import gc
import os
@@ -33,18 +33,23 @@ import warnings
import weakref
from tempfile import mkstemp
+from pytest_relaxed import raises
+
import paramiko
from paramiko.pkey import PublicBlob
-from paramiko.common import PY2
from paramiko.ssh_exception import SSHException, AuthenticationException
from .util import _support, slow
+requires_gss_auth = unittest.skipUnless(
+ paramiko.GSS_AUTH_AVAILABLE, "GSS auth not available"
+)
+
FINGERPRINTS = {
- "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c",
- "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5",
- "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60",
+ "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c", # noqa
+ "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5", # noqa
+ "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60", # noqa
"ssh-ed25519": b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80',
}
@@ -107,7 +112,7 @@ class NullServer(paramiko.ServerInterface):
return True
-class SSHClientTest(unittest.TestCase):
+class ClientTest(unittest.TestCase):
def setUp(self):
self.sockl = socket.socket()
self.sockl.bind(("localhost", 0))
@@ -120,16 +125,42 @@ class SSHClientTest(unittest.TestCase):
look_for_keys=False,
)
self.event = threading.Event()
+ self.kill_event = threading.Event()
def tearDown(self):
- for attr in "tc ts socks sockl".split():
- if hasattr(self, attr):
- getattr(self, attr).close()
-
- def _run(self, allowed_keys=None, delay=0, public_blob=None):
+ # Shut down client Transport
+ if hasattr(self, "tc"):
+ self.tc.close()
+ # Shut down shared socket
+ if hasattr(self, "sockl"):
+ # Signal to server thread that it should shut down early; it checks
+ # this immediately after accept(). (In scenarios where connection
+ # actually succeeded during the test, this becomes a no-op.)
+ self.kill_event.set()
+ # Forcibly connect to server sock in case the server thread is
+ # hanging out in its accept() (e.g. if the client side of the test
+ # fails before it even gets to connecting); there's no other good
+ # way to force an accept() to exit.
+ put_a_sock_in_it = socket.socket()
+ put_a_sock_in_it.connect((self.addr, self.port))
+ put_a_sock_in_it.close()
+ # Then close "our" end of the socket (which _should_ cause the
+ # accept() to bail out, but does not, for some reason. I blame
+ # threading.)
+ self.sockl.close()
+
+ def _run(
+ self, allowed_keys=None, delay=0, public_blob=None, kill_event=None
+ ):
if allowed_keys is None:
allowed_keys = FINGERPRINTS.keys()
self.socks, addr = self.sockl.accept()
+ # If the kill event was set at this point, it indicates an early
+ # shutdown, so bail out now and don't even try setting up a Transport
+ # (which will just verbosely die.)
+ if kill_event and kill_event.is_set():
+ self.socks.close()
+ return
self.ts = paramiko.Transport(self.socks)
keypath = _support("test_rsa.key")
host_key = paramiko.RSAKey.from_private_key_file(keypath)
@@ -149,7 +180,7 @@ class SSHClientTest(unittest.TestCase):
The exception is ``allowed_keys`` which is stripped and handed to the
``NullServer`` used for testing.
"""
- run_kwargs = {}
+ run_kwargs = {"kill_event": self.kill_event}
for key in ("allowed_keys", "public_blob"):
run_kwargs[key] = kwargs.pop(key, None)
# Server setup
@@ -194,13 +225,15 @@ class SSHClientTest(unittest.TestCase):
stdout.close()
stderr.close()
- def test_1_client(self):
+
+class SSHClientTest(ClientTest):
+ def test_client(self):
"""
verify that the SSHClient stuff works too.
"""
self._test_connection(password="pygmalion")
- def test_2_client_dsa(self):
+ def test_client_dsa(self):
"""
verify that SSHClient works with a DSA key.
"""
@@ -212,7 +245,7 @@ class SSHClientTest(unittest.TestCase):
"""
self._test_connection(key_filename=_support("test_rsa.key"))
- def test_2_5_client_ecdsa(self):
+ def test_client_ecdsa(self):
"""
verify that SSHClient works with an ECDSA key.
"""
@@ -221,7 +254,7 @@ class SSHClientTest(unittest.TestCase):
def test_client_ed25519(self):
self._test_connection(key_filename=_support("test_ed25519.key"))
- def test_3_multiple_key_files(self):
+ def test_multiple_key_files(self):
"""
verify that SSHClient accepts and tries multiple key files.
"""
@@ -242,7 +275,7 @@ class SSHClientTest(unittest.TestCase):
try:
self._test_connection(
key_filename=[
- _support("test_{0}.key".format(x)) for x in attempt
+ _support("test_{}.key".format(x)) for x in attempt
],
allowed_keys=[types_[x] for x in accept],
)
@@ -271,7 +304,7 @@ class SSHClientTest(unittest.TestCase):
# server-side behavior is 100% identical.)
# NOTE: only bothered whipping up one cert per overall class/family.
for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"):
- cert_name = "test_{0}.key-cert.pub".format(type_)
+ cert_name = "test_{}.key-cert.pub".format(type_)
cert_path = _support(os.path.join("cert_support", cert_name))
self._test_connection(
key_filename=cert_path,
@@ -286,12 +319,12 @@ class SSHClientTest(unittest.TestCase):
# that a specific cert was found, along with regular authorization
# succeeding proving that the overall flow works.
for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"):
- key_name = "test_{0}.key".format(type_)
+ key_name = "test_{}.key".format(type_)
key_path = _support(os.path.join("cert_support", key_name))
self._test_connection(
key_filename=key_path,
public_blob=PublicBlob.from_file(
- "{0}-cert.pub".format(key_path)
+ "{}-cert.pub".format(key_path)
),
)
@@ -301,7 +334,7 @@ class SSHClientTest(unittest.TestCase):
# code path (!) so we're punting too, sob.
pass
- def test_4_auto_add_policy(self):
+ def test_auto_add_policy(self):
"""
verify that SSHClient's AutoAddPolicy works.
"""
@@ -324,7 +357,7 @@ class SSHClientTest(unittest.TestCase):
new_host_key = list(self.tc.get_host_keys()[hostname].values())[0]
self.assertEqual(public_host_key, new_host_key)
- def test_5_save_host_keys(self):
+ def test_save_host_keys(self):
"""
verify that SSHClient correctly saves a known_hosts file.
"""
@@ -353,7 +386,7 @@ class SSHClientTest(unittest.TestCase):
os.unlink(localname)
- def test_6_cleanup(self):
+ def test_cleanup(self):
"""
verify that when an SSHClient is collected, its transport (and the
transport's packetizer) is closed.
@@ -406,7 +439,7 @@ class SSHClientTest(unittest.TestCase):
self.assertTrue(self.tc._transport is None)
- def test_7_banner_timeout(self):
+ def test_banner_timeout(self):
"""
verify that the SSHClient has a configurable banner timeout.
"""
@@ -425,7 +458,7 @@ class SSHClientTest(unittest.TestCase):
kwargs = dict(self.connect_kwargs, banner_timeout=0.5)
self.assertRaises(paramiko.SSHException, self.tc.connect, **kwargs)
- def test_8_auth_trickledown(self):
+ def test_auth_trickledown(self):
"""
Failed key auth doesn't prevent subsequent pw auth from succeeding
"""
@@ -445,7 +478,8 @@ class SSHClientTest(unittest.TestCase):
)
self._test_connection(**kwargs)
- def test_9_auth_timeout(self):
+ @slow
+ def test_auth_timeout(self):
"""
verify that the SSHClient has a configurable auth timeout
"""
@@ -457,25 +491,23 @@ class SSHClientTest(unittest.TestCase):
auth_timeout=0.5,
)
- def test_10_auth_trickledown_gsskex(self):
+ @requires_gss_auth
+ def test_auth_trickledown_gsskex(self):
"""
- Failed gssapi-keyex auth doesn't prevent subsequent key auth from succeeding
+ Failed gssapi-keyex doesn't prevent subsequent key from succeeding
"""
- if not paramiko.GSS_AUTH_AVAILABLE:
- return # for python 2.6 lacks skipTest
kwargs = dict(gss_kex=True, key_filename=[_support("test_rsa.key")])
self._test_connection(**kwargs)
- def test_11_auth_trickledown_gssauth(self):
+ @requires_gss_auth
+ def test_auth_trickledown_gssauth(self):
"""
- Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding
+ Failed gssapi-with-mic doesn't prevent subsequent key from succeeding
"""
- if not paramiko.GSS_AUTH_AVAILABLE:
- return # for python 2.6 lacks skipTest
kwargs = dict(gss_auth=True, key_filename=[_support("test_rsa.key")])
self._test_connection(**kwargs)
- def test_12_reject_policy(self):
+ def test_reject_policy(self):
"""
verify that SSHClient's RejectPolicy works.
"""
@@ -491,14 +523,14 @@ class SSHClientTest(unittest.TestCase):
**self.connect_kwargs
)
- def test_13_reject_policy_gsskex(self):
+ @requires_gss_auth
+ def test_reject_policy_gsskex(self):
"""
verify that SSHClient's RejectPolicy works,
even if gssapi-keyex was enabled but not used.
"""
- # Test for a bug present in paramiko versions released before 2017-08-01
- if not paramiko.GSS_AUTH_AVAILABLE:
- return # for python 2.6 lacks skipTest
+ # Test for a bug present in paramiko versions released before
+ # 2017-08-01
threading.Thread(target=self._run).start()
self.tc = paramiko.SSHClient()
@@ -558,10 +590,7 @@ class SSHClientTest(unittest.TestCase):
def test_host_key_negotiation_4(self):
self._client_host_key_good(paramiko.RSAKey, "test_rsa.key")
- def test_update_environment(self):
- """
- Verify that environment variables can be set by the client.
- """
+ def _setup_for_env(self):
threading.Thread(target=self._run).start()
self.tc = paramiko.SSHClient()
@@ -575,6 +604,11 @@ class SSHClientTest(unittest.TestCase):
self.assertTrue(self.event.isSet())
self.assertTrue(self.ts.is_active())
+ def test_update_environment(self):
+ """
+ Verify that environment variables can be set by the client.
+ """
+ self._setup_for_env()
target_env = {b"A": b"B", b"C": b"d"}
self.tc.exec_command("yes", environment=target_env)
@@ -582,22 +616,20 @@ class SSHClientTest(unittest.TestCase):
self.assertEqual(target_env, getattr(schan, "env", {}))
schan.close()
- # Cannot use assertRaises in context manager mode as it is not supported
- # in Python 2.6.
- try:
+ @unittest.skip("Clients normally fail silently, thus so do we, for now")
+ def test_env_update_failures(self):
+ self._setup_for_env()
+ with self.assertRaises(SSHException) as manager:
# Verify that a rejection by the server can be detected
self.tc.exec_command("yes", environment={b"INVALID_ENV": b""})
- except SSHException as e:
- self.assertTrue(
- "INVALID_ENV" in str(e),
- "Expected variable name in error message",
- )
- self.assertTrue(
- isinstance(e.args[1], SSHException),
- "Expected original SSHException in exception",
- )
- else:
- self.assertFalse(False, "SSHException was not thrown.")
+ self.assertTrue(
+ "INVALID_ENV" in str(manager.exception),
+ "Expected variable name in error message",
+ )
+ self.assertTrue(
+ isinstance(manager.exception.args[1], SSHException),
+ "Expected original SSHException in exception",
+ )
def test_missing_key_policy_accepts_classes_or_instances(self):
"""
@@ -614,3 +646,49 @@ class SSHClientTest(unittest.TestCase):
# Hand in just the class (new behavior)
client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
assert isinstance(client._policy, paramiko.AutoAddPolicy)
+
+
+class PasswordPassphraseTests(ClientTest):
+ # TODO: most of these could reasonably be set up to use mocks/assertions
+ # (e.g. "gave passphrase -> expect PKey was given it as the passphrase")
+ # instead of suffering a real connection cycle.
+ # TODO: in that case, move the below to be part of an integration suite?
+
+ def test_password_kwarg_works_for_password_auth(self):
+ # Straightforward / duplicate of earlier basic password test.
+ self._test_connection(password="pygmalion")
+
+ # TODO: more granular exception pending #387; should be signaling "no auth
+ # methods available" because no key and no password
+ @raises(SSHException)
+ def test_passphrase_kwarg_not_used_for_password_auth(self):
+ # Using the "right" password in the "wrong" field shouldn't work.
+ self._test_connection(passphrase="pygmalion")
+
+ def test_passphrase_kwarg_used_for_key_passphrase(self):
+ # Straightforward again, with new passphrase kwarg.
+ self._test_connection(
+ key_filename=_support("test_rsa_password.key"),
+ passphrase="television",
+ )
+
+ def test_password_kwarg_used_for_passphrase_when_no_passphrase_kwarg_given(
+ self
+ ): # noqa
+ # Backwards compatibility: passphrase in the password field.
+ self._test_connection(
+ key_filename=_support("test_rsa_password.key"),
+ password="television",
+ )
+
+ @raises(AuthenticationException) # TODO: more granular
+ def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given( # noqa
+ self
+ ):
+ # Sanity: if we're given both fields, the password field is NOT used as
+ # a passphrase.
+ self._test_connection(
+ key_filename=_support("test_rsa_password.key"),
+ password="television",
+ passphrase="wat? lol no",
+ )