diff options
Diffstat (limited to 'tests/test_ssh_gss.py')
-rw-r--r-- | tests/test_ssh_gss.py | 79 |
1 files changed, 43 insertions, 36 deletions
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py index d8d05d2b..b6b50152 100644 --- a/tests/test_ssh_gss.py +++ b/tests/test_ssh_gss.py @@ -29,17 +29,17 @@ import unittest import paramiko -from tests.util import test_path -from tests.test_client import FINGERPRINTS +from .util import _support, needs_gssapi +from .test_client import FINGERPRINTS -class NullServer (paramiko.ServerInterface): +class NullServer(paramiko.ServerInterface): def get_allowed_auths(self, username): - return 'gssapi-with-mic,publickey' + return "gssapi-with-mic,publickey" - def check_auth_gssapi_with_mic(self, username, - gss_authenticated=paramiko.AUTH_FAILED, - cc_file=None): + def check_auth_gssapi_with_mic( + self, username, gss_authenticated=paramiko.AUTH_FAILED, cc_file=None + ): if gss_authenticated == paramiko.AUTH_SUCCESSFUL: return paramiko.AUTH_SUCCESSFUL return paramiko.AUTH_FAILED @@ -61,23 +61,20 @@ class NullServer (paramiko.ServerInterface): return paramiko.OPEN_SUCCEEDED def check_channel_exec_request(self, channel, command): - if command != 'yes': + if command != "yes": return False return True +@needs_gssapi class GSSAuthTest(unittest.TestCase): - @staticmethod - def init(username, hostname): - global krb5_principal, targ_name - krb5_principal = username - targ_name = hostname - def setUp(self): - self.username = krb5_principal - self.hostname = socket.getfqdn(targ_name) + # TODO: username and targ_name should come from os.environ or whatever + # the approved pytest method is for runtime-configuring test data. + self.username = "krb5_principal" + self.hostname = socket.getfqdn("targ_name") self.sockl = socket.socket() - self.sockl.bind((targ_name, 0)) + self.sockl.bind(("targ_name", 0)) self.sockl.listen(1) self.addr, self.port = self.sockl.getsockname() self.event = threading.Event() @@ -92,7 +89,7 @@ class GSSAuthTest(unittest.TestCase): def _run(self): self.socks, addr = self.sockl.accept() self.ts = paramiko.Transport(self.socks) - host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key') + host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") self.ts.add_server_key(host_key) server = NullServer() self.ts.start_server(self.event, server) @@ -103,15 +100,22 @@ class GSSAuthTest(unittest.TestCase): The exception is ... no exception yet """ - host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key') + host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = paramiko.SSHClient() self.tc.set_missing_host_key_policy(paramiko.WarningPolicy()) - self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), - 'ssh-rsa', public_host_key) - self.tc.connect(hostname=self.addr, port=self.port, username=self.username, gss_host=self.hostname, - gss_auth=True, **kwargs) + self.tc.get_host_keys().add( + "[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key + ) + self.tc.connect( + hostname=self.addr, + port=self.port, + username=self.username, + gss_host=self.hostname, + gss_auth=True, + **kwargs + ) self.event.wait(1.0) self.assert_(self.event.is_set()) @@ -119,17 +123,17 @@ class GSSAuthTest(unittest.TestCase): self.assertEquals(self.username, self.ts.get_username()) self.assertEquals(True, self.ts.is_authenticated()) - stdin, stdout, stderr = self.tc.exec_command('yes') + stdin, stdout, stderr = self.tc.exec_command("yes") schan = self.ts.accept(1.0) - schan.send('Hello there.\n') - schan.send_stderr('This is on stderr.\n') + schan.send("Hello there.\n") + schan.send_stderr("This is on stderr.\n") schan.close() - self.assertEquals('Hello there.\n', stdout.readline()) - self.assertEquals('', stdout.readline()) - self.assertEquals('This is on stderr.\n', stderr.readline()) - self.assertEquals('', stderr.readline()) + self.assertEquals("Hello there.\n", stdout.readline()) + self.assertEquals("", stdout.readline()) + self.assertEquals("This is on stderr.\n", stderr.readline()) + self.assertEquals("", stderr.readline()) stdin.close() stdout.close() @@ -140,14 +144,17 @@ class GSSAuthTest(unittest.TestCase): Verify that Paramiko can handle SSHv2 GSS-API / SSPI authentication (gssapi-with-mic) in client and server mode. """ - self._test_connection(allow_agent=False, - look_for_keys=False) + self._test_connection(allow_agent=False, look_for_keys=False) def test_2_auth_trickledown(self): """ Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding """ - self.hostname = "this_host_does_not_exists_and_causes_a_GSSAPI-exception" - self._test_connection(key_filename=[test_path('test_rsa.key')], - allow_agent=False, - look_for_keys=False) + self.hostname = ( + "this_host_does_not_exists_and_causes_a_GSSAPI-exception" + ) + self._test_connection( + key_filename=[_support("test_rsa.key")], + allow_agent=False, + look_for_keys=False, + ) |