diff options
author | Jeff Forcier <jeff@bitprophet.org> | 2018-09-17 17:58:44 -0700 |
---|---|---|
committer | Jeff Forcier <jeff@bitprophet.org> | 2018-09-17 17:58:44 -0700 |
commit | 4cc767f5cd109a459571abbf21c68b783ce237c9 (patch) | |
tree | 68338fb6f9dc72cfaf8ee0ef5ef6cc572f613403 /tests/test_kex_gss.py | |
parent | ab823cc9836b8f3ec7cdd916f0dc53f0096e1c59 (diff) | |
parent | 068b0914e396e8a47e28245a0a69ea7bf6bea6ff (diff) |
Merge branch '2.1' into 2.2
Foregoes a handful of unblackened bits to make the merge easier; will
blacken next anyways
Diffstat (limited to 'tests/test_kex_gss.py')
-rw-r--r-- | tests/test_kex_gss.py | 49 |
1 files changed, 29 insertions, 20 deletions
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py index af342a7c..c71ff91c 100644 --- a/tests/test_kex_gss.py +++ b/tests/test_kex_gss.py @@ -31,15 +31,16 @@ import unittest import paramiko +from .util import needs_gssapi -class NullServer (paramiko.ServerInterface): +class NullServer(paramiko.ServerInterface): def get_allowed_auths(self, username): - return 'gssapi-keyex' + return "gssapi-keyex" - def check_auth_gssapi_keyex(self, username, - gss_authenticated=paramiko.AUTH_FAILED, - cc_file=None): + def check_auth_gssapi_keyex( + self, username, gss_authenticated=paramiko.AUTH_FAILED, cc_file=None + ): if gss_authenticated == paramiko.AUTH_SUCCESSFUL: return paramiko.AUTH_SUCCESSFUL return paramiko.AUTH_FAILED @@ -52,11 +53,12 @@ class NullServer (paramiko.ServerInterface): return paramiko.OPEN_SUCCEEDED def check_channel_exec_request(self, channel, command): - if command != 'yes': + if command != "yes": return False return True +@needs_gssapi class GSSKexTest(unittest.TestCase): @staticmethod def init(username, hostname): @@ -83,13 +85,13 @@ class GSSKexTest(unittest.TestCase): def _run(self): self.socks, addr = self.sockl.accept() self.ts = paramiko.Transport(self.socks, gss_kex=True) - host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key') + host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") self.ts.add_server_key(host_key) self.ts.set_gss_host(targ_name) try: self.ts.load_server_moduli() except: - print ('(Failed to load moduli -- gex will be unsupported.)') + print("(Failed to load moduli -- gex will be unsupported.)") server = NullServer() self.ts.start_server(self.event, server) @@ -99,14 +101,21 @@ class GSSKexTest(unittest.TestCase): Diffie-Hellman Key Exchange and user authentication with the GSS-API context created during key exchange. """ - host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key') + host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = paramiko.SSHClient() - self.tc.get_host_keys().add('[%s]:%d' % (self.hostname, self.port), - 'ssh-rsa', public_host_key) - self.tc.connect(self.hostname, self.port, username=self.username, - gss_auth=True, gss_kex=True, gss_host=gss_host) + self.tc.get_host_keys().add( + "[%s]:%d" % (self.hostname, self.port), "ssh-rsa", public_host_key + ) + self.tc.connect( + self.hostname, + self.port, + username=self.username, + gss_auth=True, + gss_kex=True, + gss_host=gss_host, + ) self.event.wait(1.0) self.assert_(self.event.is_set()) @@ -115,19 +124,19 @@ class GSSKexTest(unittest.TestCase): self.assertEquals(True, self.ts.is_authenticated()) self.assertEquals(True, self.tc.get_transport().gss_kex_used) - stdin, stdout, stderr = self.tc.exec_command('yes') + stdin, stdout, stderr = self.tc.exec_command("yes") schan = self.ts.accept(1.0) if rekey: self.tc.get_transport().renegotiate_keys() - schan.send('Hello there.\n') - schan.send_stderr('This is on stderr.\n') + schan.send("Hello there.\n") + schan.send_stderr("This is on stderr.\n") schan.close() - self.assertEquals('Hello there.\n', stdout.readline()) - self.assertEquals('', stdout.readline()) - self.assertEquals('This is on stderr.\n', stderr.readline()) - self.assertEquals('', stderr.readline()) + self.assertEquals("Hello there.\n", stdout.readline()) + self.assertEquals("", stdout.readline()) + self.assertEquals("This is on stderr.\n", stderr.readline()) + self.assertEquals("", stderr.readline()) stdin.close() stdout.close() |