diff options
Diffstat (limited to 'tests/test_client.py')
-rw-r--r-- | tests/test_client.py | 253 |
1 files changed, 133 insertions, 120 deletions
diff --git a/tests/test_client.py b/tests/test_client.py index 7163fdcf..4943df29 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -48,30 +48,31 @@ requires_gss_auth = unittest.skipUnless( ) FINGERPRINTS = { - 'ssh-dss': b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c', - 'ssh-rsa': b'\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5', - 'ecdsa-sha2-nistp256': b'\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60', - 'ssh-ed25519': b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80', + "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c", + "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5", + "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60", + "ssh-ed25519": b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80', } class NullServer(paramiko.ServerInterface): + def __init__(self, *args, **kwargs): # Allow tests to enable/disable specific key types - self.__allowed_keys = kwargs.pop('allowed_keys', []) + self.__allowed_keys = kwargs.pop("allowed_keys", []) # And allow them to set a (single...meh) expected public blob (cert) - self.__expected_public_blob = kwargs.pop('public_blob', None) + self.__expected_public_blob = kwargs.pop("public_blob", None) super(NullServer, self).__init__(*args, **kwargs) def get_allowed_auths(self, username): - if username == 'slowdive': - return 'publickey,password' - return 'publickey' + if username == "slowdive": + return "publickey,password" + return "publickey" def check_auth_password(self, username, password): - if (username == 'slowdive') and (password == 'pygmalion'): + if (username == "slowdive") and (password == "pygmalion"): return paramiko.AUTH_SUCCESSFUL - if (username == 'slowdive') and (password == 'unresponsive-server'): + if (username == "slowdive") and (password == "unresponsive-server"): time.sleep(5) return paramiko.AUTH_SUCCESSFUL return paramiko.AUTH_FAILED @@ -83,13 +84,13 @@ class NullServer(paramiko.ServerInterface): return paramiko.AUTH_FAILED # Base check: allowed auth type & fingerprint matches happy = ( - key.get_name() in self.__allowed_keys and - key.get_fingerprint() == expected + key.get_name() in self.__allowed_keys + and key.get_fingerprint() == expected ) # Secondary check: if test wants assertions about cert data if ( - self.__expected_public_blob is not None and - key.public_blob != self.__expected_public_blob + self.__expected_public_blob is not None + and key.public_blob != self.__expected_public_blob ): happy = False return paramiko.AUTH_SUCCESSFUL if happy else paramiko.AUTH_FAILED @@ -98,31 +99,32 @@ class NullServer(paramiko.ServerInterface): return paramiko.OPEN_SUCCEEDED def check_channel_exec_request(self, channel, command): - if command != b'yes': + if command != b"yes": return False return True def check_channel_env_request(self, channel, name, value): - if name == 'INVALID_ENV': + if name == "INVALID_ENV": return False - if not hasattr(channel, 'env'): - setattr(channel, 'env', {}) + if not hasattr(channel, "env"): + setattr(channel, "env", {}) channel.env[name] = value return True class ClientTest(unittest.TestCase): + def setUp(self): self.sockl = socket.socket() - self.sockl.bind(('localhost', 0)) + self.sockl.bind(("localhost", 0)) self.sockl.listen(1) self.addr, self.port = self.sockl.getsockname() self.connect_kwargs = dict( hostname=self.addr, port=self.port, - username='slowdive', + username="slowdive", look_for_keys=False, ) self.event = threading.Event() @@ -130,10 +132,10 @@ class ClientTest(unittest.TestCase): def tearDown(self): # Shut down client Transport - if hasattr(self, 'tc'): + if hasattr(self, "tc"): self.tc.close() # Shut down shared socket - if hasattr(self, 'sockl'): + if hasattr(self, "sockl"): # Signal to server thread that it should shut down early; it checks # this immediately after accept(). (In scenarios where connection # actually succeeded during the test, this becomes a no-op.) @@ -151,7 +153,7 @@ class ClientTest(unittest.TestCase): self.sockl.close() def _run( - self, allowed_keys=None, delay=0, public_blob=None, kill_event=None, + self, allowed_keys=None, delay=0, public_blob=None, kill_event=None ): if allowed_keys is None: allowed_keys = FINGERPRINTS.keys() @@ -163,10 +165,10 @@ class ClientTest(unittest.TestCase): self.socks.close() return self.ts = paramiko.Transport(self.socks) - keypath = _support('test_rsa.key') + keypath = _support("test_rsa.key") host_key = paramiko.RSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) - keypath = _support('test_ecdsa_256.key') + keypath = _support("test_ecdsa_256.key") host_key = paramiko.ECDSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) server = NullServer(allowed_keys=allowed_keys, public_blob=public_blob) @@ -181,17 +183,21 @@ class ClientTest(unittest.TestCase): The exception is ``allowed_keys`` which is stripped and handed to the ``NullServer`` used for testing. """ - run_kwargs = {'kill_event': self.kill_event} - for key in ('allowed_keys', 'public_blob'): + run_kwargs = {"kill_event": self.kill_event} + for key in ("allowed_keys", "public_blob"): run_kwargs[key] = kwargs.pop(key, None) # Server setup threading.Thread(target=self._run, kwargs=run_kwargs).start() - host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key')) + host_key = paramiko.RSAKey.from_private_key_file( + _support("test_rsa.key") + ) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) # Client setup self.tc = paramiko.SSHClient() - self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key) + self.tc.get_host_keys().add( + "[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key + ) # Actual connection self.tc.connect(**dict(self.connect_kwargs, **kwargs)) @@ -200,22 +206,22 @@ class ClientTest(unittest.TestCase): self.event.wait(1.0) self.assertTrue(self.event.is_set()) self.assertTrue(self.ts.is_active()) - self.assertEqual('slowdive', self.ts.get_username()) + self.assertEqual("slowdive", self.ts.get_username()) self.assertEqual(True, self.ts.is_authenticated()) self.assertEqual(False, self.tc.get_transport().gss_kex_used) # Command execution functions? - stdin, stdout, stderr = self.tc.exec_command('yes') + stdin, stdout, stderr = self.tc.exec_command("yes") schan = self.ts.accept(1.0) - schan.send('Hello there.\n') - schan.send_stderr('This is on stderr.\n') + schan.send("Hello there.\n") + schan.send_stderr("This is on stderr.\n") schan.close() - self.assertEqual('Hello there.\n', stdout.readline()) - self.assertEqual('', stdout.readline()) - self.assertEqual('This is on stderr.\n', stderr.readline()) - self.assertEqual('', stderr.readline()) + self.assertEqual("Hello there.\n", stdout.readline()) + self.assertEqual("", stdout.readline()) + self.assertEqual("This is on stderr.\n", stderr.readline()) + self.assertEqual("", stderr.readline()) # Cleanup stdin.close() @@ -224,32 +230,33 @@ class ClientTest(unittest.TestCase): class SSHClientTest(ClientTest): + def test_1_client(self): """ verify that the SSHClient stuff works too. """ - self._test_connection(password='pygmalion') + self._test_connection(password="pygmalion") def test_2_client_dsa(self): """ verify that SSHClient works with a DSA key. """ - self._test_connection(key_filename=_support('test_dss.key')) + self._test_connection(key_filename=_support("test_dss.key")) def test_client_rsa(self): """ verify that SSHClient works with an RSA key. """ - self._test_connection(key_filename=_support('test_rsa.key')) + self._test_connection(key_filename=_support("test_rsa.key")) def test_2_5_client_ecdsa(self): """ verify that SSHClient works with an ECDSA key. """ - self._test_connection(key_filename=_support('test_ecdsa_256.key')) + self._test_connection(key_filename=_support("test_ecdsa_256.key")) def test_client_ed25519(self): - self._test_connection(key_filename=_support('test_ed25519.key')) + self._test_connection(key_filename=_support("test_ed25519.key")) def test_3_multiple_key_files(self): """ @@ -257,22 +264,22 @@ class SSHClientTest(ClientTest): """ # This is dumb :( types_ = { - 'rsa': 'ssh-rsa', - 'dss': 'ssh-dss', - 'ecdsa': 'ecdsa-sha2-nistp256', + "rsa": "ssh-rsa", + "dss": "ssh-dss", + "ecdsa": "ecdsa-sha2-nistp256", } # Various combos of attempted & valid keys # TODO: try every possible combo using itertools functions for attempt, accept in ( - (['rsa', 'dss'], ['dss']), # Original test #3 - (['dss', 'rsa'], ['dss']), # Ordering matters sometimes, sadly - (['dss', 'rsa', 'ecdsa_256'], ['dss']), # Try ECDSA but fail - (['rsa', 'ecdsa_256'], ['ecdsa']), # ECDSA success + (["rsa", "dss"], ["dss"]), # Original test #3 + (["dss", "rsa"], ["dss"]), # Ordering matters sometimes, sadly + (["dss", "rsa", "ecdsa_256"], ["dss"]), # Try ECDSA but fail + (["rsa", "ecdsa_256"], ["ecdsa"]), # ECDSA success ): try: self._test_connection( key_filename=[ - _support('test_{}.key'.format(x)) for x in attempt + _support("test_{}.key".format(x)) for x in attempt ], allowed_keys=[types_[x] for x in accept], ) @@ -288,10 +295,11 @@ class SSHClientTest(ClientTest): """ # Until #387 is fixed we have to catch a high-up exception since # various platforms trigger different errors here >_< - self.assertRaises(SSHException, + self.assertRaises( + SSHException, self._test_connection, - key_filename=[_support('test_rsa.key')], - allowed_keys=['ecdsa-sha2-nistp256'], + key_filename=[_support("test_rsa.key")], + allowed_keys=["ecdsa-sha2-nistp256"], ) def test_certs_allowed_as_key_filename_values(self): @@ -299,9 +307,9 @@ class SSHClientTest(ClientTest): # They're similar except for which path is given; the expected auth and # server-side behavior is 100% identical.) # NOTE: only bothered whipping up one cert per overall class/family. - for type_ in ('rsa', 'dss', 'ecdsa_256', 'ed25519'): - cert_name = 'test_{}.key-cert.pub'.format(type_) - cert_path = _support(os.path.join('cert_support', cert_name)) + for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"): + cert_name = "test_{}.key-cert.pub".format(type_) + cert_path = _support(os.path.join("cert_support", cert_name)) self._test_connection( key_filename=cert_path, public_blob=PublicBlob.from_file(cert_path), @@ -314,13 +322,13 @@ class SSHClientTest(ClientTest): # about the server-side key object's public blob. Thus, we can prove # that a specific cert was found, along with regular authorization # succeeding proving that the overall flow works. - for type_ in ('rsa', 'dss', 'ecdsa_256', 'ed25519'): - key_name = 'test_{}.key'.format(type_) - key_path = _support(os.path.join('cert_support', key_name)) + for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"): + key_name = "test_{}.key".format(type_) + key_path = _support(os.path.join("cert_support", key_name)) self._test_connection( key_filename=key_path, public_blob=PublicBlob.from_file( - '{}-cert.pub'.format(key_path) + "{}-cert.pub".format(key_path) ), ) @@ -335,19 +343,19 @@ class SSHClientTest(ClientTest): verify that SSHClient's AutoAddPolicy works. """ threading.Thread(target=self._run).start() - hostname = '[%s]:%d' % (self.addr, self.port) - key_file = _support('test_ecdsa_256.key') + hostname = "[%s]:%d" % (self.addr, self.port) + key_file = _support("test_ecdsa_256.key") public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file) self.tc = paramiko.SSHClient() self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.assertEqual(0, len(self.tc.get_host_keys())) - self.tc.connect(password='pygmalion', **self.connect_kwargs) + self.tc.connect(password="pygmalion", **self.connect_kwargs) self.event.wait(1.0) self.assertTrue(self.event.is_set()) self.assertTrue(self.ts.is_active()) - self.assertEqual('slowdive', self.ts.get_username()) + self.assertEqual("slowdive", self.ts.get_username()) self.assertEqual(True, self.ts.is_authenticated()) self.assertEqual(1, len(self.tc.get_host_keys())) new_host_key = list(self.tc.get_host_keys()[hostname].values())[0] @@ -357,9 +365,11 @@ class SSHClientTest(ClientTest): """ verify that SSHClient correctly saves a known_hosts file. """ - warnings.filterwarnings('ignore', 'tempnam.*') + warnings.filterwarnings("ignore", "tempnam.*") - host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key')) + host_key = paramiko.RSAKey.from_private_key_file( + _support("test_rsa.key") + ) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) fd, localname = mkstemp() os.close(fd) @@ -367,11 +377,13 @@ class SSHClientTest(ClientTest): client = paramiko.SSHClient() self.assertEquals(0, len(client.get_host_keys())) - host_id = '[%s]:%d' % (self.addr, self.port) + host_id = "[%s]:%d" % (self.addr, self.port) - client.get_host_keys().add(host_id, 'ssh-rsa', public_host_key) + client.get_host_keys().add(host_id, "ssh-rsa", public_host_key) self.assertEquals(1, len(client.get_host_keys())) - self.assertEquals(public_host_key, client.get_host_keys()[host_id]['ssh-rsa']) + self.assertEquals( + public_host_key, client.get_host_keys()[host_id]["ssh-rsa"] + ) client.save_host_keys(localname) @@ -394,7 +406,7 @@ class SSHClientTest(ClientTest): self.tc = paramiko.SSHClient() self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.assertEqual(0, len(self.tc.get_host_keys())) - self.tc.connect(**dict(self.connect_kwargs, password='pygmalion')) + self.tc.connect(**dict(self.connect_kwargs, password="pygmalion")) self.event.wait(1.0) self.assertTrue(self.event.is_set()) @@ -423,7 +435,7 @@ class SSHClientTest(ClientTest): self.tc = tc self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.assertEquals(0, len(self.tc.get_host_keys())) - self.tc.connect(**dict(self.connect_kwargs, password='pygmalion')) + self.tc.connect(**dict(self.connect_kwargs, password="pygmalion")) self.event.wait(1.0) self.assertTrue(self.event.is_set()) @@ -438,19 +450,19 @@ class SSHClientTest(ClientTest): verify that the SSHClient has a configurable banner timeout. """ # Start the thread with a 1 second wait. - threading.Thread(target=self._run, kwargs={'delay': 1}).start() - host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key')) + threading.Thread(target=self._run, kwargs={"delay": 1}).start() + host_key = paramiko.RSAKey.from_private_key_file( + _support("test_rsa.key") + ) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = paramiko.SSHClient() - self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key) + self.tc.get_host_keys().add( + "[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key + ) # Connect with a half second banner timeout. kwargs = dict(self.connect_kwargs, banner_timeout=0.5) - self.assertRaises( - paramiko.SSHException, - self.tc.connect, - **kwargs - ) + self.assertRaises(paramiko.SSHException, self.tc.connect, **kwargs) def test_8_auth_trickledown(self): """ @@ -466,9 +478,9 @@ class SSHClientTest(ClientTest): # 'television' as per tests/test_pkey.py). NOTE: must use # key_filename, loading the actual key here with PKey will except # immediately; we're testing the try/except crap within Client. - key_filename=[_support('test_rsa_password.key')], + key_filename=[_support("test_rsa_password.key")], # Actual password for default 'slowdive' user - password='pygmalion', + password="pygmalion", ) self._test_connection(**kwargs) @@ -481,7 +493,7 @@ class SSHClientTest(ClientTest): self.assertRaises( AuthenticationException, self._test_connection, - password='unresponsive-server', + password="unresponsive-server", auth_timeout=0.5, ) @@ -490,10 +502,7 @@ class SSHClientTest(ClientTest): """ Failed gssapi-keyex auth doesn't prevent subsequent key auth from succeeding """ - kwargs = dict( - gss_kex=True, - key_filename=[_support('test_rsa.key')], - ) + kwargs = dict(gss_kex=True, key_filename=[_support("test_rsa.key")]) self._test_connection(**kwargs) @requires_gss_auth @@ -501,10 +510,7 @@ class SSHClientTest(ClientTest): """ Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding """ - kwargs = dict( - gss_auth=True, - key_filename=[_support('test_rsa.key')], - ) + kwargs = dict(gss_auth=True, key_filename=[_support("test_rsa.key")]) self._test_connection(**kwargs) def test_12_reject_policy(self): @@ -519,7 +525,8 @@ class SSHClientTest(ClientTest): self.assertRaises( paramiko.SSHException, self.tc.connect, - password='pygmalion', **self.connect_kwargs + password="pygmalion", + **self.connect_kwargs ) @requires_gss_auth @@ -537,14 +544,14 @@ class SSHClientTest(ClientTest): self.assertRaises( paramiko.SSHException, self.tc.connect, - password='pygmalion', + password="pygmalion", gss_kex=True, - **self.connect_kwargs + **self.connect_kwargs ) def _client_host_key_bad(self, host_key): threading.Thread(target=self._run).start() - hostname = '[%s]:%d' % (self.addr, self.port) + hostname = "[%s]:%d" % (self.addr, self.port) self.tc = paramiko.SSHClient() self.tc.set_missing_host_key_policy(paramiko.WarningPolicy()) @@ -554,13 +561,13 @@ class SSHClientTest(ClientTest): self.assertRaises( paramiko.BadHostKeyException, self.tc.connect, - password='pygmalion', + password="pygmalion", **self.connect_kwargs ) def _client_host_key_good(self, ktype, kfile): threading.Thread(target=self._run).start() - hostname = '[%s]:%d' % (self.addr, self.port) + hostname = "[%s]:%d" % (self.addr, self.port) self.tc = paramiko.SSHClient() self.tc.set_missing_host_key_policy(paramiko.RejectPolicy()) @@ -568,7 +575,7 @@ class SSHClientTest(ClientTest): known_hosts = self.tc.get_host_keys() known_hosts.add(hostname, host_key.get_name(), host_key) - self.tc.connect(password='pygmalion', **self.connect_kwargs) + self.tc.connect(password="pygmalion", **self.connect_kwargs) self.event.wait(1.0) self.assertTrue(self.event.is_set()) self.assertTrue(self.ts.is_active()) @@ -583,10 +590,10 @@ class SSHClientTest(ClientTest): self._client_host_key_bad(host_key) def test_host_key_negotiation_3(self): - self._client_host_key_good(paramiko.ECDSAKey, 'test_ecdsa_256.key') + self._client_host_key_good(paramiko.ECDSAKey, "test_ecdsa_256.key") def test_host_key_negotiation_4(self): - self._client_host_key_good(paramiko.RSAKey, 'test_rsa.key') + self._client_host_key_good(paramiko.RSAKey, "test_rsa.key") def _setup_for_env(self): threading.Thread(target=self._run).start() @@ -594,7 +601,9 @@ class SSHClientTest(ClientTest): self.tc = paramiko.SSHClient() self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.assertEqual(0, len(self.tc.get_host_keys())) - self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion') + self.tc.connect( + self.addr, self.port, username="slowdive", password="pygmalion" + ) self.event.wait(1.0) self.assertTrue(self.event.isSet()) @@ -605,11 +614,11 @@ class SSHClientTest(ClientTest): Verify that environment variables can be set by the client. """ self._setup_for_env() - target_env = {b'A': b'B', b'C': b'd'} + target_env = {b"A": b"B", b"C": b"d"} - self.tc.exec_command('yes', environment=target_env) + self.tc.exec_command("yes", environment=target_env) schan = self.ts.accept(1.0) - self.assertEqual(target_env, getattr(schan, 'env', {})) + self.assertEqual(target_env, getattr(schan, "env", {})) schan.close() @unittest.skip("Clients normally fail silently, thus so do we, for now") @@ -617,14 +626,14 @@ class SSHClientTest(ClientTest): self._setup_for_env() with self.assertRaises(SSHException) as manager: # Verify that a rejection by the server can be detected - self.tc.exec_command('yes', environment={b'INVALID_ENV': b''}) + self.tc.exec_command("yes", environment={b"INVALID_ENV": b""}) self.assertTrue( - 'INVALID_ENV' in str(manager.exception), - 'Expected variable name in error message' + "INVALID_ENV" in str(manager.exception), + "Expected variable name in error message", ) self.assertTrue( isinstance(manager.exception.args[1], SSHException), - 'Expected original SSHException in exception' + "Expected original SSHException in exception", ) def test_missing_key_policy_accepts_classes_or_instances(self): @@ -652,35 +661,39 @@ class PasswordPassphraseTests(ClientTest): def test_password_kwarg_works_for_password_auth(self): # Straightforward / duplicate of earlier basic password test. - self._test_connection(password='pygmalion') + self._test_connection(password="pygmalion") # TODO: more granular exception pending #387; should be signaling "no auth # methods available" because no key and no password @raises(SSHException) def test_passphrase_kwarg_not_used_for_password_auth(self): # Using the "right" password in the "wrong" field shouldn't work. - self._test_connection(passphrase='pygmalion') + self._test_connection(passphrase="pygmalion") def test_passphrase_kwarg_used_for_key_passphrase(self): # Straightforward again, with new passphrase kwarg. self._test_connection( - key_filename=_support('test_rsa_password.key'), - passphrase='television', + key_filename=_support("test_rsa_password.key"), + passphrase="television", ) - def test_password_kwarg_used_for_passphrase_when_no_passphrase_kwarg_given(self): # noqa + def test_password_kwarg_used_for_passphrase_when_no_passphrase_kwarg_given( + self + ): # noqa # Backwards compatibility: passphrase in the password field. self._test_connection( - key_filename=_support('test_rsa_password.key'), - password='television', + key_filename=_support("test_rsa_password.key"), + password="television", ) - @raises(AuthenticationException) # TODO: more granular - def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given(self): # noqa + @raises(AuthenticationException) # TODO: more granular + def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given( + self + ): # noqa # Sanity: if we're given both fields, the password field is NOT used as # a passphrase. self._test_connection( - key_filename=_support('test_rsa_password.key'), - password='television', - passphrase='wat? lol no', + key_filename=_support("test_rsa_password.key"), + password="television", + passphrase="wat? lol no", ) |