1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
from os.path import dirname, realpath, join
import os
import sys
import unittest
import pytest
from paramiko.py3compat import builtins
from paramiko.ssh_gss import GSS_AUTH_AVAILABLE
# Directory that contains this module; realpath() resolves any symlinks in
# the module's path before the directory part is taken.
tests_dir = dirname(realpath(__file__))
def _support(filename):
    """
    Return the path of ``filename`` joined onto the tests directory.
    """
    return os.path.join(tests_dir, filename)
def _config(name):
    """
    Return the path of ``name`` inside the tests directory's ``configs`` dir.
    """
    return os.path.join(tests_dir, "configs", name)
# Decorator that skips the decorated test when GSS_AUTH_AVAILABLE (imported
# from paramiko.ssh_gss) is falsy.
needs_gssapi = pytest.mark.skipif(
    not GSS_AUTH_AVAILABLE, reason="No GSSAPI to test"
)
def needs_builtin(name):
    """
    Build a skip marker for tests that depend on the builtin called ``name``.

    The returned ``pytest.mark.skipif`` marker skips the decorated test when
    the ``builtins`` module has no attribute with that name.
    """
    is_missing = not hasattr(builtins, name)
    return pytest.mark.skipif(
        is_missing, reason="Test requires a builtin '{}'".format(name)
    )
# Shorthand for the pytest "slow" marker, so tests can be decorated with @slow.
slow = pytest.mark.slow
# GSSAPI / Kerberos related tests need a working Kerberos environment.
# The class `KerberosTestCase` provides such an environment or skips all tests.
# There are 3 distinct cases:
#
# - A Kerberos environment has already been created and the environment
# contains the required information.
#
# - We can use the package 'k5test' to set up a working Kerberos environment
#   on the fly.
#
# - We skip all tests.
#
# TODO: add a Windows-specific implementation?
# Choose the KerberosTestCase implementation once, at import time.
if all(
    os.environ.get(var)
    for var in ("K5TEST_USER_PRINC", "K5TEST_HOSTNAME", "KRB5_KTNAME")
):  # add other vars as needed
    # An existing Kerberos environment is described by environment variables.

    class DummyK5Realm(object):
        """
        Realm stand-in whose attributes are copied from ``K5TEST_*`` variables.

        Every ``K5TEST_<NAME>`` environment variable becomes the attribute
        ``<name>`` (prefix removed, lowercased). ``env`` is always set to an
        empty dict afterwards.
        """

        def __init__(self):
            prefix = "K5TEST_"
            for key, value in os.environ.items():
                if key.startswith(prefix):
                    setattr(self, key[len(prefix):].lower(), value)
            # Assigned last, so it wins over any attribute named ``env``
            # produced by the loop above.
            self.env = {}

    class KerberosTestCase(unittest.TestCase):
        """Test case base that exposes a `DummyK5Realm` as ``cls.realm``."""

        @classmethod
        def setUpClass(cls):
            cls.realm = DummyK5Realm()

        @classmethod
        def tearDownClass(cls):
            del cls.realm


else:
    try:
        # Let k5test provide the test case (and with it a Kerberos realm).
        from k5test import KerberosTestCase
    except Exception:
        # k5test could not be imported: fall back to a base class that
        # skips every test of its subclasses.

        class KerberosTestCase(unittest.TestCase):
            """Placeholder base class whose class-level setup skips all tests."""

            @classmethod
            def setUpClass(cls):
                raise unittest.SkipTest(
                    "Missing extension package k5test. "
                    'Please run "pip install k5test" '
                    "to install it."
                )
def update_env(testcase, mapping, env=os.environ):
    """
    Apply ``mapping`` to ``env`` for the duration of a test case.

    A snapshot of ``env`` is taken first, and a cleanup is registered on
    ``testcase`` (via ``addCleanup``) that puts ``env`` back into the
    snapshot's state: saved values are written back and keys that were not
    in the snapshot are removed.

    :param testcase: object providing ``addCleanup(func, *args)``.
    :param mapping: key/value pairs to set in ``env``.
    :param env: mapping to modify; defaults to ``os.environ``.
    :returns: ``testcase``, unchanged.
    """
    snapshot = env.copy()

    def _restore(current, original):
        # Write the saved values back first, then drop whatever was added.
        current.update(original)
        extra_keys = [key for key in list(current) if key not in original]
        for key in extra_keys:
            current.pop(key, None)

    testcase.addCleanup(_restore, env, snapshot)
    env.update(mapping)
    return testcase
def k5shell(args=None):
    """
    Run a command (by default a shell) inside a k5test Kerberos realm.

    A ``k5test.K5Realm`` is created and its ``stop`` method is registered
    with ``atexit``. The realm's ``env`` is merged into ``os.environ``, and
    its ``realm``, ``user_princ`` and ``hostname`` attributes are exported as
    ``K5TEST_REALM``, ``K5TEST_USER_PRINC`` and ``K5TEST_HOSTNAME``.

    The command is ``args`` if non-empty, otherwise ``sys.argv[1:]``, and
    otherwise ``$SHELL`` (falling back to ``bash``). The process then exits
    with that command's return code.

    This can be used to debug paramiko or to test the old GSSAPI. To test a
    different GSSAPI, simply activate a suitable venv within the shell.
    """
    import k5test
    import atexit
    import subprocess

    realm = k5test.K5Realm()
    atexit.register(realm.stop)

    os.environ.update(realm.env)
    for attr in ("realm", "user_princ", "hostname"):
        os.environ["K5TEST_" + attr.upper()] = getattr(realm, attr)

    # First non-empty choice wins: explicit args, CLI args, then the shell.
    command = args or sys.argv[1:] or [os.environ.get("SHELL", "bash")]
    sys.exit(subprocess.call(command))
|