diff options
-rw-r--r-- | tests/test_transport.py | 102 |
1 files changed, 51 insertions, 51 deletions
diff --git a/tests/test_transport.py b/tests/test_transport.py index 485a18e8..ac744b26 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -55,7 +55,7 @@ class NullServer (ServerInterface): paranoid_did_password = False paranoid_did_public_key = False paranoid_key = DSSKey.from_private_key_file(test_path('test_dss.key')) - + def get_allowed_auths(self, username): if username == 'slowdive': return 'publickey,password' @@ -78,24 +78,24 @@ class NullServer (ServerInterface): def check_channel_shell_request(self, channel): return True - + def check_global_request(self, kind, msg): self._global_request = kind return False - + def check_channel_x11_request(self, channel, single_connection, auth_protocol, auth_cookie, screen_number): self._x11_single_connection = single_connection self._x11_auth_protocol = auth_protocol self._x11_auth_cookie = auth_cookie self._x11_screen_number = screen_number return True - + def check_port_forward_request(self, addr, port): self._listen = socket.socket() self._listen.bind(('127.0.0.1', 0)) self._listen.listen(1) return self._listen.getsockname()[1] - + def cancel_port_forward_request(self, addr, port): self._listen.close() self._listen = None @@ -123,12 +123,12 @@ class TransportTest(ParamikoTest): host_key = RSAKey.from_private_key_file(test_path('test_rsa.key')) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) - + if client_options is not None: client_options(self.tc.get_security_options()) if server_options is not None: server_options(self.ts.get_security_options()) - + event = threading.Event() self.server = NullServer() self.assertTrue(not event.isSet()) @@ -155,7 +155,7 @@ class TransportTest(ParamikoTest): self.assertTrue(False) except TypeError: pass - + def test_2_compute_key(self): self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929 self.tc.H = b'\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3' @@ -208,7 +208,7 @@ class TransportTest(ParamikoTest): event.wait(1.0) self.assertTrue(event.isSet()) self.assertTrue(self.ts.is_active()) - + def test_4_special(self): """ verify that the client can demand odd handshake settings, and can @@ -222,7 +222,7 @@ class TransportTest(ParamikoTest): self.assertEqual('aes256-cbc', self.tc.remote_cipher) self.assertEqual(12, self.tc.packetizer.get_mac_size_out()) self.assertEqual(12, self.tc.packetizer.get_mac_size_in()) - + self.tc.send_ignore(1024) self.tc.renegotiate_keys() self.ts.send_ignore(1024) @@ -236,7 +236,7 @@ class TransportTest(ParamikoTest): self.tc.set_keepalive(1) time.sleep(2) self.assertEqual('keepalive@lag.net', self.server._global_request) - + def test_6_exec_command(self): """ verify that exec_command() does something reasonable. @@ -250,7 +250,7 @@ class TransportTest(ParamikoTest): self.assertTrue(False) except SSHException: pass - + chan = self.tc.open_session() chan.exec_command('yes') schan = self.ts.accept(1.0) @@ -264,7 +264,7 @@ class TransportTest(ParamikoTest): f = chan.makefile_stderr() self.assertEqual('This is on stderr.\n', f.readline()) self.assertEqual('', f.readline()) - + # now try it with combined stdout/stderr chan = self.tc.open_session() chan.exec_command('yes') @@ -273,7 +273,7 @@ class TransportTest(ParamikoTest): schan.send_stderr('This is on stderr.\n') schan.close() - chan.set_combine_stderr(True) + chan.set_combine_stderr(True) f = chan.makefile() self.assertEqual('Hello there.\n', f.readline()) self.assertEqual('This is on stderr.\n', f.readline()) @@ -320,7 +320,7 @@ class TransportTest(ParamikoTest): schan.shutdown_write() schan.send_exit_status(23) schan.close() - + f = chan.makefile() self.assertEqual('Hello there.\n', f.readline()) self.assertEqual('', f.readline()) @@ -342,14 +342,14 @@ class TransportTest(ParamikoTest): chan.invoke_shell() schan = self.ts.accept(1.0) - # nothing should be ready + # nothing should be ready r, w, e = select.select([chan], [], [], 0.1) self.assertEqual([], r) self.assertEqual([], w) self.assertEqual([], e) - + schan.send('hello\n') - + # something should be ready now (give it 1 second to appear) for i in range(10): r, w, e = select.select([chan], [], [], 0.1) @@ -361,7 +361,7 @@ class TransportTest(ParamikoTest): self.assertEqual([], e) self.assertEqual(b'hello\n', chan.recv(6)) - + # and, should be dead again now r, w, e = select.select([chan], [], [], 0.1) self.assertEqual([], r) @@ -369,7 +369,7 @@ class TransportTest(ParamikoTest): self.assertEqual([], e) schan.close() - + # detect eof? for i in range(10): r, w, e = select.select([chan], [], [], 0.1) @@ -380,14 +380,14 @@ class TransportTest(ParamikoTest): self.assertEqual([], w) self.assertEqual([], e) self.assertEqual(bytes(), chan.recv(16)) - + # make sure the pipe is still open for now... p = chan._pipe self.assertEqual(False, p._closed) chan.close() # ...and now is closed. self.assertEqual(True, p._closed) - + def test_B_renegotiate(self): """ verify that a transport can correctly renegotiate mid-stream. @@ -402,7 +402,7 @@ class TransportTest(ParamikoTest): for i in range(20): chan.send('x' * 1024) chan.close() - + # allow a few seconds for the rekeying to complete for i in range(50): if self.tc.H != self.tc.session_id: @@ -441,28 +441,28 @@ class TransportTest(ParamikoTest): chan = self.tc.open_session() chan.exec_command('yes') schan = self.ts.accept(1.0) - + requested = [] def handler(c, addr_port): addr, port = addr_port requested.append((addr, port)) self.tc._queue_incoming_channel(c) - + self.assertEqual(None, getattr(self.server, '_x11_screen_number', None)) cookie = chan.request_x11(0, single_connection=True, handler=handler) self.assertEqual(0, self.server._x11_screen_number) self.assertEqual('MIT-MAGIC-COOKIE-1', self.server._x11_auth_protocol) self.assertEqual(cookie, self.server._x11_auth_cookie) self.assertEqual(True, self.server._x11_single_connection) - + x11_server = self.ts.open_x11_channel(('localhost', 6093)) x11_client = self.tc.accept() self.assertEqual('localhost', requested[0][0]) self.assertEqual(6093, requested[0][1]) - + x11_server.send('hello') self.assertEqual(b'hello', x11_client.recv(5)) - + x11_server.close() x11_client.close() chan.close() @@ -477,13 +477,13 @@ class TransportTest(ParamikoTest): chan = self.tc.open_session() chan.exec_command('yes') schan = self.ts.accept(1.0) - + requested = [] def handler(c, origin_addr_port, server_addr_port): requested.append(origin_addr_port) requested.append(server_addr_port) self.tc._queue_incoming_channel(c) - + port = self.tc.request_port_forward('127.0.0.1', 0, handler) self.assertEqual(port, self.server._listen.getsockname()[1]) @@ -492,14 +492,14 @@ class TransportTest(ParamikoTest): ss, _ = self.server._listen.accept() sch = self.ts.open_forwarded_tcpip_channel(ss.getsockname(), ss.getpeername()) cch = self.tc.accept() - + sch.send('hello') self.assertEqual(b'hello', cch.recv(5)) sch.close() cch.close() ss.close() cs.close() - + # now cancel it. self.tc.cancel_port_forward('127.0.0.1', port) self.assertTrue(self.server._listen is None) @@ -513,7 +513,7 @@ class TransportTest(ParamikoTest): chan = self.tc.open_session() chan.exec_command('yes') schan = self.ts.accept(1.0) - + # open a port on the "server" that the client will ask to forward to. greeting_server = socket.socket() greeting_server.bind(('127.0.0.1', 0)) @@ -524,13 +524,13 @@ class TransportTest(ParamikoTest): sch = self.ts.accept(1.0) cch = socket.socket() cch.connect(self.server._tcpip_dest) - + ss, _ = greeting_server.accept() ss.send(b'Hello!\n') ss.close() sch.send(cch.recv(8192)) sch.close() - + self.assertEqual(b'Hello!\n', cs.recv(7)) cs.close() @@ -544,14 +544,14 @@ class TransportTest(ParamikoTest): chan.invoke_shell() schan = self.ts.accept(1.0) - # nothing should be ready + # nothing should be ready r, w, e = select.select([chan], [], [], 0.1) self.assertEqual([], r) self.assertEqual([], w) self.assertEqual([], e) - + schan.send_stderr('hello\n') - + # something should be ready now (give it 1 second to appear) for i in range(10): r, w, e = select.select([chan], [], [], 0.1) @@ -563,7 +563,7 @@ class TransportTest(ParamikoTest): self.assertEqual([], e) self.assertEqual(b'hello\n', chan.recv_stderr(6)) - + # and, should be dead again now r, w, e = select.select([chan], [], [], 0.1) self.assertEqual([], r) @@ -599,10 +599,10 @@ class TransportTest(ParamikoTest): def test_I_rekey_deadlock(self): """ Regression test for deadlock when in-transit messages are received after MSG_KEXINIT is sent - + Note: When this test fails, it may leak threads. """ - + # Test for an obscure deadlocking bug that can occur if we receive # certain messages while initiating a key exchange. # @@ -619,7 +619,7 @@ class TransportTest(ParamikoTest): # NeedRekeyException. # 4. In response to NeedRekeyException, the transport thread sends # MSG_KEXINIT to the remote host. - # + # # On the remote host (using any SSH implementation): # 5. The MSG_CHANNEL_DATA is received, and MSG_CHANNEL_WINDOW_ADJUST is sent. # 6. The MSG_KEXINIT is received, and a corresponding MSG_KEXINIT is sent. @@ -654,7 +654,7 @@ class TransportTest(ParamikoTest): self.done_event = done_event self.watchdog_event = threading.Event() self.last = None - + def run(self): try: for i in range(1, 1+self.iterations): @@ -666,7 +666,7 @@ class TransportTest(ParamikoTest): finally: self.done_event.set() self.watchdog_event.set() - + class ReceiveThread(threading.Thread): def __init__(self, chan, done_event): threading.Thread.__init__(self, None, None, self.__class__.__name__) @@ -674,7 +674,7 @@ class TransportTest(ParamikoTest): self.chan = chan self.done_event = done_event self.watchdog_event = threading.Event() - + def run(self): try: while not self.done_event.isSet(): @@ -687,10 +687,10 @@ class TransportTest(ParamikoTest): finally: self.done_event.set() self.watchdog_event.set() - + self.setup_test_server() self.ts.packetizer.REKEY_BYTES = 2048 - + chan = self.tc.open_session() chan.exec_command('yes') schan = self.ts.accept(1.0) @@ -712,7 +712,7 @@ class TransportTest(ParamikoTest): self._send_message(m2) return _negotiate_keys(self, m) self.tc._handler_table[MSG_KEXINIT] = _negotiate_keys_wrapper - + # Parameters for the test iterations = 500 # The deadlock does not happen every time, but it # should after many iterations. @@ -724,12 +724,12 @@ class TransportTest(ParamikoTest): # Start the sending thread st = SendThread(schan, iterations, done_event) st.start() - + # Start the receiving thread rt = ReceiveThread(chan, done_event) rt.start() - # Act as a watchdog timer, checking + # Act as a watchdog timer, checking deadlocked = False while not deadlocked and not done_event.isSet(): for event in (st.watchdog_event, rt.watchdog_event): @@ -740,7 +740,7 @@ class TransportTest(ParamikoTest): deadlocked = True break event.clear() - + # Tell the threads to stop (if they haven't already stopped). Note # that if one or more threads are deadlocked, they might hang around # forever (until the process exits). |