summaryrefslogtreecommitdiffhomepage
path: root/tests/test_ssh_gss.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_ssh_gss.py')
-rw-r--r--tests/test_ssh_gss.py79
1 files changed, 43 insertions, 36 deletions
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py
index d8d05d2b..b6b50152 100644
--- a/tests/test_ssh_gss.py
+++ b/tests/test_ssh_gss.py
@@ -29,17 +29,17 @@ import unittest
import paramiko
-from tests.util import test_path
-from tests.test_client import FINGERPRINTS
+from .util import _support, needs_gssapi
+from .test_client import FINGERPRINTS
-class NullServer (paramiko.ServerInterface):
+class NullServer(paramiko.ServerInterface):
def get_allowed_auths(self, username):
- return 'gssapi-with-mic,publickey'
+ return "gssapi-with-mic,publickey"
- def check_auth_gssapi_with_mic(self, username,
- gss_authenticated=paramiko.AUTH_FAILED,
- cc_file=None):
+ def check_auth_gssapi_with_mic(
+ self, username, gss_authenticated=paramiko.AUTH_FAILED, cc_file=None
+ ):
if gss_authenticated == paramiko.AUTH_SUCCESSFUL:
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED
@@ -61,23 +61,20 @@ class NullServer (paramiko.ServerInterface):
return paramiko.OPEN_SUCCEEDED
def check_channel_exec_request(self, channel, command):
- if command != 'yes':
+ if command != "yes":
return False
return True
+@needs_gssapi
class GSSAuthTest(unittest.TestCase):
- @staticmethod
- def init(username, hostname):
- global krb5_principal, targ_name
- krb5_principal = username
- targ_name = hostname
-
def setUp(self):
- self.username = krb5_principal
- self.hostname = socket.getfqdn(targ_name)
+ # TODO: username and targ_name should come from os.environ or whatever
+ # the approved pytest method is for runtime-configuring test data.
+ self.username = "krb5_principal"
+ self.hostname = socket.getfqdn("targ_name")
self.sockl = socket.socket()
- self.sockl.bind((targ_name, 0))
+ self.sockl.bind(("targ_name", 0))
self.sockl.listen(1)
self.addr, self.port = self.sockl.getsockname()
self.event = threading.Event()
@@ -92,7 +89,7 @@ class GSSAuthTest(unittest.TestCase):
def _run(self):
self.socks, addr = self.sockl.accept()
self.ts = paramiko.Transport(self.socks)
- host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
+ host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key")
self.ts.add_server_key(host_key)
server = NullServer()
self.ts.start_server(self.event, server)
@@ -103,15 +100,22 @@ class GSSAuthTest(unittest.TestCase):
The exception is ... no exception yet
"""
- host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
+ host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key")
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
self.tc = paramiko.SSHClient()
self.tc.set_missing_host_key_policy(paramiko.WarningPolicy())
- self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port),
- 'ssh-rsa', public_host_key)
- self.tc.connect(hostname=self.addr, port=self.port, username=self.username, gss_host=self.hostname,
- gss_auth=True, **kwargs)
+ self.tc.get_host_keys().add(
+ "[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key
+ )
+ self.tc.connect(
+ hostname=self.addr,
+ port=self.port,
+ username=self.username,
+ gss_host=self.hostname,
+ gss_auth=True,
+ **kwargs
+ )
self.event.wait(1.0)
self.assert_(self.event.is_set())
@@ -119,17 +123,17 @@ class GSSAuthTest(unittest.TestCase):
self.assertEquals(self.username, self.ts.get_username())
self.assertEquals(True, self.ts.is_authenticated())
- stdin, stdout, stderr = self.tc.exec_command('yes')
+ stdin, stdout, stderr = self.tc.exec_command("yes")
schan = self.ts.accept(1.0)
- schan.send('Hello there.\n')
- schan.send_stderr('This is on stderr.\n')
+ schan.send("Hello there.\n")
+ schan.send_stderr("This is on stderr.\n")
schan.close()
- self.assertEquals('Hello there.\n', stdout.readline())
- self.assertEquals('', stdout.readline())
- self.assertEquals('This is on stderr.\n', stderr.readline())
- self.assertEquals('', stderr.readline())
+ self.assertEquals("Hello there.\n", stdout.readline())
+ self.assertEquals("", stdout.readline())
+ self.assertEquals("This is on stderr.\n", stderr.readline())
+ self.assertEquals("", stderr.readline())
stdin.close()
stdout.close()
@@ -140,14 +144,17 @@ class GSSAuthTest(unittest.TestCase):
Verify that Paramiko can handle SSHv2 GSS-API / SSPI authentication
(gssapi-with-mic) in client and server mode.
"""
- self._test_connection(allow_agent=False,
- look_for_keys=False)
+ self._test_connection(allow_agent=False, look_for_keys=False)
def test_2_auth_trickledown(self):
"""
Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding
"""
- self.hostname = "this_host_does_not_exists_and_causes_a_GSSAPI-exception"
- self._test_connection(key_filename=[test_path('test_rsa.key')],
- allow_agent=False,
- look_for_keys=False)
+ self.hostname = (
+ "this_host_does_not_exists_and_causes_a_GSSAPI-exception"
+ )
+ self._test_connection(
+ key_filename=[_support("test_rsa.key")],
+ allow_agent=False,
+ look_for_keys=False,
+ )