summaryrefslogtreecommitdiffhomepage
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/test_gssapi.py41
-rw-r--r--tests/test_kex_gss.py20
-rw-r--r--tests/test_ssh_gss.py15
-rw-r--r--tests/util.py93
4 files changed, 131 insertions, 38 deletions
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
index 04304c0f..98d4d14e 100644
--- a/tests/test_gssapi.py
+++ b/tests/test_gssapi.py
@@ -22,20 +22,19 @@
Test the used APIs for GSS-API / SSPI authentication
"""
-import unittest
import socket
-from .util import needs_gssapi
+from .util import needs_gssapi, KerberosTestCase, update_env
@needs_gssapi
-class GSSAPITest(unittest.TestCase):
- def setup():
- # TODO: these vars should all come from os.environ or whatever the
- # approved pytest method is for runtime-configuring test data.
+class GSSAPITest(KerberosTestCase):
+ def setUp(self):
+ super(GSSAPITest, self).setUp()
self.krb5_mech = "1.2.840.113554.1.2.2"
- self.targ_name = "hostname"
+ self.targ_name = self.realm.hostname
self.server_mode = False
+ update_env(self, self.realm.env)
def test_1_pyasn1(self):
"""
@@ -48,13 +47,14 @@ class GSSAPITest(unittest.TestCase):
mech, __ = decoder.decode(oid)
self.assertEquals(self.krb5_mech, mech.__str__())
- def test_2_gssapi_sspi(self):
+ def _gssapi_sspi_test(self):
"""
Test the used methods of python-gssapi or sspi, sspicon from pywin32.
"""
try:
import gssapi
- if hasattr(gssapi, '__title__') and gssapi.__title__ == 'python-gssapi':
+ if (hasattr(gssapi, '__title__') and
+ gssapi.__title__ == 'python-gssapi'):
_API = "PYTHON-GSSAPI-OLD"
else:
_API = "PYTHON-GSSAPI-NEW"
@@ -117,7 +117,7 @@ class GSSAPITest(unittest.TestCase):
status = gss_srv_ctxt.verify_mic(mic_msg, mic_token)
self.assertEquals(0, status)
elif _API == "PYTHON-GSSAPI-NEW":
- if server_mode:
+ if self.server_mode:
gss_flags = (gssapi.RequirementFlag.protection_ready,
gssapi.RequirementFlag.integrity,
gssapi.RequirementFlag.mutual_authentication,
@@ -128,13 +128,13 @@ class GSSAPITest(unittest.TestCase):
gssapi.RequirementFlag.delegate_to_peer)
# Initialize a GSS-API context.
krb5_oid = gssapi.MechType.kerberos
- target_name = gssapi.Name("host@" + targ_name,
- name_type=gssapi.NameType.hostbased_service)
+ target_name = gssapi.Name("host@" + self.targ_name,
+ name_type=gssapi.NameType.hostbased_service)
gss_ctxt = gssapi.SecurityContext(name=target_name,
flags=gss_flags,
mech=krb5_oid,
usage='initiate')
- if server_mode:
+ if self.server_mode:
c_token = gss_ctxt.step(c_token)
gss_ctxt_status = gss_ctxt.complete
self.assertEquals(False, gss_ctxt_status)
@@ -154,7 +154,7 @@ class GSSAPITest(unittest.TestCase):
# Build MIC
mic_token = gss_ctxt.get_signature(mic_msg)
- if server_mode:
+ if self.server_mode:
# Check MIC
status = gss_srv_ctxt.verify_signature(mic_msg, mic_token)
self.assertEquals(0, status)
@@ -190,3 +190,16 @@ class GSSAPITest(unittest.TestCase):
error, token = gss_ctxt.authorize(c_token)
c_token = token[0].Buffer
self.assertNotEquals(0, error)
+
+ def test_2_gssapi_sspi_client(self):
+ """
+ Test the used methods of python-gssapi or sspi, sspicon from pywin32.
+ """
+ self._gssapi_sspi_test()
+
+ def test_3_gssapi_sspi_server(self):
+ """
+ Test the used methods of python-gssapi or sspi, sspicon from pywin32.
+ """
+ self.server_mode = True
+ self._gssapi_sspi_test()
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
index c71ff91c..e58be65d 100644
--- a/tests/test_kex_gss.py
+++ b/tests/test_kex_gss.py
@@ -31,7 +31,7 @@ import unittest
import paramiko
-from .util import needs_gssapi
+from .util import needs_gssapi, KerberosTestCase, update_env
class NullServer(paramiko.ServerInterface):
@@ -59,21 +59,16 @@ class NullServer(paramiko.ServerInterface):
@needs_gssapi
-class GSSKexTest(unittest.TestCase):
- @staticmethod
- def init(username, hostname):
- global krb5_principal, targ_name
- krb5_principal = username
- targ_name = hostname
-
+class GSSKexTest(KerberosTestCase):
def setUp(self):
- self.username = krb5_principal
- self.hostname = socket.getfqdn(targ_name)
+ self.username = self.realm.user_princ
+ self.hostname = socket.getfqdn(self.realm.hostname)
self.sockl = socket.socket()
- self.sockl.bind((targ_name, 0))
+ self.sockl.bind((self.realm.hostname, 0))
self.sockl.listen(1)
self.addr, self.port = self.sockl.getsockname()
self.event = threading.Event()
+ update_env(self, self.realm.env)
thread = threading.Thread(target=self._run)
thread.start()
@@ -87,7 +82,7 @@ class GSSKexTest(unittest.TestCase):
self.ts = paramiko.Transport(self.socks, gss_kex=True)
host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key")
self.ts.add_server_key(host_key)
- self.ts.set_gss_host(targ_name)
+ self.ts.set_gss_host(self.realm.hostname)
try:
self.ts.load_server_moduli()
except:
@@ -150,6 +145,7 @@ class GSSKexTest(unittest.TestCase):
"""
self._test_gsskex_and_auth(gss_host=None)
+ @unittest.expectedFailure # to be investigated
def test_2_gsskex_and_auth_rekey(self):
"""
Verify that Paramiko can rekey.
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py
index b6b50152..d326f522 100644
--- a/tests/test_ssh_gss.py
+++ b/tests/test_ssh_gss.py
@@ -25,11 +25,10 @@ Unit Tests for the GSS-API / SSPI SSHv2 Authentication (gssapi-with-mic)
import socket
import threading
-import unittest
import paramiko
-from .util import _support, needs_gssapi
+from .util import _support, needs_gssapi, KerberosTestCase, update_env
from .test_client import FINGERPRINTS
@@ -67,17 +66,18 @@ class NullServer(paramiko.ServerInterface):
@needs_gssapi
-class GSSAuthTest(unittest.TestCase):
+class GSSAuthTest(KerberosTestCase):
def setUp(self):
# TODO: username and targ_name should come from os.environ or whatever
# the approved pytest method is for runtime-configuring test data.
- self.username = "krb5_principal"
- self.hostname = socket.getfqdn("targ_name")
+ self.username = self.realm.user_princ
+ self.hostname = socket.getfqdn(self.realm.hostname)
self.sockl = socket.socket()
- self.sockl.bind(("targ_name", 0))
+ self.sockl.bind((self.realm.hostname, 0))
self.sockl.listen(1)
self.addr, self.port = self.sockl.getsockname()
self.event = threading.Event()
+ update_env(self, self.realm.env)
thread = threading.Thread(target=self._run)
thread.start()
@@ -148,7 +148,8 @@ class GSSAuthTest(unittest.TestCase):
def test_2_auth_trickledown(self):
"""
- Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding
+ Failed gssapi-with-mic auth doesn't prevent subsequent key auth from
+ succeeding
"""
self.hostname = (
"this_host_does_not_exists_and_causes_a_GSSAPI-exception"
diff --git a/tests/util.py b/tests/util.py
index 4ca02374..be56b37d 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -1,19 +1,20 @@
from os.path import dirname, realpath, join
+import os
+import sys
+import unittest
import pytest
from paramiko.py3compat import builtins
+from paramiko.ssh_gss import GSS_AUTH_AVAILABLE
def _support(filename):
return join(dirname(realpath(__file__)), filename)
-# TODO: consider using pytest.importorskip('gssapi') instead? We presumably
-# still need CLI configurability for the Kerberos parameters, though, so can't
-# JUST key off presence of GSSAPI optional dependency...
-# TODO: anyway, s/True/os.environ.get('RUN_GSSAPI', False)/ or something.
-needs_gssapi = pytest.mark.skipif(True, reason="No GSSAPI to test")
+needs_gssapi = pytest.mark.skipif(not GSS_AUTH_AVAILABLE,
+ reason="No GSSAPI to test")
def needs_builtin(name):
@@ -25,3 +26,85 @@ def needs_builtin(name):
slow = pytest.mark.slow
+
+# GSSAPI / Kerberos related tests need a working Kerberos environment.
+# The class `KerberosTestCase` provides such an environment or skips all tests.
+# There are 3 distinct cases:
+#
+# - A Kerberos environment has already been created and the environment
+# contains the required information.
+#
+# - We can use the package 'k5test' to setup an working kerberos environment on
+# the fly.
+#
+# - We skip all tests.
+#
+# ToDo: add a Windows specific implementation?
+
+if (os.environ.get("K5TEST_USER_PRINC", None) and
+ os.environ.get("K5TEST_HOSTNAME", None) and
+ os.environ.get("KRB5_KTNAME", None)): # add other vars as needed
+
+ # The environment provides the required information
+ class DummyK5Realm(object):
+ def __init__(self):
+ for k in os.environ:
+ if not k.startswith("K5TEST_"):
+ continue
+ setattr(self, k[7:].lower(), os.environ[k])
+ self.env = {}
+
+ class KerberosTestCase(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.realm = DummyK5Realm()
+
+ @classmethod
+ def tearDownClass(cls):
+ del cls.realm
+else:
+ try:
+ # Try to setup a kerberos environment
+ from k5test import KerberosTestCase
+ except Exception:
+ # Use a dummy, that skips all tests
+ class KerberosTestCase(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ raise unittest.SkipTest('Missing extension package k5test. '
+ 'Please run "pip install k5test" '
+ 'to install it.')
+
+def update_env(testcase, mapping, env=os.environ):
+ """Modify os.environ during a test case and restore during cleanup."""
+ saved_env = env.copy()
+ def replace(target, source):
+ target.update(source)
+ for k in list(target):
+ if k not in source:
+ target.pop(k, None)
+ testcase.addCleanup(replace, env, saved_env)
+ env.update(mapping)
+ return testcase
+
+def k5shell(args=None):
+ """Create a shell with an kerberos environment
+
+ This can be used to debug paramiko or to test the old GSSAPI.
+ To test a different GSSAPI, simply activate a suitable venv
+ within the shell.
+ """
+ import k5test
+ import atexit
+ import subprocess
+ k5 = k5test.K5Realm()
+ atexit.register(k5.stop)
+ os.environ.update(k5.env)
+ for n in ("realm", "user_princ", "hostname"):
+ os.environ["K5TEST_" + n.upper()] = getattr(k5, n)
+
+ if not args:
+ args = sys.argv[1:]
+ if not args:
+ args = [os.environ.get("SHELL", "bash")]
+ sys.exit(subprocess.call(args))