summaryrefslogtreecommitdiffhomepage
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/test_channelfile.py31
-rw-r--r--tests/test_client.py44
-rw-r--r--tests/test_gssapi.py87
-rw-r--r--tests/test_kex_gss.py23
-rw-r--r--tests/test_ssh_exception.py42
-rw-r--r--tests/test_ssh_gss.py14
-rw-r--r--tests/test_transport.py67
-rw-r--r--tests/util.py105
8 files changed, 360 insertions, 53 deletions
diff --git a/tests/test_channelfile.py b/tests/test_channelfile.py
index ffcbea3f..4448fdfb 100644
--- a/tests/test_channelfile.py
+++ b/tests/test_channelfile.py
@@ -1,32 +1,36 @@
from mock import patch, MagicMock
-from paramiko import Channel, ChannelFile, ChannelStderrFile
+from paramiko import Channel, ChannelFile, ChannelStderrFile, ChannelStdinFile
-class TestChannelFile(object):
+class ChannelFileBase(object):
@patch("paramiko.channel.ChannelFile._set_mode")
def test_defaults_to_unbuffered_reading(self, setmode):
- ChannelFile(Channel(None))
+ self.klass(Channel(None))
setmode.assert_called_once_with("r", -1)
@patch("paramiko.channel.ChannelFile._set_mode")
def test_can_override_mode_and_bufsize(self, setmode):
- ChannelFile(Channel(None), mode="w", bufsize=25)
+ self.klass(Channel(None), mode="w", bufsize=25)
setmode.assert_called_once_with("w", 25)
def test_read_recvs_from_channel(self):
chan = MagicMock()
- cf = ChannelFile(chan)
+ cf = self.klass(chan)
cf.read(100)
chan.recv.assert_called_once_with(100)
def test_write_calls_channel_sendall(self):
chan = MagicMock()
- cf = ChannelFile(chan, mode="w")
+ cf = self.klass(chan, mode="w")
cf.write("ohai")
chan.sendall.assert_called_once_with(b"ohai")
+class TestChannelFile(ChannelFileBase):
+ klass = ChannelFile
+
+
class TestChannelStderrFile(object):
def test_read_calls_channel_recv_stderr(self):
chan = MagicMock()
@@ -39,3 +43,18 @@ class TestChannelStderrFile(object):
cf = ChannelStderrFile(chan, mode="w")
cf.write("ohai")
chan.sendall_stderr.assert_called_once_with(b"ohai")
+
+
+class TestChannelStdinFile(ChannelFileBase):
+ klass = ChannelStdinFile
+
+ def test_close_calls_channel_shutdown_write(self):
+ chan = MagicMock()
+ cf = ChannelStdinFile(chan, mode="wb")
+ cf.flush = MagicMock()
+ cf.close()
+ # Sanity check that we still call BufferedFile.close()
+ cf.flush.assert_called_once_with()
+ assert cf._closed is True
+ # Actual point of test
+ chan.shutdown_write.assert_called_once_with()
diff --git a/tests/test_client.py b/tests/test_client.py
index 26de2d37..ad5c36ad 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -34,8 +34,10 @@ import weakref
from tempfile import mkstemp
from pytest_relaxed import raises
+from mock import patch, Mock
import paramiko
+from paramiko import SSHClient
from paramiko.pkey import PublicBlob
from paramiko.ssh_exception import SSHException, AuthenticationException
@@ -191,7 +193,7 @@ class ClientTest(unittest.TestCase):
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
# Client setup
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.get_host_keys().add(
"[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key
)
@@ -213,7 +215,7 @@ class ClientTest(unittest.TestCase):
# Nobody else tests the API of exec_command so let's do it here for
# now. :weary:
- assert isinstance(stdin, paramiko.ChannelFile)
+ assert isinstance(stdin, paramiko.ChannelStdinFile)
assert isinstance(stdout, paramiko.ChannelFile)
assert isinstance(stderr, paramiko.ChannelStderrFile)
@@ -349,7 +351,7 @@ class SSHClientTest(ClientTest):
key_file = _support("test_ecdsa_256.key")
public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file)
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
self.tc.connect(password="pygmalion", **self.connect_kwargs)
@@ -376,7 +378,7 @@ class SSHClientTest(ClientTest):
fd, localname = mkstemp()
os.close(fd)
- client = paramiko.SSHClient()
+ client = SSHClient()
assert len(client.get_host_keys()) == 0
host_id = "[%s]:%d" % (self.addr, self.port)
@@ -403,7 +405,7 @@ class SSHClientTest(ClientTest):
threading.Thread(target=self._run).start()
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
self.tc.connect(**dict(self.connect_kwargs, password="pygmalion"))
@@ -431,7 +433,7 @@ class SSHClientTest(ClientTest):
"""
threading.Thread(target=self._run).start()
- with paramiko.SSHClient() as tc:
+ with SSHClient() as tc:
self.tc = tc
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
assert len(self.tc.get_host_keys()) == 0
@@ -456,7 +458,7 @@ class SSHClientTest(ClientTest):
)
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.get_host_keys().add(
"[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key
)
@@ -519,7 +521,7 @@ class SSHClientTest(ClientTest):
"""
threading.Thread(target=self._run).start()
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
self.assertRaises(
@@ -539,7 +541,7 @@ class SSHClientTest(ClientTest):
# 2017-08-01
threading.Thread(target=self._run).start()
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
self.assertRaises(
@@ -554,7 +556,7 @@ class SSHClientTest(ClientTest):
threading.Thread(target=self._run).start()
hostname = "[%s]:%d" % (self.addr, self.port)
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.WarningPolicy())
known_hosts = self.tc.get_host_keys()
known_hosts.add(hostname, host_key.get_name(), host_key)
@@ -570,7 +572,7 @@ class SSHClientTest(ClientTest):
threading.Thread(target=self._run).start()
hostname = "[%s]:%d" % (self.addr, self.port)
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
host_key = ktype.from_private_key_file(_support(kfile))
known_hosts = self.tc.get_host_keys()
@@ -599,7 +601,7 @@ class SSHClientTest(ClientTest):
def _setup_for_env(self):
threading.Thread(target=self._run).start()
- self.tc = paramiko.SSHClient()
+ self.tc = SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
self.tc.connect(
@@ -643,7 +645,7 @@ class SSHClientTest(ClientTest):
"""
# AN ACTUAL UNIT TEST?! GOOD LORD
# (But then we have to test a private API...meh.)
- client = paramiko.SSHClient()
+ client = SSHClient()
# Default
assert isinstance(client._policy, paramiko.RejectPolicy)
# Hand in an instance (classic behavior)
@@ -653,6 +655,22 @@ class SSHClientTest(ClientTest):
client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
assert isinstance(client._policy, paramiko.AutoAddPolicy)
+ @patch("paramiko.client.Transport")
+ def test_disabled_algorithms_defaults_to_None(self, Transport):
+ SSHClient().connect("host", sock=Mock(), password="no")
+ assert Transport.call_args[1]["disabled_algorithms"] is None
+
+ @patch("paramiko.client.Transport")
+ def test_disabled_algorithms_passed_directly_if_given(self, Transport):
+ SSHClient().connect(
+ "host",
+ sock=Mock(),
+ password="no",
+ disabled_algorithms={"keys": ["ssh-dss"]},
+ )
+ call_arg = Transport.call_args[1]["disabled_algorithms"]
+ assert call_arg == {"keys": ["ssh-dss"]}
+
class PasswordPassphraseTests(ClientTest):
# TODO: most of these could reasonably be set up to use mocks/assertions
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
index 46d5bbd1..30ffb56d 100644
--- a/tests/test_gssapi.py
+++ b/tests/test_gssapi.py
@@ -22,20 +22,21 @@
Test the used APIs for GSS-API / SSPI authentication
"""
-import unittest
import socket
-from .util import needs_gssapi
+from .util import needs_gssapi, KerberosTestCase, update_env
@needs_gssapi
-class GSSAPITest(unittest.TestCase):
- def setup(self):
+class GSSAPITest(KerberosTestCase):
+ def setUp(self):
+ super(GSSAPITest, self).setUp()
# TODO: these vars should all come from os.environ or whatever the
# approved pytest method is for runtime-configuring test data.
self.krb5_mech = "1.2.840.113554.1.2.2"
- self.targ_name = "hostname"
+ self.targ_name = self.realm.hostname
self.server_mode = False
+ update_env(self, self.realm.env)
def test_pyasn1(self):
"""
@@ -48,13 +49,20 @@ class GSSAPITest(unittest.TestCase):
mech, __ = decoder.decode(oid)
self.assertEquals(self.krb5_mech, mech.__str__())
- def test_gssapi_sspi(self):
+ def _gssapi_sspi_test(self):
"""
Test the used methods of python-gssapi or sspi, sspicon from pywin32.
"""
- _API = "MIT"
try:
import gssapi
+
+ if (
+ hasattr(gssapi, "__title__")
+ and gssapi.__title__ == "python-gssapi"
+ ):
+ _API = "PYTHON-GSSAPI-OLD"
+ else:
+ _API = "PYTHON-GSSAPI-NEW"
except ImportError:
import sspicon
import sspi
@@ -65,7 +73,7 @@ class GSSAPITest(unittest.TestCase):
gss_ctxt_status = False
mic_msg = b"G'day Mate!"
- if _API == "MIT":
+ if _API == "PYTHON-GSSAPI-OLD":
if self.server_mode:
gss_flags = (
gssapi.C_PROT_READY_FLAG,
@@ -113,6 +121,56 @@ class GSSAPITest(unittest.TestCase):
# Check MIC
status = gss_srv_ctxt.verify_mic(mic_msg, mic_token)
self.assertEquals(0, status)
+ elif _API == "PYTHON-GSSAPI-NEW":
+ if self.server_mode:
+ gss_flags = (
+ gssapi.RequirementFlag.protection_ready,
+ gssapi.RequirementFlag.integrity,
+ gssapi.RequirementFlag.mutual_authentication,
+ gssapi.RequirementFlag.delegate_to_peer,
+ )
+ else:
+ gss_flags = (
+ gssapi.RequirementFlag.protection_ready,
+ gssapi.RequirementFlag.integrity,
+ gssapi.RequirementFlag.delegate_to_peer,
+ )
+ # Initialize a GSS-API context.
+ krb5_oid = gssapi.MechType.kerberos
+ target_name = gssapi.Name(
+ "host@" + self.targ_name,
+ name_type=gssapi.NameType.hostbased_service,
+ )
+ gss_ctxt = gssapi.SecurityContext(
+ name=target_name,
+ flags=gss_flags,
+ mech=krb5_oid,
+ usage="initiate",
+ )
+ if self.server_mode:
+ c_token = gss_ctxt.step(c_token)
+ gss_ctxt_status = gss_ctxt.complete
+ self.assertEquals(False, gss_ctxt_status)
+ # Accept a GSS-API context.
+ gss_srv_ctxt = gssapi.SecurityContext(usage="accept")
+ s_token = gss_srv_ctxt.step(c_token)
+ gss_ctxt_status = gss_srv_ctxt.complete
+ self.assertNotEquals(None, s_token)
+ self.assertEquals(True, gss_ctxt_status)
+ # Establish the client context
+ c_token = gss_ctxt.step(s_token)
+ self.assertEquals(None, c_token)
+ else:
+ while not gss_ctxt.complete:
+ c_token = gss_ctxt.step(c_token)
+ self.assertNotEquals(None, c_token)
+ # Build MIC
+ mic_token = gss_ctxt.get_signature(mic_msg)
+
+ if self.server_mode:
+ # Check MIC
+ status = gss_srv_ctxt.verify_signature(mic_msg, mic_token)
+ self.assertEquals(0, status)
else:
gss_flags = (
sspicon.ISC_REQ_INTEGRITY
@@ -145,3 +203,16 @@ class GSSAPITest(unittest.TestCase):
error, token = gss_ctxt.authorize(c_token)
c_token = token[0].Buffer
self.assertNotEquals(0, error)
+
+ def test_2_gssapi_sspi_client(self):
+ """
+ Test the used methods of python-gssapi or sspi, sspicon from pywin32.
+ """
+ self._gssapi_sspi_test()
+
+ def test_3_gssapi_sspi_server(self):
+ """
+ Test the used methods of python-gssapi or sspi, sspicon from pywin32.
+ """
+ self.server_mode = True
+ self._gssapi_sspi_test()
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
index 42e0a101..6f5625dc 100644
--- a/tests/test_kex_gss.py
+++ b/tests/test_kex_gss.py
@@ -31,7 +31,7 @@ import unittest
import paramiko
-from .util import needs_gssapi
+from .util import needs_gssapi, KerberosTestCase, update_env
class NullServer(paramiko.ServerInterface):
@@ -53,27 +53,22 @@ class NullServer(paramiko.ServerInterface):
return paramiko.OPEN_SUCCEEDED
def check_channel_exec_request(self, channel, command):
- if command != "yes":
+ if command != b"yes":
return False
return True
@needs_gssapi
-class GSSKexTest(unittest.TestCase):
- @staticmethod
- def init(username, hostname):
- global krb5_principal, targ_name
- krb5_principal = username
- targ_name = hostname
-
+class GSSKexTest(KerberosTestCase):
def setUp(self):
- self.username = krb5_principal
- self.hostname = socket.getfqdn(targ_name)
+ self.username = self.realm.user_princ
+ self.hostname = socket.getfqdn(self.realm.hostname)
self.sockl = socket.socket()
- self.sockl.bind((targ_name, 0))
+ self.sockl.bind((self.realm.hostname, 0))
self.sockl.listen(1)
self.addr, self.port = self.sockl.getsockname()
self.event = threading.Event()
+ update_env(self, self.realm.env)
thread = threading.Thread(target=self._run)
thread.start()
@@ -87,7 +82,7 @@ class GSSKexTest(unittest.TestCase):
self.ts = paramiko.Transport(self.socks, gss_kex=True)
host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key")
self.ts.add_server_key(host_key)
- self.ts.set_gss_host(targ_name)
+ self.ts.set_gss_host(self.realm.hostname)
try:
self.ts.load_server_moduli()
except:
@@ -150,6 +145,8 @@ class GSSKexTest(unittest.TestCase):
"""
self._test_gsskex_and_auth(gss_host=None)
+ # To be investigated, see https://github.com/paramiko/paramiko/issues/1312
+ @unittest.expectedFailure
def test_gsskex_and_auth_rekey(self):
"""
Verify that Paramiko can rekey.
diff --git a/tests/test_ssh_exception.py b/tests/test_ssh_exception.py
index d9e0bd22..1628986a 100644
--- a/tests/test_ssh_exception.py
+++ b/tests/test_ssh_exception.py
@@ -1,7 +1,15 @@
import pickle
import unittest
-from paramiko.ssh_exception import NoValidConnectionsError
+from paramiko import RSAKey
+from paramiko.ssh_exception import (
+ NoValidConnectionsError,
+ BadAuthenticationType,
+ PartialAuthentication,
+ ChannelException,
+ BadHostKeyException,
+ ProxyCommandFailure,
+)
class NoValidConnectionsErrorTest(unittest.TestCase):
@@ -33,3 +41,35 @@ class NoValidConnectionsErrorTest(unittest.TestCase):
)
exp = "Unable to connect to port 22 on 10.0.0.42, 127.0.0.1 or ::1"
assert exp in str(exc)
+
+
+class ExceptionStringDisplayTest(unittest.TestCase):
+ def test_BadAuthenticationType(self):
+ exc = BadAuthenticationType(
+ "Bad authentication type", ["ok", "also-ok"]
+ )
+ expected = "Bad authentication type; allowed types: ['ok', 'also-ok']"
+ assert str(exc) == expected
+
+ def test_PartialAuthentication(self):
+ exc = PartialAuthentication(["ok", "also-ok"])
+ expected = "Partial authentication; allowed types: ['ok', 'also-ok']"
+ assert str(exc) == expected
+
+ def test_BadHostKeyException(self):
+ got_key = RSAKey.generate(2048)
+ wanted_key = RSAKey.generate(2048)
+ exc = BadHostKeyException("myhost", got_key, wanted_key)
+ expected = "Host key for server 'myhost' does not match: got '{}', expected '{}'" # noqa
+ assert str(exc) == expected.format(
+ got_key.get_base64(), wanted_key.get_base64()
+ )
+
+ def test_ProxyCommandFailure(self):
+ exc = ProxyCommandFailure("man squid", 7)
+ expected = 'ProxyCommand("man squid") returned nonzero exit status: 7'
+ assert str(exc) == expected
+
+ def test_ChannelException(self):
+ exc = ChannelException(17, "whatever")
+ assert str(exc) == "ChannelException(17, 'whatever')"
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py
index d8c151f9..92801c20 100644
--- a/tests/test_ssh_gss.py
+++ b/tests/test_ssh_gss.py
@@ -25,11 +25,10 @@ Unit Tests for the GSS-API / SSPI SSHv2 Authentication (gssapi-with-mic)
import socket
import threading
-import unittest
import paramiko
-from .util import _support, needs_gssapi
+from .util import _support, needs_gssapi, KerberosTestCase, update_env
from .test_client import FINGERPRINTS
@@ -61,23 +60,24 @@ class NullServer(paramiko.ServerInterface):
return paramiko.OPEN_SUCCEEDED
def check_channel_exec_request(self, channel, command):
- if command != "yes":
+ if command != b"yes":
return False
return True
@needs_gssapi
-class GSSAuthTest(unittest.TestCase):
+class GSSAuthTest(KerberosTestCase):
def setUp(self):
# TODO: username and targ_name should come from os.environ or whatever
# the approved pytest method is for runtime-configuring test data.
- self.username = "krb5_principal"
- self.hostname = socket.getfqdn("targ_name")
+ self.username = self.realm.user_princ
+ self.hostname = socket.getfqdn(self.realm.hostname)
self.sockl = socket.socket()
- self.sockl.bind(("targ_name", 0))
+ self.sockl.bind((self.realm.hostname, 0))
self.sockl.listen(1)
self.addr, self.port = self.sockl.getsockname()
self.event = threading.Event()
+ update_env(self, self.realm.env)
thread = threading.Thread(target=self._run)
thread.start()
diff --git a/tests/test_transport.py b/tests/test_transport.py
index ad267e28..e2174896 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -1102,3 +1102,70 @@ class TransportTest(unittest.TestCase):
assert not self.ts.auth_handler.authenticated
# Real fix's behavior
self._expect_unimplemented()
+
+
+class AlgorithmDisablingTests(unittest.TestCase):
+ def test_preferred_lists_default_to_private_attribute_contents(self):
+ t = Transport(sock=Mock())
+ assert t.preferred_ciphers == t._preferred_ciphers
+ assert t.preferred_macs == t._preferred_macs
+ assert t.preferred_keys == t._preferred_keys
+ assert t.preferred_kex == t._preferred_kex
+
+ def test_preferred_lists_filter_disabled_algorithms(self):
+ t = Transport(
+ sock=Mock(),
+ disabled_algorithms={
+ "ciphers": ["aes128-cbc"],
+ "macs": ["hmac-md5"],
+ "keys": ["ssh-dss"],
+ "kex": ["diffie-hellman-group14-sha256"],
+ },
+ )
+ assert "aes128-cbc" in t._preferred_ciphers
+ assert "aes128-cbc" not in t.preferred_ciphers
+ assert "hmac-md5" in t._preferred_macs
+ assert "hmac-md5" not in t.preferred_macs
+ assert "ssh-dss" in t._preferred_keys
+ assert "ssh-dss" not in t.preferred_keys
+ assert "diffie-hellman-group14-sha256" in t._preferred_kex
+ assert "diffie-hellman-group14-sha256" not in t.preferred_kex
+
+ def test_implementation_refers_to_public_algo_lists(self):
+ t = Transport(
+ sock=Mock(),
+ disabled_algorithms={
+ "ciphers": ["aes128-cbc"],
+ "macs": ["hmac-md5"],
+ "keys": ["ssh-dss"],
+ "kex": ["diffie-hellman-group14-sha256"],
+ "compression": ["zlib"],
+ },
+ )
+ # Enable compression cuz otherwise disabling one option for it makes no
+ # sense...
+ t.use_compression(True)
+ # Effectively a random spot check, but kex init touches most/all of the
+ # algorithm lists so it's a good spot.
+ t._send_message = Mock()
+ t._send_kex_init()
+ # Cribbed from Transport._parse_kex_init, which didn't feel worth
+ # refactoring given all the vars involved :(
+ m = t._send_message.call_args[0][0]
+ m.rewind()
+ m.get_byte() # the msg type
+ m.get_bytes(16) # cookie, discarded
+ kexen = m.get_list()
+ server_keys = m.get_list()
+ ciphers = m.get_list()
+ m.get_list()
+ macs = m.get_list()
+ m.get_list()
+ compressions = m.get_list()
+ # OK, now we can actually check that our disabled algos were not
+ # included (as this message includes the full lists)
+ assert "aes128-cbc" not in ciphers
+ assert "hmac-md5" not in macs
+ assert "ssh-dss" not in server_keys
+ assert "diffie-hellman-group14-sha256" not in kexen
+ assert "zlib" not in compressions
diff --git a/tests/util.py b/tests/util.py
index 4ca02374..cdc835c9 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -1,19 +1,21 @@
from os.path import dirname, realpath, join
+import os
+import sys
+import unittest
import pytest
from paramiko.py3compat import builtins
+from paramiko.ssh_gss import GSS_AUTH_AVAILABLE
def _support(filename):
return join(dirname(realpath(__file__)), filename)
-# TODO: consider using pytest.importorskip('gssapi') instead? We presumably
-# still need CLI configurability for the Kerberos parameters, though, so can't
-# JUST key off presence of GSSAPI optional dependency...
-# TODO: anyway, s/True/os.environ.get('RUN_GSSAPI', False)/ or something.
-needs_gssapi = pytest.mark.skipif(True, reason="No GSSAPI to test")
+needs_gssapi = pytest.mark.skipif(
+ not GSS_AUTH_AVAILABLE, reason="No GSSAPI to test"
+)
def needs_builtin(name):
@@ -25,3 +27,96 @@ def needs_builtin(name):
slow = pytest.mark.slow
+
+# GSSAPI / Kerberos related tests need a working Kerberos environment.
+# The class `KerberosTestCase` provides such an environment or skips all tests.
+# There are 3 distinct cases:
+#
+# - A Kerberos environment has already been created and the environment
+# contains the required information.
+#
+# - We can use the package 'k5test' to setup an working kerberos environment on
+# the fly.
+#
+# - We skip all tests.
+#
+# ToDo: add a Windows specific implementation?
+
+if (
+ os.environ.get("K5TEST_USER_PRINC", None)
+ and os.environ.get("K5TEST_HOSTNAME", None)
+ and os.environ.get("KRB5_KTNAME", None)
+): # add other vars as needed
+
+ # The environment provides the required information
+ class DummyK5Realm(object):
+ def __init__(self):
+ for k in os.environ:
+ if not k.startswith("K5TEST_"):
+ continue
+ setattr(self, k[7:].lower(), os.environ[k])
+ self.env = {}
+
+ class KerberosTestCase(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ cls.realm = DummyK5Realm()
+
+ @classmethod
+ def tearDownClass(cls):
+ del cls.realm
+
+
+else:
+ try:
+ # Try to setup a kerberos environment
+ from k5test import KerberosTestCase
+ except Exception:
+ # Use a dummy, that skips all tests
+ class KerberosTestCase(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ raise unittest.SkipTest(
+ "Missing extension package k5test. "
+ 'Please run "pip install k5test" '
+ "to install it."
+ )
+
+
+def update_env(testcase, mapping, env=os.environ):
+ """Modify os.environ during a test case and restore during cleanup."""
+ saved_env = env.copy()
+
+ def replace(target, source):
+ target.update(source)
+ for k in list(target):
+ if k not in source:
+ target.pop(k, None)
+
+ testcase.addCleanup(replace, env, saved_env)
+ env.update(mapping)
+ return testcase
+
+
+def k5shell(args=None):
+ """Create a shell with an kerberos environment
+
+ This can be used to debug paramiko or to test the old GSSAPI.
+ To test a different GSSAPI, simply activate a suitable venv
+ within the shell.
+ """
+ import k5test
+ import atexit
+ import subprocess
+
+ k5 = k5test.K5Realm()
+ atexit.register(k5.stop)
+ os.environ.update(k5.env)
+ for n in ("realm", "user_princ", "hostname"):
+ os.environ["K5TEST_" + n.upper()] = getattr(k5, n)
+
+ if not args:
+ args = sys.argv[1:]
+ if not args:
+ args = [os.environ.get("SHELL", "bash")]
+ sys.exit(subprocess.call(args))