summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--.travis.yml23
-rw-r--r--README.rst7
-rw-r--r--paramiko/ssh_gss.py220
-rw-r--r--setup.py7
-rw-r--r--sites/docs/api/ssh_gss.rst5
-rw-r--r--sites/www/changelog.rst14
-rw-r--r--sites/www/installing.rst45
-rw-r--r--tests/test_gssapi.py87
-rw-r--r--tests/test_kex_gss.py22
-rw-r--r--tests/test_ssh_gss.py14
-rw-r--r--tests/util.py105
11 files changed, 489 insertions, 60 deletions
diff --git a/.travis.yml b/.travis.yml
index 41c074bb..84b73bd6 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -23,6 +23,10 @@ matrix:
env: "OLDEST_CRYPTO=2.5"
- python: 3.7
env: "OLDEST_CRYPTO=2.5"
+ - python: 2.7
+ env: "USE_K5TEST=yes"
+ - python: 3.7
+ env: "USE_K5TEST=yes"
install:
# Ensure modern pip/etc to avoid some issues w/ older worker environs
- pip install pip==9.0.1 setuptools==36.6.0
@@ -38,6 +42,25 @@ install:
# TODO: use pipenv + whatever contexty-type stuff it has
- pip install codecov # For codecov specifically
- pip install -r dev-requirements.txt
+ - |
+ if [[ -n "$USE_K5TEST" ]]; then
+ # we need a few commands and libraries
+ # Debian/Ubuntu package: commands used by package k5test
+ # libkrb5-dev: krb5-config
+ # krb5-kdc: kdb5_util, krb5kdc
+ # krb5-admin-server: kadmin.local, kprop, kadmind
+ # krb5-user: kinit, klist
+ #
+ # krb5-multidev: required to build gssapi
+ sudo apt-get -y install libkrb5-dev krb5-admin-server \
+ krb5-kdc krb5-user krb5-multidev && \
+ pip install k5test gssapi pyasn1
+ fi
+ # In case of problems uncomment the following to get the krb environment
+ # - |
+ # if [[ -n "$USE_K5TEST" ]]; then
+ # python -c 'from tests.util import k5shell; k5shell()' env | sort
+ # fi
script:
# Fast syntax check failures for more rapid feedback to submitters
# (Travis-oriented metatask that version checks Python, installs, runs.)
diff --git a/README.rst b/README.rst
index c918652f..72861d18 100644
--- a/README.rst
+++ b/README.rst
@@ -136,3 +136,10 @@ There are also unit tests here::
$ pytest
Which will verify that most of the core components are working correctly.
+
+To test Kerberos/GSSAPI, you need a Kerberos environment. On UNIX you can
+use the package k5test to setup a Kerberos environment on the fly::
+
+ $ pip install -r dev-requirements.txt
+ $ pip install k5test gssapi pyasn1
+ $ pytest
diff --git a/paramiko/ssh_gss.py b/paramiko/ssh_gss.py
index 3f299aee..eaafe94b 100644
--- a/paramiko/ssh_gss.py
+++ b/paramiko/ssh_gss.py
@@ -43,12 +43,21 @@ GSS_EXCEPTIONS = ()
#: :var str _API: Constraint for the used API
-_API = "MIT"
+_API = None
try:
import gssapi
- GSS_EXCEPTIONS = (gssapi.GSSException,)
+ if hasattr(gssapi, "__title__") and gssapi.__title__ == "python-gssapi":
+ # old, unmaintained python-gssapi package
+ _API = "MIT" # keep this for compatibility
+ GSS_EXCEPTIONS = (gssapi.GSSException,)
+ else:
+ _API = "PYTHON-GSSAPI-NEW"
+ GSS_EXCEPTIONS = (
+ gssapi.exceptions.GeneralError,
+ gssapi.raw.misc.GSSError,
+ )
except (ImportError, OSError):
try:
import pywintypes
@@ -63,6 +72,7 @@ except (ImportError, OSError):
from paramiko.common import MSG_USERAUTH_REQUEST
from paramiko.ssh_exception import SSHException
+from paramiko._version import __version_info__
def GSSAuth(auth_method, gss_deleg_creds=True):
@@ -73,21 +83,24 @@ def GSSAuth(auth_method, gss_deleg_creds=True):
(gssapi-with-mic or gss-keyex)
:param bool gss_deleg_creds: Delegate client credentials or not.
We delegate credentials by default.
- :return: Either an `._SSH_GSSAPI` (Unix) object or an
- `_SSH_SSPI` (Windows) object
+ :return: Either an `._SSH_GSSAPI_OLD` or `._SSH_GSSAPI_NEW` (Unix)
+ object or an `_SSH_SSPI` (Windows) object
+ :rtype: object
:raises: ``ImportError`` -- If no GSS-API / SSPI module could be imported.
:see: `RFC 4462 <http://www.ietf.org/rfc/rfc4462.txt>`_
- :note: Check for the available API and return either an `._SSH_GSSAPI`
- (MIT GSSAPI) object or an `._SSH_SSPI` (MS SSPI) object. If you
- get python-gssapi working on Windows, python-gssapi
- will be used and a `._SSH_GSSAPI` object will be returned.
+ :note: Check for the available API and return either an `._SSH_GSSAPI_OLD`
+ (MIT GSSAPI using python-gssapi package) object, an
+ `._SSH_GSSAPI_NEW` (MIT GSSAPI using gssapi package) object
+ or an `._SSH_SSPI` (MS SSPI) object.
If there is no supported API available,
``None`` will be returned.
"""
if _API == "MIT":
- return _SSH_GSSAPI(auth_method, gss_deleg_creds)
+ return _SSH_GSSAPI_OLD(auth_method, gss_deleg_creds)
+ elif _API == "PYTHON-GSSAPI-NEW":
+ return _SSH_GSSAPI_NEW(auth_method, gss_deleg_creds)
elif _API == "SSPI" and os.name == "nt":
return _SSH_SSPI(auth_method, gss_deleg_creds)
else:
@@ -96,8 +109,8 @@ def GSSAuth(auth_method, gss_deleg_creds=True):
class _SSH_GSSAuth(object):
"""
- Contains the shared variables and methods of `._SSH_GSSAPI` and
- `._SSH_SSPI`.
+ Contains the shared variables and methods of `._SSH_GSSAPI_OLD`,
+ `._SSH_GSSAPI_NEW` and `._SSH_SSPI`.
"""
def __init__(self, auth_method, gss_deleg_creds):
@@ -223,9 +236,10 @@ class _SSH_GSSAuth(object):
return mic
-class _SSH_GSSAPI(_SSH_GSSAuth):
+class _SSH_GSSAPI_OLD(_SSH_GSSAuth):
"""
- Implementation of the GSS-API MIT Kerberos Authentication for SSH2.
+ Implementation of the GSS-API MIT Kerberos Authentication for SSH2,
+ using the older (unmaintained) python-gssapi package.
:see: `.GSSAuth`
"""
@@ -402,6 +416,186 @@ class _SSH_GSSAPI(_SSH_GSSAuth):
raise NotImplementedError
+if __version_info__ < (2, 5):
+ # provide the old name for strict backward compatibility
+ _SSH_GSSAPI = _SSH_GSSAPI_OLD
+
+
+class _SSH_GSSAPI_NEW(_SSH_GSSAuth):
+ """
+ Implementation of the GSS-API MIT Kerberos Authentication for SSH2,
+ using the newer, currently maintained gssapi package.
+
+ :see: `.GSSAuth`
+ """
+
+ def __init__(self, auth_method, gss_deleg_creds):
+ """
+ :param str auth_method: The name of the SSH authentication mechanism
+ (gssapi-with-mic or gss-keyex)
+ :param bool gss_deleg_creds: Delegate client credentials or not
+ """
+ _SSH_GSSAuth.__init__(self, auth_method, gss_deleg_creds)
+
+ if self._gss_deleg_creds:
+ self._gss_flags = (
+ gssapi.RequirementFlag.protection_ready,
+ gssapi.RequirementFlag.integrity,
+ gssapi.RequirementFlag.mutual_authentication,
+ gssapi.RequirementFlag.delegate_to_peer,
+ )
+ else:
+ self._gss_flags = (
+ gssapi.RequirementFlag.protection_ready,
+ gssapi.RequirementFlag.integrity,
+ gssapi.RequirementFlag.mutual_authentication,
+ )
+
+ def ssh_init_sec_context(
+ self, target, desired_mech=None, username=None, recv_token=None
+ ):
+ """
+ Initialize a GSS-API context.
+
+ :param str username: The name of the user who attempts to login
+ :param str target: The hostname of the target to connect to
+ :param str desired_mech: The negotiated GSS-API mechanism
+ ("pseudo negotiated" mechanism, because we
+ support just the krb5 mechanism :-))
+ :param str recv_token: The GSS-API token received from the Server
+ :raises: `.SSHException` -- Is raised if the desired mechanism of the
+ client is not supported
+ :raises: ``gssapi.exceptions.GSSError`` if there is an error signaled
+ by the GSS-API implementation
+ :return: A ``String`` if the GSS-API has returned a token or ``None``
+ if no token was returned
+ """
+ from pyasn1.codec.der import decoder
+
+ self._username = username
+ self._gss_host = target
+ targ_name = gssapi.Name(
+ "host@" + self._gss_host,
+ name_type=gssapi.NameType.hostbased_service,
+ )
+ if desired_mech is not None:
+ mech, __ = decoder.decode(desired_mech)
+ if mech.__str__() != self._krb5_mech:
+ raise SSHException("Unsupported mechanism OID.")
+ krb5_mech = gssapi.MechType.kerberos
+ token = None
+ if recv_token is None:
+ self._gss_ctxt = gssapi.SecurityContext(
+ name=targ_name,
+ flags=self._gss_flags,
+ mech=krb5_mech,
+ usage="initiate",
+ )
+ token = self._gss_ctxt.step(token)
+ else:
+ token = self._gss_ctxt.step(recv_token)
+ self._gss_ctxt_status = self._gss_ctxt.complete
+ return token
+
+ def ssh_get_mic(self, session_id, gss_kex=False):
+ """
+ Create the MIC token for a SSH2 message.
+
+ :param str session_id: The SSH session ID
+ :param bool gss_kex: Generate the MIC for GSS-API Key Exchange or not
+ :return: gssapi-with-mic:
+ Returns the MIC token from GSS-API for the message we created
+ with ``_ssh_build_mic``.
+ gssapi-keyex:
+ Returns the MIC token from GSS-API with the SSH session ID as
+ message.
+ :rtype: str
+ """
+ self._session_id = session_id
+ if not gss_kex:
+ mic_field = self._ssh_build_mic(
+ self._session_id,
+ self._username,
+ self._service,
+ self._auth_method,
+ )
+ mic_token = self._gss_ctxt.get_signature(mic_field)
+ else:
+ # for key exchange with gssapi-keyex
+ mic_token = self._gss_srv_ctxt.get_signature(self._session_id)
+ return mic_token
+
+ def ssh_accept_sec_context(self, hostname, recv_token, username=None):
+ """
+ Accept a GSS-API context (server mode).
+
+ :param str hostname: The servers hostname
+ :param str username: The name of the user who attempts to login
+ :param str recv_token: The GSS-API Token received from the server,
+ if it's not the initial call.
+ :return: A ``String`` if the GSS-API has returned a token or ``None``
+ if no token was returned
+ """
+ # hostname and username are not required for GSSAPI, but for SSPI
+ self._gss_host = hostname
+ self._username = username
+ if self._gss_srv_ctxt is None:
+ self._gss_srv_ctxt = gssapi.SecurityContext(usage="accept")
+ token = self._gss_srv_ctxt.step(recv_token)
+ self._gss_srv_ctxt_status = self._gss_srv_ctxt.complete
+ return token
+
+ def ssh_check_mic(self, mic_token, session_id, username=None):
+ """
+ Verify the MIC token for a SSH2 message.
+
+ :param str mic_token: The MIC token received from the client
+ :param str session_id: The SSH session ID
+ :param str username: The name of the user who attempts to login
+ :return: None if the MIC check was successful
+ :raises: ``gssapi.exceptions.GSSError`` -- if the MIC check failed
+ """
+ self._session_id = session_id
+ self._username = username
+ if self._username is not None:
+ # server mode
+ mic_field = self._ssh_build_mic(
+ self._session_id,
+ self._username,
+ self._service,
+ self._auth_method,
+ )
+ self._gss_srv_ctxt.verify_signature(mic_field, mic_token)
+ else:
+ # for key exchange with gssapi-keyex
+ # client mode
+ self._gss_ctxt.verify_signature(self._session_id, mic_token)
+
+ @property
+ def credentials_delegated(self):
+ """
+ Checks if credentials are delegated (server mode).
+
+ :return: ``True`` if credentials are delegated, otherwise ``False``
+ :rtype: bool
+ """
+ if self._gss_srv_ctxt.delegated_creds is not None:
+ return True
+ return False
+
+ def save_client_creds(self, client_token):
+ """
+ Save the Client token in a file. This is used by the SSH server
+ to store the client credentials if credentials are delegated
+ (server mode).
+
+ :param str client_token: The GSS-API token received form the client
+ :raises: ``NotImplementedError`` -- Credential delegation is currently
+ not supported in server mode
+ """
+ raise NotImplementedError
+
+
class _SSH_SSPI(_SSH_GSSAuth):
"""
Implementation of the Microsoft SSPI Kerberos Authentication for SSH2.
diff --git a/setup.py b/setup.py
index e6c4a077..cf063c44 100644
--- a/setup.py
+++ b/setup.py
@@ -74,4 +74,11 @@ setup(
"Programming Language :: Python :: 3.8",
],
install_requires=["bcrypt>=3.1.3", "cryptography>=2.5", "pynacl>=1.0.1"],
+ extras_require={
+ "gssapi": [
+ "pyasn1>=0.1.7",
+ 'gssapi>=1.4.1;platform_system!="Windows"',
+ 'pywin32>=2.1.8;platform_system=="Windows"',
+ ]
+ },
)
diff --git a/sites/docs/api/ssh_gss.rst b/sites/docs/api/ssh_gss.rst
index 7a687e11..155fcfff 100644
--- a/sites/docs/api/ssh_gss.rst
+++ b/sites/docs/api/ssh_gss.rst
@@ -7,7 +7,10 @@ GSS-API authentication
.. autoclass:: _SSH_GSSAuth
:member-order: bysource
-.. autoclass:: _SSH_GSSAPI
+.. autoclass:: _SSH_GSSAPI_OLD
+ :member-order: bysource
+
+.. autoclass:: _SSH_GSSAPI_NEW
:member-order: bysource
.. autoclass:: _SSH_SSPI
diff --git a/sites/www/changelog.rst b/sites/www/changelog.rst
index 8838f5aa..67242b22 100644
--- a/sites/www/changelog.rst
+++ b/sites/www/changelog.rst
@@ -2,6 +2,20 @@
Changelog
=========
+- :support:`1311` (for :issue:`584`, replacing :issue:`1166`) Add
+ backwards-compatible support for the ``gssapi`` GSSAPI library, as the
+ previous backend (``python-gssapi``) has since become defunct. This change
+ also includes tests for the GSSAPI functionality.
+
+ Big thanks to Anselm Kruis for the patch and to Sebastian Deiß (author of our
+ initial GSSAPI functionality) for review.
+
+ .. note::
+ To be very clear, this patch **does not** remove support for the older
+ ``python-gssapi`` library. We *may* remove that support in a later release,
+ but for now, either library will work. Please upgrade to ``gssapi`` when
+ you can, however, as ``python-gssapi`` is no longer maintained upstream.
+
- :bug:`322 major` `SSHClient.exec_command
<paramiko.client.SSHClient.exec_command>` previously returned a naive
`~paramiko.channel.ChannelFile` object for its ``stdin`` value; such objects
diff --git a/sites/www/installing.rst b/sites/www/installing.rst
index 3631eb0d..cffdba5f 100644
--- a/sites/www/installing.rst
+++ b/sites/www/installing.rst
@@ -95,22 +95,41 @@ In general, you'll need one of the following setups:
Optional dependencies for GSS-API / SSPI / Kerberos
===================================================
-In order to use GSS-API/Kerberos & related functionality, a couple of
-additional dependencies are required (these are not listed in our ``setup.py``
-due to their infrequent utility & non-platform-agnostic requirements):
-
-* It hopefully goes without saying but **all platforms** need **a working
- installation of GSS-API itself**, e.g. Heimdal.
-* **Unix** needs `python-gssapi <https://pypi.org/project/python-gssapi/>`_
- ``0.6.1`` or better.
-
- .. note:: This library appears to only function on Python 2.7 and up.
-
-* **Windows** needs `pywin32 <https://pypi.python.org/pypi/pywin32>`_ ``2.1.8``
- or better.
+In order to use GSS-API/Kerberos & related functionality, additional
+dependencies are required. It hopefully goes without saying but **all
+platforms** need **a working installation of GSS-API itself**, e.g. Heimdal.
.. note::
If you use Microsoft SSPI for kerberos authentication and credential
delegation, make sure that the target host is trusted for delegation in the
active directory configuration. For details see:
http://technet.microsoft.com/en-us/library/cc738491%28v=ws.10%29.aspx
+
+The ``gssapi`` "extra" install flavor
+-------------------------------------
+
+If you're installing via ``pip`` (recommended), you should be able to get the
+optional Python package requirements by changing your installation to refer to
+``paramiko[gssapi]`` (from simply ``paramiko``), e.g.::
+
+ pip install "paramiko[gssapi]"
+
+(Or update your ``requirements.txt``, or etc.)
+
+Manual dependency installation
+------------------------------
+
+If you're not using ``pip`` or your ``pip`` is too old to support the "extras"
+functionality, the optional dependencies are as follows:
+
+* All platforms need `pyasn1 <https://pypi.org/project/pyasn1/>`_ ``0.1.7`` or
+ later.
+* **Unix** needs: `gssapi <https://pypi.org/project/gssapi/>`__ ``1.4.1`` or better.
+
+ * An alternative is the `python-gssapi
+ <https://pypi.org/project/python-gssapi/>`_ library (``0.6.1`` or above),
+ though it is no longer maintained upstream, and Paramiko's support for
+ its API may eventually become deprecated.
+
+* **Windows** needs `pywin32 <https://pypi.python.org/pypi/pywin32>`_ ``2.1.8``
+ or better.
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
index 46d5bbd1..30ffb56d 100644
--- a/tests/test_gssapi.py
+++ b/tests/test_gssapi.py
@@ -22,20 +22,21 @@
Test the used APIs for GSS-API / SSPI authentication
"""
-import unittest
import socket
-from .util import needs_gssapi
+from .util import needs_gssapi, KerberosTestCase, update_env
@needs_gssapi
-class GSSAPITest(unittest.TestCase):
- def setup(self):
+class GSSAPITest(KerberosTestCase):
+ def setUp(self):
+ super(GSSAPITest, self).setUp()
# TODO: these vars should all come from os.environ or whatever the
# approved pytest method is for runtime-configuring test data.
self.krb5_mech = "1.2.840.113554.1.2.2"
- self.targ_name = "hostname"
+ self.targ_name = self.realm.hostname
self.server_mode = False
+ update_env(self, self.realm.env)
def test_pyasn1(self):
"""
@@ -48,13 +49,20 @@ class GSSAPITest(unittest.TestCase):
mech, __ = decoder.decode(oid)
self.assertEquals(self.krb5_mech, mech.__str__())
- def test_gssapi_sspi(self):
+ def _gssapi_sspi_test(self):
"""
Test the used methods of python-gssapi or sspi, sspicon from pywin32.
"""
- _API = "MIT"
try:
import gssapi
+
+ if (
+ hasattr(gssapi, "__title__")
+ and gssapi.__title__ == "python-gssapi"
+ ):
+ _API = "PYTHON-GSSAPI-OLD"
+ else:
+ _API = "PYTHON-GSSAPI-NEW"
except ImportError:
import sspicon
import sspi
@@ -65,7 +73,7 @@ class GSSAPITest(unittest.TestCase):
gss_ctxt_status = False
mic_msg = b"G'day Mate!"
- if _API == "MIT":
+ if _API == "PYTHON-GSSAPI-OLD":
if self.server_mode:
gss_flags = (
gssapi.C_PROT_READY_FLAG,
@@ -113,6 +121,56 @@ class GSSAPITest(unittest.TestCase):
# Check MIC
status = gss_srv_ctxt.verify_mic(mic_msg, mic_token)
self.assertEquals(0, status)
+ elif _API == "PYTHON-GSSAPI-NEW":
+ if self.server_mode:
+ gss_flags = (
+ gssapi.RequirementFlag.protection_ready,
+ gssapi.RequirementFlag.integrity,
+ gssapi.RequirementFlag.mutual_authentication,
+ gssapi.RequirementFlag.delegate_to_peer,
+ )
+ else:
+ gss_flags = (
+ gssapi.RequirementFlag.protection_ready,
+ gssapi.RequirementFlag.integrity,
+ gssapi.RequirementFlag.delegate_to_peer,
+ )
+ # Initialize a GSS-API context.
+ krb5_oid = gssapi.MechType.kerberos
+ target_name = gssapi.Name(
+ "host@" + self.targ_name,
+ name_type=gssapi.NameType.hostbased_service,
+ )
+ gss_ctxt = gssapi.SecurityContext(
+ name=target_name,
+ flags=gss_flags,
+ mech=krb5_oid,
+ usage="initiate",
+ )
+ if self.server_mode:
+ c_token = gss_ctxt.step(c_token)
+ gss_ctxt_status = gss_ctxt.complete
+ self.assertEquals(False, gss_ctxt_status)
+ # Accept a GSS-API context.
+ gss_srv_ctxt = gssapi.SecurityContext(usage="accept")
+ s_token = gss_srv_ctxt.step(c_token)
+ gss_ctxt_status = gss_srv_ctxt.complete
+ self.assertNotEquals(None, s_token)
+ self.assertEquals(True, gss_ctxt_status)
+ # Establish the client context
+ c_token = gss_ctxt.step(s_token)
+ self.assertEquals(None, c_token)
+ else:
+ while not gss_ctxt.complete:
+ c_token = gss_ctxt.step(c_token)
+ self.assertNotEquals(None, c_token)
+ # Build MIC
+ mic_token = gss_ctxt.get_signature(mic_msg)
+
+ if self.server_mode:
+ # Check MIC
+ status = gss_srv_ctxt.verify_signature(mic_msg, mic_token)
+ self.assertEquals(0, status)
else:
gss_flags = (
sspicon.ISC_REQ_INTEGRITY
@@ -145,3 +203,16 @@ class GSSAPITest(unittest.TestCase):
error, token = gss_ctxt.authorize(c_token)
c_token = token[0].Buffer
self.assertNotEquals(0, error)
+
+ def test_2_gssapi_sspi_client(self):
+ """
+ Test the used methods of python-gssapi or sspi, sspicon from pywin32.
+ """
+ self._gssapi_sspi_test()
+
+ def test_3_gssapi_sspi_server(self):
+ """
+ Test the used methods of python-gssapi or sspi, sspicon from pywin32.
+ """
+ self.server_mode = True
+ self._gssapi_sspi_test()
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
index 42e0a101..b0e3970b 100644
--- a/tests/test_kex_gss.py
+++ b/tests/test_kex_gss.py
@@ -31,7 +31,7 @@ import unittest
import paramiko
-from .util import needs_gssapi
+from .util import needs_gssapi, KerberosTestCase, update_env
class NullServer(paramiko.ServerInterface):
@@ -53,27 +53,22 @@ class NullServer(paramiko.ServerInterface):
return paramiko.OPEN_SUCCEEDED
def check_channel_exec_request(self, channel, command):
- if command != "yes":
+ if command != b"yes":
return False
return True
@needs_gssapi
-class GSSKexTest(unittest.TestCase):
- @staticmethod
- def init(username, hostname):
- global krb5_principal, targ_name
- krb5_principal = username
- targ_name = hostname
-
+class GSSKexTest(KerberosTestCase):
def setUp(self):
- self.username = krb5_principal
- self.hostname = socket.getfqdn(targ_name)
+ self.username = self.realm.user_princ
+ self.hostname = socket.getfqdn(self.realm.hostname)
self.sockl = socket.socket()
- self.sockl.bind((targ_name, 0))
+ self.sockl.bind((self.realm.hostname, 0))
self.sockl.listen(1)
self.addr, self.port = self.sockl.getsockname()
self.event = threading.Event()
+ update_env(self, self.realm.env)
thread = threading.Thread(target=self._run)
thread.start()
@@ -87,7 +82,7 @@ class GSSKexTest(unittest.TestCase):
self.ts = paramiko.Transport(self.socks, gss_kex=True)
host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key")
self.ts.add_server_key(host_key)
- self.ts.set_gss_host(targ_name)
+ self.ts.set_gss_host(self.realm.hostname)
try:
self.ts.load_server_moduli()
except:
@@ -150,6 +145,7 @@ class GSSKexTest(unittest.TestCase):
"""
self._test_gsskex_and_auth(gss_host=None)
+ @unittest.expectedFailure # to be investigated, see https://github.com/paramiko/paramiko/issues/1312
def test_gsskex_and_auth_rekey(self):
"""
Verify that Paramiko can rekey.
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py
index d8c151f9..92801c20 100644
--- a/tests/test_ssh_gss.py
+++ b/tests/test_ssh_gss.py
@@ -25,11 +25,10 @@ Unit Tests for the GSS-API / SSPI SSHv2 Authentication (gssapi-with-mic)
import socket
import threading
-import unittest
import paramiko
-from .util import _support, needs_gssapi
+from .util import _support, needs_gssapi, KerberosTestCase, update_env
from .test_client import FINGERPRINTS
@@ -61,23 +60,24 @@ class NullServer(paramiko.ServerInterface):
return paramiko.OPEN_SUCCEEDED
def check_channel_exec_request(self, channel, command):
- if command != "yes":
+ if command != b"yes":
return False
return True
@needs_gssapi
-class GSSAuthTest(unittest.TestCase):
+class GSSAuthTest(KerberosTestCase):
def setUp(self):
# TODO: username and targ_name should come from os.environ or whatever
# the approved pytest method is for runtime-configuring test data.
- self.username = "krb5_principal"
- self.hostname = socket.getfqdn("targ_name")
+ self.username = self.realm.user_princ
+ self.hostname = socket.getfqdn(self.realm.hostname)
self.sockl = socket.socket()
- self.sockl.bind(("targ_name", 0))
+ self.sockl.bind((self.realm.hostname, 0))
self.sockl.listen(1)
self.addr, self.port = self.sockl.getsockname()
self.event = threading.Event()
+ update_env(self, self.realm.env)
thread = threading.Thread(target=self._run)
thread.start()
diff --git a/tests/util.py b/tests/util.py
index 4ca02374..cdc835c9 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -1,19 +1,21 @@
from os.path import dirname, realpath, join
+import os
+import sys
+import unittest
import pytest
from paramiko.py3compat import builtins
+from paramiko.ssh_gss import GSS_AUTH_AVAILABLE
def _support(filename):
return join(dirname(realpath(__file__)), filename)
-# TODO: consider using pytest.importorskip('gssapi') instead? We presumably
-# still need CLI configurability for the Kerberos parameters, though, so can't
-# JUST key off presence of GSSAPI optional dependency...
-# TODO: anyway, s/True/os.environ.get('RUN_GSSAPI', False)/ or something.
-needs_gssapi = pytest.mark.skipif(True, reason="No GSSAPI to test")
+needs_gssapi = pytest.mark.skipif(
+ not GSS_AUTH_AVAILABLE, reason="No GSSAPI to test"
+)
def needs_builtin(name):
@@ -25,3 +27,96 @@ def needs_builtin(name):
slow = pytest.mark.slow
+
+# GSSAPI / Kerberos related tests need a working Kerberos environment.
+# The class `KerberosTestCase` provides such an environment or skips all tests.
+# There are 3 distinct cases:
+#
+# - A Kerberos environment has already been created and the environment
+# contains the required information.
+#
+# - We can use the package 'k5test' to setup an working kerberos environment on
+# the fly.
+#
+# - We skip all tests.
+#
+# ToDo: add a Windows specific implementation?
+
+if (
+ os.environ.get("K5TEST_USER_PRINC", None)
+ and os.environ.get("K5TEST_HOSTNAME", None)
+ and os.environ.get("KRB5_KTNAME", None)
+): # add other vars as needed
+
+ # The environment provides the required information
+ class DummyK5Realm(object):
+ def __init__(self):
+ for k in os.environ:
+ if not k.startswith("K5TEST_"):
+ continue
+ setattr(self, k[7:].lower(), os.environ[k])
+ self.env = {}
+
+ class KerberosTestCase(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.realm = DummyK5Realm()
+
+ @classmethod
+ def tearDownClass(cls):
+ del cls.realm
+
+
+else:
+ try:
+ # Try to setup a kerberos environment
+ from k5test import KerberosTestCase
+ except Exception:
+ # Use a dummy, that skips all tests
+ class KerberosTestCase(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ raise unittest.SkipTest(
+ "Missing extension package k5test. "
+ 'Please run "pip install k5test" '
+ "to install it."
+ )
+
+
+def update_env(testcase, mapping, env=os.environ):
+ """Modify os.environ during a test case and restore during cleanup."""
+ saved_env = env.copy()
+
+ def replace(target, source):
+ target.update(source)
+ for k in list(target):
+ if k not in source:
+ target.pop(k, None)
+
+ testcase.addCleanup(replace, env, saved_env)
+ env.update(mapping)
+ return testcase
+
+
+def k5shell(args=None):
+ """Create a shell with an kerberos environment
+
+ This can be used to debug paramiko or to test the old GSSAPI.
+ To test a different GSSAPI, simply activate a suitable venv
+ within the shell.
+ """
+ import k5test
+ import atexit
+ import subprocess
+
+ k5 = k5test.K5Realm()
+ atexit.register(k5.stop)
+ os.environ.update(k5.env)
+ for n in ("realm", "user_princ", "hostname"):
+ os.environ["K5TEST_" + n.upper()] = getattr(k5, n)
+
+ if not args:
+ args = sys.argv[1:]
+ if not args:
+ args = [os.environ.get("SHELL", "bash")]
+ sys.exit(subprocess.call(args))