diff options
Diffstat (limited to 'tests/test_transport.py')
-rw-r--r-- | tests/test_transport.py | 111 |
1 files changed, 56 insertions, 55 deletions
diff --git a/tests/test_transport.py b/tests/test_transport.py index 2b8ee3bc..ad267e28 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -28,36 +28,32 @@ import socket import time import threading import random -from hashlib import sha1 import unittest from mock import Mock from paramiko import ( - Transport, - SecurityOptions, - ServerInterface, - RSAKey, - DSSKey, - SSHException, + AuthHandler, ChannelException, + DSSKey, Packetizer, - Channel, - AuthHandler, + RSAKey, + SSHException, + SecurityOptions, + ServerInterface, + Transport, ) from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED from paramiko.common import ( - MSG_KEXINIT, - cMSG_CHANNEL_WINDOW_ADJUST, - cMSG_UNIMPLEMENTED, + DEFAULT_MAX_PACKET_SIZE, + DEFAULT_WINDOW_SIZE, + MAX_WINDOW_SIZE, MIN_PACKET_SIZE, MIN_WINDOW_SIZE, - MAX_WINDOW_SIZE, - DEFAULT_WINDOW_SIZE, - DEFAULT_MAX_PACKET_SIZE, - MSG_NAMES, - MSG_UNIMPLEMENTED, + MSG_KEXINIT, MSG_USERAUTH_SUCCESS, + cMSG_CHANNEL_WINDOW_ADJUST, + cMSG_UNIMPLEMENTED, ) from paramiko.py3compat import bytes, byte_chr from paramiko.message import Message @@ -185,7 +181,7 @@ class TransportTest(unittest.TestCase): self.assertTrue(event.is_set()) self.assertTrue(self.ts.is_active()) - def test_1_security_options(self): + def test_security_options(self): o = self.tc.get_security_options() self.assertEqual(type(o), SecurityOptions) self.assertTrue(("aes256-cbc", "blowfish-cbc") != o.ciphers) @@ -202,7 +198,7 @@ class TransportTest(unittest.TestCase): except TypeError: pass - def test_1b_security_options_reset(self): + def testb_security_options_reset(self): o = self.tc.get_security_options() # should not throw any exceptions o.ciphers = o.ciphers @@ -211,17 +207,17 @@ class TransportTest(unittest.TestCase): o.kex = o.kex o.compression = o.compression - def test_2_compute_key(self): - self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929 - self.tc.H = b"\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3" + def test_compute_key(self): + self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929 # noqa + self.tc.H = b"\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3" # noqa self.tc.session_id = self.tc.H key = self.tc._compute_key("C", 32) self.assertEqual( - b"207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995", + b"207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995", # noqa hexlify(key).upper(), ) - def test_3_simple(self): + def test_simple(self): """ verify that we can establish an ssh link with ourselves across the loopback sockets. this is hardly "simple" but it's simpler than the @@ -249,7 +245,7 @@ class TransportTest(unittest.TestCase): self.assertEqual(True, self.tc.is_authenticated()) self.assertEqual(True, self.ts.is_authenticated()) - def test_3a_long_banner(self): + def testa_long_banner(self): """ verify that a long banner doesn't mess up the handshake. """ @@ -268,7 +264,7 @@ class TransportTest(unittest.TestCase): self.assertTrue(event.is_set()) self.assertTrue(self.ts.is_active()) - def test_4_special(self): + def test_special(self): """ verify that the client can demand odd handshake settings, and can renegotiate keys in mid-stream. @@ -289,7 +285,7 @@ class TransportTest(unittest.TestCase): self.ts.send_ignore(1024) @slow - def test_5_keepalive(self): + def test_keepalive(self): """ verify that the keepalive will be sent. """ @@ -299,7 +295,7 @@ class TransportTest(unittest.TestCase): time.sleep(2) self.assertEqual("keepalive@lag.net", self.server._global_request) - def test_6_exec_command(self): + def test_exec_command(self): """ verify that exec_command() does something reasonable. """ @@ -343,7 +339,7 @@ class TransportTest(unittest.TestCase): self.assertEqual("This is on stderr.\n", f.readline()) self.assertEqual("", f.readline()) - def test_6a_channel_can_be_used_as_context_manager(self): + def testa_channel_can_be_used_as_context_manager(self): """ verify that exec_command() does something reasonable. """ @@ -359,7 +355,7 @@ class TransportTest(unittest.TestCase): self.assertEqual("Hello there.\n", f.readline()) self.assertEqual("", f.readline()) - def test_7_invoke_shell(self): + def test_invoke_shell(self): """ verify that invoke_shell() does something reasonable. """ @@ -373,18 +369,18 @@ class TransportTest(unittest.TestCase): chan.close() self.assertEqual("", f.readline()) - def test_8_channel_exception(self): + def test_channel_exception(self): """ verify that ChannelException is thrown for a bad open-channel request. """ self.setup_test_server() try: - chan = self.tc.open_channel("bogus") + self.tc.open_channel("bogus") self.fail("expected exception") except ChannelException as e: self.assertTrue(e.code == OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED) - def test_9_exit_status(self): + def test_exit_status(self): """ verify that get_exit_status() works. """ @@ -413,7 +409,7 @@ class TransportTest(unittest.TestCase): self.assertEqual(23, chan.recv_exit_status()) chan.close() - def test_A_select(self): + def test_select(self): """ verify that select() on a channel works. """ @@ -468,7 +464,7 @@ class TransportTest(unittest.TestCase): # ...and now is closed. self.assertEqual(True, p._closed) - def test_B_renegotiate(self): + def test_renegotiate(self): """ verify that a transport can correctly renegotiate mid-stream. """ @@ -492,7 +488,7 @@ class TransportTest(unittest.TestCase): schan.close() - def test_C_compression(self): + def test_compression(self): """ verify that zlib compression is basically working. """ @@ -510,14 +506,15 @@ class TransportTest(unittest.TestCase): bytes2 = self.tc.packetizer._Packetizer__sent_bytes block_size = self.tc._cipher_info[self.tc.local_cipher]["block-size"] mac_size = self.tc._mac_info[self.tc.local_mac]["size"] - # tests show this is actually compressed to *52 bytes*! including packet overhead! nice!! :) + # tests show this is actually compressed to *52 bytes*! including + # packet overhead! nice!! :) self.assertTrue(bytes2 - bytes < 1024) self.assertEqual(16 + block_size + mac_size, bytes2 - bytes) chan.close() schan.close() - def test_D_x11(self): + def test_x11(self): """ verify that an x11 port can be requested and opened. """ @@ -555,7 +552,7 @@ class TransportTest(unittest.TestCase): chan.close() schan.close() - def test_E_reverse_port_forwarding(self): + def test_reverse_port_forwarding(self): """ verify that a client can ask the server to open a reverse port for forwarding. @@ -563,7 +560,7 @@ class TransportTest(unittest.TestCase): self.setup_test_server() chan = self.tc.open_session() chan.exec_command("yes") - schan = self.ts.accept(1.0) + self.ts.accept(1.0) requested = [] @@ -594,7 +591,7 @@ class TransportTest(unittest.TestCase): self.tc.cancel_port_forward("127.0.0.1", port) self.assertTrue(self.server._listen is None) - def test_F_port_forwarding(self): + def test_port_forwarding(self): """ verify that a client can forward new connections from a locally- forwarded port. @@ -602,7 +599,7 @@ class TransportTest(unittest.TestCase): self.setup_test_server() chan = self.tc.open_session() chan.exec_command("yes") - schan = self.ts.accept(1.0) + self.ts.accept(1.0) # open a port on the "server" that the client will ask to forward to. greeting_server = socket.socket() @@ -626,7 +623,7 @@ class TransportTest(unittest.TestCase): self.assertEqual(b"Hello!\n", cs.recv(7)) cs.close() - def test_G_stderr_select(self): + def test_stderr_select(self): """ verify that select() on a channel works even if only stderr is receiving data. @@ -665,7 +662,7 @@ class TransportTest(unittest.TestCase): schan.close() chan.close() - def test_H_send_ready(self): + def test_send_ready(self): """ verify that send_ready() indicates when a send would not block. """ @@ -689,9 +686,10 @@ class TransportTest(unittest.TestCase): chan.close() self.assertEqual(chan.send_ready(), True) - def test_I_rekey_deadlock(self): + def test_rekey_deadlock(self): """ - Regression test for deadlock when in-transit messages are received after MSG_KEXINIT is sent + Regression test for deadlock when in-transit messages are received + after MSG_KEXINIT is sent Note: When this test fails, it may leak threads. """ @@ -714,12 +712,15 @@ class TransportTest(unittest.TestCase): # MSG_KEXINIT to the remote host. # # On the remote host (using any SSH implementation): - # 5. The MSG_CHANNEL_DATA is received, and MSG_CHANNEL_WINDOW_ADJUST is sent. - # 6. The MSG_KEXINIT is received, and a corresponding MSG_KEXINIT is sent. + # 5. The MSG_CHANNEL_DATA is received, and MSG_CHANNEL_WINDOW_ADJUST + # is sent. + # 6. The MSG_KEXINIT is received, and a corresponding MSG_KEXINIT is + # sent. # # In the main thread: # 7. The user's program calls Channel.send(). - # 8. Channel.send acquires Channel.lock, then calls Transport._send_user_message(). + # 8. Channel.send acquires Channel.lock, then calls + # Transport._send_user_message(). # 9. Transport._send_user_message waits for Transport.clear_to_send # to be set (i.e., it waits for re-keying to complete). # Channel.lock is still held. @@ -854,7 +855,7 @@ class TransportTest(unittest.TestCase): schan.close() chan.close() - def test_J_sanitze_packet_size(self): + def test_sanitze_packet_size(self): """ verify that we conform to the rfc of packet and window sizes. """ @@ -865,7 +866,7 @@ class TransportTest(unittest.TestCase): ]: self.assertEqual(self.tc._sanitize_packet_size(val), correct) - def test_K_sanitze_window_size(self): + def test_sanitze_window_size(self): """ verify that we conform to the rfc of packet and window sizes. """ @@ -877,7 +878,7 @@ class TransportTest(unittest.TestCase): self.assertEqual(self.tc._sanitize_window_size(val), correct) @slow - def test_L_handshake_timeout(self): + def test_handshake_timeout(self): """ verify that we can get a hanshake timeout. """ @@ -914,7 +915,7 @@ class TransportTest(unittest.TestCase): password="pygmalion", ) - def test_M_select_after_close(self): + def test_select_after_close(self): """ verify that select works when a channel is already closed. """ @@ -969,11 +970,11 @@ class TransportTest(unittest.TestCase): # send() accepts buffer instances sent = 0 while sent < len(data): - sent += chan.send(buffer(data, sent, 8)) + sent += chan.send(buffer(data, sent, 8)) # noqa self.assertEqual(sfile.read(len(data)), data) # sendall() accepts a buffer instance - chan.sendall(buffer(data)) + chan.sendall(buffer(data)) # noqa self.assertEqual(sfile.read(len(data)), data) @needs_builtin("memoryview") |