diff options
-rw-r--r-- | paramiko/ssh_gss.py | 74 | ||||
-rw-r--r-- | tests/test_gssapi.py | 43 | ||||
-rw-r--r-- | tests/util.py | 28 |
3 files changed, 92 insertions, 53 deletions
diff --git a/paramiko/ssh_gss.py b/paramiko/ssh_gss.py index ff2fa065..06aac761 100644 --- a/paramiko/ssh_gss.py +++ b/paramiko/ssh_gss.py @@ -51,14 +51,17 @@ _API = None try: import gssapi - if hasattr(gssapi, '__title__') and gssapi.__title__ == 'python-gssapi': + + if hasattr(gssapi, "__title__") and gssapi.__title__ == "python-gssapi": # old, unmaintained python-gssapi package _API = "MIT" # keep this for compatibility GSS_EXCEPTIONS = (gssapi.GSSException,) else: _API = "PYTHON-GSSAPI-NEW" - GSS_EXCEPTIONS = (gssapi.exceptions.GeneralError, - gssapi.raw.misc.GSSError,) + GSS_EXCEPTIONS = ( + gssapi.exceptions.GeneralError, + gssapi.raw.misc.GSSError, + ) except (ImportError, OSError): try: import pywintypes @@ -422,6 +425,7 @@ class _SSH_GSSAPI_NEW(_SSH_GSSAuth): :see: `.GSSAuth` """ + def __init__(self, auth_method, gss_deleg_creds): """ :param str auth_method: The name of the SSH authentication mechanism @@ -431,17 +435,22 @@ class _SSH_GSSAPI_NEW(_SSH_GSSAuth): _SSH_GSSAuth.__init__(self, auth_method, gss_deleg_creds) if self._gss_deleg_creds: - self._gss_flags = (gssapi.RequirementFlag.protection_ready, - gssapi.RequirementFlag.integrity, - gssapi.RequirementFlag.mutual_authentication, - gssapi.RequirementFlag.delegate_to_peer) + self._gss_flags = ( + gssapi.RequirementFlag.protection_ready, + gssapi.RequirementFlag.integrity, + gssapi.RequirementFlag.mutual_authentication, + gssapi.RequirementFlag.delegate_to_peer, + ) else: - self._gss_flags = (gssapi.RequirementFlag.protection_ready, - gssapi.RequirementFlag.integrity, - gssapi.RequirementFlag.mutual_authentication) + self._gss_flags = ( + gssapi.RequirementFlag.protection_ready, + gssapi.RequirementFlag.integrity, + gssapi.RequirementFlag.mutual_authentication, + ) - def ssh_init_sec_context(self, target, desired_mech=None, - username=None, recv_token=None): + def ssh_init_sec_context( + self, target, desired_mech=None, username=None, recv_token=None + ): """ Initialize a GSS-API context. @@ -460,8 +469,10 @@ class _SSH_GSSAPI_NEW(_SSH_GSSAuth): """ self._username = username self._gss_host = target - targ_name = gssapi.Name("host@" + self._gss_host, - name_type=gssapi.NameType.hostbased_service) + targ_name = gssapi.Name( + "host@" + self._gss_host, + name_type=gssapi.NameType.hostbased_service, + ) if desired_mech is not None: mech, __ = decoder.decode(desired_mech) if mech.__str__() != self._krb5_mech: @@ -469,10 +480,12 @@ class _SSH_GSSAPI_NEW(_SSH_GSSAuth): krb5_mech = gssapi.MechType.kerberos token = None if recv_token is None: - self._gss_ctxt = gssapi.SecurityContext(name=targ_name, - flags=self._gss_flags, - mech=krb5_mech, - usage='initiate') + self._gss_ctxt = gssapi.SecurityContext( + name=targ_name, + flags=self._gss_flags, + mech=krb5_mech, + usage="initiate", + ) token = self._gss_ctxt.step(token) else: token = self._gss_ctxt.step(recv_token) @@ -495,10 +508,12 @@ class _SSH_GSSAPI_NEW(_SSH_GSSAuth): """ self._session_id = session_id if not gss_kex: - mic_field = self._ssh_build_mic(self._session_id, - self._username, - self._service, - self._auth_method) + mic_field = self._ssh_build_mic( + self._session_id, + self._username, + self._service, + self._auth_method, + ) mic_token = self._gss_ctxt.get_signature(mic_field) else: # for key exchange with gssapi-keyex @@ -520,7 +535,7 @@ class _SSH_GSSAPI_NEW(_SSH_GSSAuth): self._gss_host = hostname self._username = username if self._gss_srv_ctxt is None: - self._gss_srv_ctxt = gssapi.SecurityContext(usage='accept') + self._gss_srv_ctxt = gssapi.SecurityContext(usage="accept") token = self._gss_srv_ctxt.step(recv_token) self._gss_srv_ctxt_status = self._gss_srv_ctxt.complete return token @@ -539,16 +554,17 @@ class _SSH_GSSAPI_NEW(_SSH_GSSAuth): self._username = username if self._username is not None: # server mode - mic_field = self._ssh_build_mic(self._session_id, - self._username, - self._service, - self._auth_method) + mic_field = self._ssh_build_mic( + self._session_id, + self._username, + self._service, + self._auth_method, + ) self._gss_srv_ctxt.verify_signature(mic_field, mic_token) else: # for key exchange with gssapi-keyex # client mode - self._gss_ctxt.verify_signature(self._session_id, - mic_token) + self._gss_ctxt.verify_signature(self._session_id, mic_token) @property def credentials_delegated(self): diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py index 98d4d14e..8e6ec37a 100644 --- a/tests/test_gssapi.py +++ b/tests/test_gssapi.py @@ -53,8 +53,11 @@ class GSSAPITest(KerberosTestCase): """ try: import gssapi - if (hasattr(gssapi, '__title__') and - gssapi.__title__ == 'python-gssapi'): + + if ( + hasattr(gssapi, "__title__") + and gssapi.__title__ == "python-gssapi" + ): _API = "PYTHON-GSSAPI-OLD" else: _API = "PYTHON-GSSAPI-NEW" @@ -118,28 +121,36 @@ class GSSAPITest(KerberosTestCase): self.assertEquals(0, status) elif _API == "PYTHON-GSSAPI-NEW": if self.server_mode: - gss_flags = (gssapi.RequirementFlag.protection_ready, - gssapi.RequirementFlag.integrity, - gssapi.RequirementFlag.mutual_authentication, - gssapi.RequirementFlag.delegate_to_peer) + gss_flags = ( + gssapi.RequirementFlag.protection_ready, + gssapi.RequirementFlag.integrity, + gssapi.RequirementFlag.mutual_authentication, + gssapi.RequirementFlag.delegate_to_peer, + ) else: - gss_flags = (gssapi.RequirementFlag.protection_ready, - gssapi.RequirementFlag.integrity, - gssapi.RequirementFlag.delegate_to_peer) + gss_flags = ( + gssapi.RequirementFlag.protection_ready, + gssapi.RequirementFlag.integrity, + gssapi.RequirementFlag.delegate_to_peer, + ) # Initialize a GSS-API context. krb5_oid = gssapi.MechType.kerberos - target_name = gssapi.Name("host@" + self.targ_name, - name_type=gssapi.NameType.hostbased_service) - gss_ctxt = gssapi.SecurityContext(name=target_name, - flags=gss_flags, - mech=krb5_oid, - usage='initiate') + target_name = gssapi.Name( + "host@" + self.targ_name, + name_type=gssapi.NameType.hostbased_service, + ) + gss_ctxt = gssapi.SecurityContext( + name=target_name, + flags=gss_flags, + mech=krb5_oid, + usage="initiate", + ) if self.server_mode: c_token = gss_ctxt.step(c_token) gss_ctxt_status = gss_ctxt.complete self.assertEquals(False, gss_ctxt_status) # Accept a GSS-API context. - gss_srv_ctxt = gssapi.SecurityContext(usage='accept') + gss_srv_ctxt = gssapi.SecurityContext(usage="accept") s_token = gss_srv_ctxt.step(c_token) gss_ctxt_status = gss_srv_ctxt.complete self.assertNotEquals(None, s_token) diff --git a/tests/util.py b/tests/util.py index be56b37d..cdc835c9 100644 --- a/tests/util.py +++ b/tests/util.py @@ -13,8 +13,9 @@ def _support(filename): return join(dirname(realpath(__file__)), filename) -needs_gssapi = pytest.mark.skipif(not GSS_AUTH_AVAILABLE, - reason="No GSSAPI to test") +needs_gssapi = pytest.mark.skipif( + not GSS_AUTH_AVAILABLE, reason="No GSSAPI to test" +) def needs_builtin(name): @@ -41,9 +42,11 @@ slow = pytest.mark.slow # # ToDo: add a Windows specific implementation? -if (os.environ.get("K5TEST_USER_PRINC", None) and - os.environ.get("K5TEST_HOSTNAME", None) and - os.environ.get("KRB5_KTNAME", None)): # add other vars as needed +if ( + os.environ.get("K5TEST_USER_PRINC", None) + and os.environ.get("K5TEST_HOSTNAME", None) + and os.environ.get("KRB5_KTNAME", None) +): # add other vars as needed # The environment provides the required information class DummyK5Realm(object): @@ -62,6 +65,8 @@ if (os.environ.get("K5TEST_USER_PRINC", None) and @classmethod def tearDownClass(cls): del cls.realm + + else: try: # Try to setup a kerberos environment @@ -71,22 +76,28 @@ else: class KerberosTestCase(unittest.TestCase): @classmethod def setUpClass(cls): - raise unittest.SkipTest('Missing extension package k5test. ' - 'Please run "pip install k5test" ' - 'to install it.') + raise unittest.SkipTest( + "Missing extension package k5test. " + 'Please run "pip install k5test" ' + "to install it." + ) + def update_env(testcase, mapping, env=os.environ): """Modify os.environ during a test case and restore during cleanup.""" saved_env = env.copy() + def replace(target, source): target.update(source) for k in list(target): if k not in source: target.pop(k, None) + testcase.addCleanup(replace, env, saved_env) env.update(mapping) return testcase + def k5shell(args=None): """Create a shell with an kerberos environment @@ -97,6 +108,7 @@ def k5shell(args=None): import k5test import atexit import subprocess + k5 = k5test.K5Realm() atexit.register(k5.stop) os.environ.update(k5.env) |