summaryrefslogtreecommitdiffhomepage
path: root/tests/test_kex_gss.py
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2018-09-17 17:58:44 -0700
committerJeff Forcier <jeff@bitprophet.org>2018-09-17 17:58:44 -0700
commit4cc767f5cd109a459571abbf21c68b783ce237c9 (patch)
tree68338fb6f9dc72cfaf8ee0ef5ef6cc572f613403 /tests/test_kex_gss.py
parentab823cc9836b8f3ec7cdd916f0dc53f0096e1c59 (diff)
parent068b0914e396e8a47e28245a0a69ea7bf6bea6ff (diff)
Merge branch '2.1' into 2.2
Foregoes a handful of unblackened bits to make the merge easier; will blacken next anyways
Diffstat (limited to 'tests/test_kex_gss.py')
-rw-r--r--tests/test_kex_gss.py49
1 files changed, 29 insertions, 20 deletions
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
index af342a7c..c71ff91c 100644
--- a/tests/test_kex_gss.py
+++ b/tests/test_kex_gss.py
@@ -31,15 +31,16 @@ import unittest
import paramiko
+from .util import needs_gssapi
-class NullServer (paramiko.ServerInterface):
+class NullServer(paramiko.ServerInterface):
def get_allowed_auths(self, username):
- return 'gssapi-keyex'
+ return "gssapi-keyex"
- def check_auth_gssapi_keyex(self, username,
- gss_authenticated=paramiko.AUTH_FAILED,
- cc_file=None):
+ def check_auth_gssapi_keyex(
+ self, username, gss_authenticated=paramiko.AUTH_FAILED, cc_file=None
+ ):
if gss_authenticated == paramiko.AUTH_SUCCESSFUL:
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED
@@ -52,11 +53,12 @@ class NullServer (paramiko.ServerInterface):
return paramiko.OPEN_SUCCEEDED
def check_channel_exec_request(self, channel, command):
- if command != 'yes':
+ if command != "yes":
return False
return True
+@needs_gssapi
class GSSKexTest(unittest.TestCase):
@staticmethod
def init(username, hostname):
@@ -83,13 +85,13 @@ class GSSKexTest(unittest.TestCase):
def _run(self):
self.socks, addr = self.sockl.accept()
self.ts = paramiko.Transport(self.socks, gss_kex=True)
- host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
+ host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key")
self.ts.add_server_key(host_key)
self.ts.set_gss_host(targ_name)
try:
self.ts.load_server_moduli()
except:
- print ('(Failed to load moduli -- gex will be unsupported.)')
+ print("(Failed to load moduli -- gex will be unsupported.)")
server = NullServer()
self.ts.start_server(self.event, server)
@@ -99,14 +101,21 @@ class GSSKexTest(unittest.TestCase):
Diffie-Hellman Key Exchange and user authentication with the GSS-API
context created during key exchange.
"""
- host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
+ host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key")
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
self.tc = paramiko.SSHClient()
- self.tc.get_host_keys().add('[%s]:%d' % (self.hostname, self.port),
- 'ssh-rsa', public_host_key)
- self.tc.connect(self.hostname, self.port, username=self.username,
- gss_auth=True, gss_kex=True, gss_host=gss_host)
+ self.tc.get_host_keys().add(
+ "[%s]:%d" % (self.hostname, self.port), "ssh-rsa", public_host_key
+ )
+ self.tc.connect(
+ self.hostname,
+ self.port,
+ username=self.username,
+ gss_auth=True,
+ gss_kex=True,
+ gss_host=gss_host,
+ )
self.event.wait(1.0)
self.assert_(self.event.is_set())
@@ -115,19 +124,19 @@ class GSSKexTest(unittest.TestCase):
self.assertEquals(True, self.ts.is_authenticated())
self.assertEquals(True, self.tc.get_transport().gss_kex_used)
- stdin, stdout, stderr = self.tc.exec_command('yes')
+ stdin, stdout, stderr = self.tc.exec_command("yes")
schan = self.ts.accept(1.0)
if rekey:
self.tc.get_transport().renegotiate_keys()
- schan.send('Hello there.\n')
- schan.send_stderr('This is on stderr.\n')
+ schan.send("Hello there.\n")
+ schan.send_stderr("This is on stderr.\n")
schan.close()
- self.assertEquals('Hello there.\n', stdout.readline())
- self.assertEquals('', stdout.readline())
- self.assertEquals('This is on stderr.\n', stderr.readline())
- self.assertEquals('', stderr.readline())
+ self.assertEquals("Hello there.\n", stdout.readline())
+ self.assertEquals("", stdout.readline())
+ self.assertEquals("This is on stderr.\n", stderr.readline())
+ self.assertEquals("", stderr.readline())
stdin.close()
stdout.close()