diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_client.py | 61 | ||||
-rw-r--r-- | tests/test_kex_gss.py | 21 | ||||
-rw-r--r-- | tests/test_ssh_gss.py | 45 |
3 files changed, 118 insertions, 9 deletions
diff --git a/tests/test_client.py b/tests/test_client.py index e912d5b2..7710055b 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -159,6 +159,7 @@ class SSHClientTest (unittest.TestCase): self.assertTrue(self.ts.is_active()) self.assertEqual('slowdive', self.ts.get_username()) self.assertEqual(True, self.ts.is_authenticated()) + self.assertEqual(False, self.tc.get_transport().gss_kex_used) # Command execution functions? stdin, stdout, stderr = self.tc.exec_command('yes') @@ -402,6 +403,66 @@ class SSHClientTest (unittest.TestCase): auth_timeout=0.5, ) + def test_10_auth_trickledown_gsskex(self): + """ + Failed gssapi-keyex auth doesn't prevent subsequent key auth from succeeding + """ + if not paramiko.GSS_AUTH_AVAILABLE: + return # for python 2.6 lacks skipTest + kwargs = dict( + gss_kex=True, + key_filename=[test_path('test_rsa.key')], + ) + self._test_connection(**kwargs) + + def test_11_auth_trickledown_gssauth(self): + """ + Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding + """ + if not paramiko.GSS_AUTH_AVAILABLE: + return # for python 2.6 lacks skipTest + kwargs = dict( + gss_auth=True, + key_filename=[test_path('test_rsa.key')], + ) + self._test_connection(**kwargs) + + def test_12_reject_policy(self): + """ + verify that SSHClient's RejectPolicy works. + """ + threading.Thread(target=self._run).start() + + self.tc = paramiko.SSHClient() + self.tc.set_missing_host_key_policy(paramiko.RejectPolicy()) + self.assertEqual(0, len(self.tc.get_host_keys())) + self.assertRaises( + paramiko.SSHException, + self.tc.connect, + password='pygmalion', **self.connect_kwargs + ) + + def test_13_reject_policy_gsskex(self): + """ + verify that SSHClient's RejectPolicy works, + even if gssapi-keyex was enabled but not used. + """ + # Test for a bug present in paramiko versions released before 2017-08-01 + if not paramiko.GSS_AUTH_AVAILABLE: + return # for python 2.6 lacks skipTest + threading.Thread(target=self._run).start() + + self.tc = paramiko.SSHClient() + self.tc.set_missing_host_key_policy(paramiko.RejectPolicy()) + self.assertEqual(0, len(self.tc.get_host_keys())) + self.assertRaises( + paramiko.SSHException, + self.tc.connect, + password='pygmalion', + gss_kex=True, + **self.connect_kwargs + ) + def _client_host_key_bad(self, host_key): threading.Thread(target=self._run).start() hostname = '[%s]:%d' % (self.addr, self.port) diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py index 3bf788da..af342a7c 100644 --- a/tests/test_kex_gss.py +++ b/tests/test_kex_gss.py @@ -93,7 +93,7 @@ class GSSKexTest(unittest.TestCase): server = NullServer() self.ts.start_server(self.event, server) - def test_1_gsskex_and_auth(self): + def _test_gsskex_and_auth(self, gss_host, rekey=False): """ Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated Diffie-Hellman Key Exchange and user authentication with the GSS-API @@ -106,16 +106,19 @@ class GSSKexTest(unittest.TestCase): self.tc.get_host_keys().add('[%s]:%d' % (self.hostname, self.port), 'ssh-rsa', public_host_key) self.tc.connect(self.hostname, self.port, username=self.username, - gss_auth=True, gss_kex=True) + gss_auth=True, gss_kex=True, gss_host=gss_host) self.event.wait(1.0) self.assert_(self.event.is_set()) self.assert_(self.ts.is_active()) self.assertEquals(self.username, self.ts.get_username()) self.assertEquals(True, self.ts.is_authenticated()) + self.assertEquals(True, self.tc.get_transport().gss_kex_used) stdin, stdout, stderr = self.tc.exec_command('yes') schan = self.ts.accept(1.0) + if rekey: + self.tc.get_transport().renegotiate_keys() schan.send('Hello there.\n') schan.send_stderr('This is on stderr.\n') @@ -129,3 +132,17 @@ class GSSKexTest(unittest.TestCase): stdin.close() stdout.close() stderr.close() + + def test_1_gsskex_and_auth(self): + """ + Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated + Diffie-Hellman Key Exchange and user authentication with the GSS-API + context created during key exchange. + """ + self._test_gsskex_and_auth(gss_host=None) + + def test_2_gsskex_and_auth_rekey(self): + """ + Verify that Paramiko can rekey. + """ + self._test_gsskex_and_auth(gss_host=None, rekey=True) diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py index 967b3b81..d8d05d2b 100644 --- a/tests/test_ssh_gss.py +++ b/tests/test_ssh_gss.py @@ -29,11 +29,13 @@ import unittest import paramiko +from tests.util import test_path +from tests.test_client import FINGERPRINTS class NullServer (paramiko.ServerInterface): def get_allowed_auths(self, username): - return 'gssapi-with-mic' + return 'gssapi-with-mic,publickey' def check_auth_gssapi_with_mic(self, username, gss_authenticated=paramiko.AUTH_FAILED, @@ -45,6 +47,16 @@ class NullServer (paramiko.ServerInterface): def enable_auth_gssapi(self): return True + def check_auth_publickey(self, username, key): + try: + expected = FINGERPRINTS[key.get_name()] + except KeyError: + return paramiko.AUTH_FAILED + else: + if key.get_fingerprint() == expected: + return paramiko.AUTH_SUCCESSFUL + return paramiko.AUTH_FAILED + def check_channel_request(self, kind, chanid): return paramiko.OPEN_SUCCEEDED @@ -85,19 +97,21 @@ class GSSAuthTest(unittest.TestCase): server = NullServer() self.ts.start_server(self.event, server) - def test_1_gss_auth(self): + def _test_connection(self, **kwargs): """ - Verify that Paramiko can handle SSHv2 GSS-API / SSPI authentication - (gssapi-with-mic) in client and server mode. + (Most) kwargs get passed directly into SSHClient.connect(). + + The exception is ... no exception yet """ host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key') public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = paramiko.SSHClient() - self.tc.get_host_keys().add('[%s]:%d' % (self.hostname, self.port), + self.tc.set_missing_host_key_policy(paramiko.WarningPolicy()) + self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key) - self.tc.connect(self.hostname, self.port, username=self.username, - gss_auth=True) + self.tc.connect(hostname=self.addr, port=self.port, username=self.username, gss_host=self.hostname, + gss_auth=True, **kwargs) self.event.wait(1.0) self.assert_(self.event.is_set()) @@ -120,3 +134,20 @@ class GSSAuthTest(unittest.TestCase): stdin.close() stdout.close() stderr.close() + + def test_1_gss_auth(self): + """ + Verify that Paramiko can handle SSHv2 GSS-API / SSPI authentication + (gssapi-with-mic) in client and server mode. + """ + self._test_connection(allow_agent=False, + look_for_keys=False) + + def test_2_auth_trickledown(self): + """ + Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding + """ + self.hostname = "this_host_does_not_exists_and_causes_a_GSSAPI-exception" + self._test_connection(key_filename=[test_path('test_rsa.key')], + allow_agent=False, + look_for_keys=False) |