summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--paramiko/ssh_gss.py74
-rw-r--r--tests/test_gssapi.py43
-rw-r--r--tests/util.py28
3 files changed, 92 insertions, 53 deletions
diff --git a/paramiko/ssh_gss.py b/paramiko/ssh_gss.py
index ff2fa065..06aac761 100644
--- a/paramiko/ssh_gss.py
+++ b/paramiko/ssh_gss.py
@@ -51,14 +51,17 @@ _API = None
try:
import gssapi
- if hasattr(gssapi, '__title__') and gssapi.__title__ == 'python-gssapi':
+
+ if hasattr(gssapi, "__title__") and gssapi.__title__ == "python-gssapi":
# old, unmaintained python-gssapi package
_API = "MIT" # keep this for compatibility
GSS_EXCEPTIONS = (gssapi.GSSException,)
else:
_API = "PYTHON-GSSAPI-NEW"
- GSS_EXCEPTIONS = (gssapi.exceptions.GeneralError,
- gssapi.raw.misc.GSSError,)
+ GSS_EXCEPTIONS = (
+ gssapi.exceptions.GeneralError,
+ gssapi.raw.misc.GSSError,
+ )
except (ImportError, OSError):
try:
import pywintypes
@@ -422,6 +425,7 @@ class _SSH_GSSAPI_NEW(_SSH_GSSAuth):
:see: `.GSSAuth`
"""
+
def __init__(self, auth_method, gss_deleg_creds):
"""
:param str auth_method: The name of the SSH authentication mechanism
@@ -431,17 +435,22 @@ class _SSH_GSSAPI_NEW(_SSH_GSSAuth):
_SSH_GSSAuth.__init__(self, auth_method, gss_deleg_creds)
if self._gss_deleg_creds:
- self._gss_flags = (gssapi.RequirementFlag.protection_ready,
- gssapi.RequirementFlag.integrity,
- gssapi.RequirementFlag.mutual_authentication,
- gssapi.RequirementFlag.delegate_to_peer)
+ self._gss_flags = (
+ gssapi.RequirementFlag.protection_ready,
+ gssapi.RequirementFlag.integrity,
+ gssapi.RequirementFlag.mutual_authentication,
+ gssapi.RequirementFlag.delegate_to_peer,
+ )
else:
- self._gss_flags = (gssapi.RequirementFlag.protection_ready,
- gssapi.RequirementFlag.integrity,
- gssapi.RequirementFlag.mutual_authentication)
+ self._gss_flags = (
+ gssapi.RequirementFlag.protection_ready,
+ gssapi.RequirementFlag.integrity,
+ gssapi.RequirementFlag.mutual_authentication,
+ )
- def ssh_init_sec_context(self, target, desired_mech=None,
- username=None, recv_token=None):
+ def ssh_init_sec_context(
+ self, target, desired_mech=None, username=None, recv_token=None
+ ):
"""
Initialize a GSS-API context.
@@ -460,8 +469,10 @@ class _SSH_GSSAPI_NEW(_SSH_GSSAuth):
"""
self._username = username
self._gss_host = target
- targ_name = gssapi.Name("host@" + self._gss_host,
- name_type=gssapi.NameType.hostbased_service)
+ targ_name = gssapi.Name(
+ "host@" + self._gss_host,
+ name_type=gssapi.NameType.hostbased_service,
+ )
if desired_mech is not None:
mech, __ = decoder.decode(desired_mech)
if mech.__str__() != self._krb5_mech:
@@ -469,10 +480,12 @@ class _SSH_GSSAPI_NEW(_SSH_GSSAuth):
krb5_mech = gssapi.MechType.kerberos
token = None
if recv_token is None:
- self._gss_ctxt = gssapi.SecurityContext(name=targ_name,
- flags=self._gss_flags,
- mech=krb5_mech,
- usage='initiate')
+ self._gss_ctxt = gssapi.SecurityContext(
+ name=targ_name,
+ flags=self._gss_flags,
+ mech=krb5_mech,
+ usage="initiate",
+ )
token = self._gss_ctxt.step(token)
else:
token = self._gss_ctxt.step(recv_token)
@@ -495,10 +508,12 @@ class _SSH_GSSAPI_NEW(_SSH_GSSAuth):
"""
self._session_id = session_id
if not gss_kex:
- mic_field = self._ssh_build_mic(self._session_id,
- self._username,
- self._service,
- self._auth_method)
+ mic_field = self._ssh_build_mic(
+ self._session_id,
+ self._username,
+ self._service,
+ self._auth_method,
+ )
mic_token = self._gss_ctxt.get_signature(mic_field)
else:
# for key exchange with gssapi-keyex
@@ -520,7 +535,7 @@ class _SSH_GSSAPI_NEW(_SSH_GSSAuth):
self._gss_host = hostname
self._username = username
if self._gss_srv_ctxt is None:
- self._gss_srv_ctxt = gssapi.SecurityContext(usage='accept')
+ self._gss_srv_ctxt = gssapi.SecurityContext(usage="accept")
token = self._gss_srv_ctxt.step(recv_token)
self._gss_srv_ctxt_status = self._gss_srv_ctxt.complete
return token
@@ -539,16 +554,17 @@ class _SSH_GSSAPI_NEW(_SSH_GSSAuth):
self._username = username
if self._username is not None:
# server mode
- mic_field = self._ssh_build_mic(self._session_id,
- self._username,
- self._service,
- self._auth_method)
+ mic_field = self._ssh_build_mic(
+ self._session_id,
+ self._username,
+ self._service,
+ self._auth_method,
+ )
self._gss_srv_ctxt.verify_signature(mic_field, mic_token)
else:
# for key exchange with gssapi-keyex
# client mode
- self._gss_ctxt.verify_signature(self._session_id,
- mic_token)
+ self._gss_ctxt.verify_signature(self._session_id, mic_token)
@property
def credentials_delegated(self):
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
index 98d4d14e..8e6ec37a 100644
--- a/tests/test_gssapi.py
+++ b/tests/test_gssapi.py
@@ -53,8 +53,11 @@ class GSSAPITest(KerberosTestCase):
"""
try:
import gssapi
- if (hasattr(gssapi, '__title__') and
- gssapi.__title__ == 'python-gssapi'):
+
+ if (
+ hasattr(gssapi, "__title__")
+ and gssapi.__title__ == "python-gssapi"
+ ):
_API = "PYTHON-GSSAPI-OLD"
else:
_API = "PYTHON-GSSAPI-NEW"
@@ -118,28 +121,36 @@ class GSSAPITest(KerberosTestCase):
self.assertEquals(0, status)
elif _API == "PYTHON-GSSAPI-NEW":
if self.server_mode:
- gss_flags = (gssapi.RequirementFlag.protection_ready,
- gssapi.RequirementFlag.integrity,
- gssapi.RequirementFlag.mutual_authentication,
- gssapi.RequirementFlag.delegate_to_peer)
+ gss_flags = (
+ gssapi.RequirementFlag.protection_ready,
+ gssapi.RequirementFlag.integrity,
+ gssapi.RequirementFlag.mutual_authentication,
+ gssapi.RequirementFlag.delegate_to_peer,
+ )
else:
- gss_flags = (gssapi.RequirementFlag.protection_ready,
- gssapi.RequirementFlag.integrity,
- gssapi.RequirementFlag.delegate_to_peer)
+ gss_flags = (
+ gssapi.RequirementFlag.protection_ready,
+ gssapi.RequirementFlag.integrity,
+ gssapi.RequirementFlag.delegate_to_peer,
+ )
# Initialize a GSS-API context.
krb5_oid = gssapi.MechType.kerberos
- target_name = gssapi.Name("host@" + self.targ_name,
- name_type=gssapi.NameType.hostbased_service)
- gss_ctxt = gssapi.SecurityContext(name=target_name,
- flags=gss_flags,
- mech=krb5_oid,
- usage='initiate')
+ target_name = gssapi.Name(
+ "host@" + self.targ_name,
+ name_type=gssapi.NameType.hostbased_service,
+ )
+ gss_ctxt = gssapi.SecurityContext(
+ name=target_name,
+ flags=gss_flags,
+ mech=krb5_oid,
+ usage="initiate",
+ )
if self.server_mode:
c_token = gss_ctxt.step(c_token)
gss_ctxt_status = gss_ctxt.complete
self.assertEquals(False, gss_ctxt_status)
# Accept a GSS-API context.
- gss_srv_ctxt = gssapi.SecurityContext(usage='accept')
+ gss_srv_ctxt = gssapi.SecurityContext(usage="accept")
s_token = gss_srv_ctxt.step(c_token)
gss_ctxt_status = gss_srv_ctxt.complete
self.assertNotEquals(None, s_token)
diff --git a/tests/util.py b/tests/util.py
index be56b37d..cdc835c9 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -13,8 +13,9 @@ def _support(filename):
return join(dirname(realpath(__file__)), filename)
-needs_gssapi = pytest.mark.skipif(not GSS_AUTH_AVAILABLE,
- reason="No GSSAPI to test")
+needs_gssapi = pytest.mark.skipif(
+ not GSS_AUTH_AVAILABLE, reason="No GSSAPI to test"
+)
def needs_builtin(name):
@@ -41,9 +42,11 @@ slow = pytest.mark.slow
#
# ToDo: add a Windows specific implementation?
-if (os.environ.get("K5TEST_USER_PRINC", None) and
- os.environ.get("K5TEST_HOSTNAME", None) and
- os.environ.get("KRB5_KTNAME", None)): # add other vars as needed
+if (
+ os.environ.get("K5TEST_USER_PRINC", None)
+ and os.environ.get("K5TEST_HOSTNAME", None)
+ and os.environ.get("KRB5_KTNAME", None)
+): # add other vars as needed
# The environment provides the required information
class DummyK5Realm(object):
@@ -62,6 +65,8 @@ if (os.environ.get("K5TEST_USER_PRINC", None) and
@classmethod
def tearDownClass(cls):
del cls.realm
+
+
else:
try:
# Try to setup a kerberos environment
@@ -71,22 +76,28 @@ else:
class KerberosTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
- raise unittest.SkipTest('Missing extension package k5test. '
- 'Please run "pip install k5test" '
- 'to install it.')
+ raise unittest.SkipTest(
+ "Missing extension package k5test. "
+ 'Please run "pip install k5test" '
+ "to install it."
+ )
+
def update_env(testcase, mapping, env=os.environ):
"""Modify os.environ during a test case and restore during cleanup."""
saved_env = env.copy()
+
def replace(target, source):
target.update(source)
for k in list(target):
if k not in source:
target.pop(k, None)
+
testcase.addCleanup(replace, env, saved_env)
env.update(mapping)
return testcase
+
def k5shell(args=None):
"""Create a shell with an kerberos environment
@@ -97,6 +108,7 @@ def k5shell(args=None):
import k5test
import atexit
import subprocess
+
k5 = k5test.K5Realm()
atexit.register(k5.stop)
os.environ.update(k5.env)