summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAnselm Kruis <Anselm.Kruis@atos.net>2018-10-05 18:15:24 +0200
committerAnselm Kruis <Anselm.Kruis@atos.net>2018-10-05 18:58:52 +0200
commita36499fd8762a19da43ee16429b148cb89f4d39f (patch)
tree7243ba5466d9dfadd25443820e1cd8f3f4587fa5
parentdb358dc149f7549c147e520bbe5c26b571d899d4 (diff)
Use k5test (if available) to execute GSSAPI related tests
Previously testing of GSSAPI (Kerberos) related functions required an externally provided Kerberos environment. Therefore all GSSAPI tests were skipped. Now the package k5test is used to setup a self-contained Kerberos environment. Because k5test requires the new GSSAPI, this commit also merges pull request #1166 and fixes broken GSSAPI test. If k5test is not available (i.e. on Windows), the tests still get skipped. The test case test_kex_gss.test_2_gsskex_and_auth_rekey is expected to fail.
-rw-r--r--tests/test_gssapi.py41
-rw-r--r--tests/test_kex_gss.py20
-rw-r--r--tests/test_ssh_gss.py15
-rw-r--r--tests/util.py93
4 files changed, 131 insertions, 38 deletions
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
index 04304c0f..98d4d14e 100644
--- a/tests/test_gssapi.py
+++ b/tests/test_gssapi.py
@@ -22,20 +22,19 @@
Test the used APIs for GSS-API / SSPI authentication
"""
-import unittest
import socket
-from .util import needs_gssapi
+from .util import needs_gssapi, KerberosTestCase, update_env
@needs_gssapi
-class GSSAPITest(unittest.TestCase):
- def setup():
- # TODO: these vars should all come from os.environ or whatever the
- # approved pytest method is for runtime-configuring test data.
+class GSSAPITest(KerberosTestCase):
+ def setUp(self):
+ super(GSSAPITest, self).setUp()
self.krb5_mech = "1.2.840.113554.1.2.2"
- self.targ_name = "hostname"
+ self.targ_name = self.realm.hostname
self.server_mode = False
+ update_env(self, self.realm.env)
def test_1_pyasn1(self):
"""
@@ -48,13 +47,14 @@ class GSSAPITest(unittest.TestCase):
mech, __ = decoder.decode(oid)
self.assertEquals(self.krb5_mech, mech.__str__())
- def test_2_gssapi_sspi(self):
+ def _gssapi_sspi_test(self):
"""
Test the used methods of python-gssapi or sspi, sspicon from pywin32.
"""
try:
import gssapi
- if hasattr(gssapi, '__title__') and gssapi.__title__ == 'python-gssapi':
+ if (hasattr(gssapi, '__title__') and
+ gssapi.__title__ == 'python-gssapi'):
_API = "PYTHON-GSSAPI-OLD"
else:
_API = "PYTHON-GSSAPI-NEW"
@@ -117,7 +117,7 @@ class GSSAPITest(unittest.TestCase):
status = gss_srv_ctxt.verify_mic(mic_msg, mic_token)
self.assertEquals(0, status)
elif _API == "PYTHON-GSSAPI-NEW":
- if server_mode:
+ if self.server_mode:
gss_flags = (gssapi.RequirementFlag.protection_ready,
gssapi.RequirementFlag.integrity,
gssapi.RequirementFlag.mutual_authentication,
@@ -128,13 +128,13 @@ class GSSAPITest(unittest.TestCase):
gssapi.RequirementFlag.delegate_to_peer)
# Initialize a GSS-API context.
krb5_oid = gssapi.MechType.kerberos
- target_name = gssapi.Name("host@" + targ_name,
- name_type=gssapi.NameType.hostbased_service)
+ target_name = gssapi.Name("host@" + self.targ_name,
+ name_type=gssapi.NameType.hostbased_service)
gss_ctxt = gssapi.SecurityContext(name=target_name,
flags=gss_flags,
mech=krb5_oid,
usage='initiate')
- if server_mode:
+ if self.server_mode:
c_token = gss_ctxt.step(c_token)
gss_ctxt_status = gss_ctxt.complete
self.assertEquals(False, gss_ctxt_status)
@@ -154,7 +154,7 @@ class GSSAPITest(unittest.TestCase):
# Build MIC
mic_token = gss_ctxt.get_signature(mic_msg)
- if server_mode:
+ if self.server_mode:
# Check MIC
status = gss_srv_ctxt.verify_signature(mic_msg, mic_token)
self.assertEquals(0, status)
@@ -190,3 +190,16 @@ class GSSAPITest(unittest.TestCase):
error, token = gss_ctxt.authorize(c_token)
c_token = token[0].Buffer
self.assertNotEquals(0, error)
+
+ def test_2_gssapi_sspi_client(self):
+ """
+ Test the used methods of python-gssapi or sspi, sspicon from pywin32.
+ """
+ self._gssapi_sspi_test()
+
+ def test_3_gssapi_sspi_server(self):
+ """
+ Test the used methods of python-gssapi or sspi, sspicon from pywin32.
+ """
+ self.server_mode = True
+ self._gssapi_sspi_test()
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
index c71ff91c..e58be65d 100644
--- a/tests/test_kex_gss.py
+++ b/tests/test_kex_gss.py
@@ -31,7 +31,7 @@ import unittest
import paramiko
-from .util import needs_gssapi
+from .util import needs_gssapi, KerberosTestCase, update_env
class NullServer(paramiko.ServerInterface):
@@ -59,21 +59,16 @@ class NullServer(paramiko.ServerInterface):
@needs_gssapi
-class GSSKexTest(unittest.TestCase):
- @staticmethod
- def init(username, hostname):
- global krb5_principal, targ_name
- krb5_principal = username
- targ_name = hostname
-
+class GSSKexTest(KerberosTestCase):
def setUp(self):
- self.username = krb5_principal
- self.hostname = socket.getfqdn(targ_name)
+ self.username = self.realm.user_princ
+ self.hostname = socket.getfqdn(self.realm.hostname)
self.sockl = socket.socket()
- self.sockl.bind((targ_name, 0))
+ self.sockl.bind((self.realm.hostname, 0))
self.sockl.listen(1)
self.addr, self.port = self.sockl.getsockname()
self.event = threading.Event()
+ update_env(self, self.realm.env)
thread = threading.Thread(target=self._run)
thread.start()
@@ -87,7 +82,7 @@ class GSSKexTest(unittest.TestCase):
self.ts = paramiko.Transport(self.socks, gss_kex=True)
host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key")
self.ts.add_server_key(host_key)
- self.ts.set_gss_host(targ_name)
+ self.ts.set_gss_host(self.realm.hostname)
try:
self.ts.load_server_moduli()
except:
@@ -150,6 +145,7 @@ class GSSKexTest(unittest.TestCase):
"""
self._test_gsskex_and_auth(gss_host=None)
+ @unittest.expectedFailure # to be investigated
def test_2_gsskex_and_auth_rekey(self):
"""
Verify that Paramiko can rekey.
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py
index b6b50152..d326f522 100644
--- a/tests/test_ssh_gss.py
+++ b/tests/test_ssh_gss.py
@@ -25,11 +25,10 @@ Unit Tests for the GSS-API / SSPI SSHv2 Authentication (gssapi-with-mic)
import socket
import threading
-import unittest
import paramiko
-from .util import _support, needs_gssapi
+from .util import _support, needs_gssapi, KerberosTestCase, update_env
from .test_client import FINGERPRINTS
@@ -67,17 +66,18 @@ class NullServer(paramiko.ServerInterface):
@needs_gssapi
-class GSSAuthTest(unittest.TestCase):
+class GSSAuthTest(KerberosTestCase):
def setUp(self):
# TODO: username and targ_name should come from os.environ or whatever
# the approved pytest method is for runtime-configuring test data.
- self.username = "krb5_principal"
- self.hostname = socket.getfqdn("targ_name")
+ self.username = self.realm.user_princ
+ self.hostname = socket.getfqdn(self.realm.hostname)
self.sockl = socket.socket()
- self.sockl.bind(("targ_name", 0))
+ self.sockl.bind((self.realm.hostname, 0))
self.sockl.listen(1)
self.addr, self.port = self.sockl.getsockname()
self.event = threading.Event()
+ update_env(self, self.realm.env)
thread = threading.Thread(target=self._run)
thread.start()
@@ -148,7 +148,8 @@ class GSSAuthTest(unittest.TestCase):
def test_2_auth_trickledown(self):
"""
- Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding
+ Failed gssapi-with-mic auth doesn't prevent subsequent key auth from
+ succeeding
"""
self.hostname = (
"this_host_does_not_exists_and_causes_a_GSSAPI-exception"
diff --git a/tests/util.py b/tests/util.py
index 4ca02374..be56b37d 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -1,19 +1,20 @@
from os.path import dirname, realpath, join
+import os
+import sys
+import unittest
import pytest
from paramiko.py3compat import builtins
+from paramiko.ssh_gss import GSS_AUTH_AVAILABLE
def _support(filename):
return join(dirname(realpath(__file__)), filename)
-# TODO: consider using pytest.importorskip('gssapi') instead? We presumably
-# still need CLI configurability for the Kerberos parameters, though, so can't
-# JUST key off presence of GSSAPI optional dependency...
-# TODO: anyway, s/True/os.environ.get('RUN_GSSAPI', False)/ or something.
-needs_gssapi = pytest.mark.skipif(True, reason="No GSSAPI to test")
+needs_gssapi = pytest.mark.skipif(not GSS_AUTH_AVAILABLE,
+ reason="No GSSAPI to test")
def needs_builtin(name):
@@ -25,3 +26,85 @@ def needs_builtin(name):
slow = pytest.mark.slow
+
+# GSSAPI / Kerberos related tests need a working Kerberos environment.
+# The class `KerberosTestCase` provides such an environment or skips all tests.
+# There are 3 distinct cases:
+#
+# - A Kerberos environment has already been created and the environment
+# contains the required information.
+#
+# - We can use the package 'k5test' to setup an working kerberos environment on
+# the fly.
+#
+# - We skip all tests.
+#
+# ToDo: add a Windows specific implementation?
+
+if (os.environ.get("K5TEST_USER_PRINC", None) and
+ os.environ.get("K5TEST_HOSTNAME", None) and
+ os.environ.get("KRB5_KTNAME", None)): # add other vars as needed
+
+ # The environment provides the required information
+ class DummyK5Realm(object):
+ def __init__(self):
+ for k in os.environ:
+ if not k.startswith("K5TEST_"):
+ continue
+ setattr(self, k[7:].lower(), os.environ[k])
+ self.env = {}
+
+ class KerberosTestCase(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.realm = DummyK5Realm()
+
+ @classmethod
+ def tearDownClass(cls):
+ del cls.realm
+else:
+ try:
+ # Try to setup a kerberos environment
+ from k5test import KerberosTestCase
+ except Exception:
+ # Use a dummy, that skips all tests
+ class KerberosTestCase(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ raise unittest.SkipTest('Missing extension package k5test. '
+ 'Please run "pip install k5test" '
+ 'to install it.')
+
+def update_env(testcase, mapping, env=os.environ):
+ """Modify os.environ during a test case and restore during cleanup."""
+ saved_env = env.copy()
+ def replace(target, source):
+ target.update(source)
+ for k in list(target):
+ if k not in source:
+ target.pop(k, None)
+ testcase.addCleanup(replace, env, saved_env)
+ env.update(mapping)
+ return testcase
+
+def k5shell(args=None):
+ """Create a shell with an kerberos environment
+
+ This can be used to debug paramiko or to test the old GSSAPI.
+ To test a different GSSAPI, simply activate a suitable venv
+ within the shell.
+ """
+ import k5test
+ import atexit
+ import subprocess
+ k5 = k5test.K5Realm()
+ atexit.register(k5.stop)
+ os.environ.update(k5.env)
+ for n in ("realm", "user_princ", "hostname"):
+ os.environ["K5TEST_" + n.upper()] = getattr(k5, n)
+
+ if not args:
+ args = sys.argv[1:]
+ if not args:
+ args = [os.environ.get("SHELL", "bash")]
+ sys.exit(subprocess.call(args))