summaryrefslogtreecommitdiffhomepage
path: root/tests/test_gssapi.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_gssapi.py')
-rw-r--r--tests/test_gssapi.py44
1 files changed, 26 insertions, 18 deletions
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
index d4b632be..d7fbdd53 100644
--- a/tests/test_gssapi.py
+++ b/tests/test_gssapi.py
@@ -30,6 +30,7 @@ from .util import needs_gssapi
@needs_gssapi
class GSSAPITest(unittest.TestCase):
+
def setup():
# TODO: these vars should all come from os.environ or whatever the
# approved pytest method is for runtime-configuring test data.
@@ -43,6 +44,7 @@ class GSSAPITest(unittest.TestCase):
"""
from pyasn1.type.univ import ObjectIdentifier
from pyasn1.codec.der import encoder, decoder
+
oid = encoder.encode(ObjectIdentifier(self.krb5_mech))
mech, __ = decoder.decode(oid)
self.assertEquals(self.krb5_mech, mech.__str__())
@@ -57,6 +59,7 @@ class GSSAPITest(unittest.TestCase):
except ImportError:
import sspicon
import sspi
+
_API = "SSPI"
c_token = None
@@ -65,23 +68,28 @@ class GSSAPITest(unittest.TestCase):
if _API == "MIT":
if self.server_mode:
- gss_flags = (gssapi.C_PROT_READY_FLAG,
- gssapi.C_INTEG_FLAG,
- gssapi.C_MUTUAL_FLAG,
- gssapi.C_DELEG_FLAG)
+ gss_flags = (
+ gssapi.C_PROT_READY_FLAG,
+ gssapi.C_INTEG_FLAG,
+ gssapi.C_MUTUAL_FLAG,
+ gssapi.C_DELEG_FLAG,
+ )
else:
- gss_flags = (gssapi.C_PROT_READY_FLAG,
- gssapi.C_INTEG_FLAG,
- gssapi.C_DELEG_FLAG)
+ gss_flags = (
+ gssapi.C_PROT_READY_FLAG,
+ gssapi.C_INTEG_FLAG,
+ gssapi.C_DELEG_FLAG,
+ )
# Initialize a GSS-API context.
ctx = gssapi.Context()
ctx.flags = gss_flags
krb5_oid = gssapi.OID.mech_from_string(self.krb5_mech)
- target_name = gssapi.Name("host@" + self.targ_name,
- gssapi.C_NT_HOSTBASED_SERVICE)
- gss_ctxt = gssapi.InitContext(peer_name=target_name,
- mech_type=krb5_oid,
- req_flags=ctx.flags)
+ target_name = gssapi.Name(
+ "host@" + self.targ_name, gssapi.C_NT_HOSTBASED_SERVICE
+ )
+ gss_ctxt = gssapi.InitContext(
+ peer_name=target_name, mech_type=krb5_oid, req_flags=ctx.flags
+ )
if self.server_mode:
c_token = gss_ctxt.step(c_token)
gss_ctxt_status = gss_ctxt.established
@@ -108,15 +116,15 @@ class GSSAPITest(unittest.TestCase):
self.assertEquals(0, status)
else:
gss_flags = (
- sspicon.ISC_REQ_INTEGRITY |
- sspicon.ISC_REQ_MUTUAL_AUTH |
- sspicon.ISC_REQ_DELEGATE
+ sspicon.ISC_REQ_INTEGRITY
+ | sspicon.ISC_REQ_MUTUAL_AUTH
+ | sspicon.ISC_REQ_DELEGATE
)
# Initialize a GSS-API context.
target_name = "host/" + socket.getfqdn(self.targ_name)
- gss_ctxt = sspi.ClientAuth("Kerberos",
- scflags=gss_flags,
- targetspn=target_name)
+ gss_ctxt = sspi.ClientAuth(
+ "Kerberos", scflags=gss_flags, targetspn=target_name
+ )
if self.server_mode:
error, token = gss_ctxt.authorize(c_token)
c_token = token[0].Buffer