diff options
Diffstat (limited to 'tests/test_transport.py')
-rw-r--r-- | tests/test_transport.py | 82 |
1 files changed, 74 insertions, 8 deletions
diff --git a/tests/test_transport.py b/tests/test_transport.py index 5cf9a867..c426cef1 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -28,15 +28,19 @@ import socket import time import threading import random +from hashlib import sha1 import unittest -from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \ - SSHException, ChannelException +from paramiko import ( + Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, SSHException, + ChannelException, Packetizer, +) from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED -from paramiko.common import MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST, \ - MIN_PACKET_SIZE, MIN_WINDOW_SIZE, MAX_WINDOW_SIZE, \ - DEFAULT_WINDOW_SIZE, DEFAULT_MAX_PACKET_SIZE +from paramiko.common import ( + MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST, MIN_PACKET_SIZE, MIN_WINDOW_SIZE, + MAX_WINDOW_SIZE, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_PACKET_SIZE, +) from paramiko.py3compat import bytes from paramiko.message import Message from tests.loop import LoopSocket @@ -77,7 +81,7 @@ class NullServer (ServerInterface): return OPEN_SUCCEEDED def check_channel_exec_request(self, channel, command): - if command != 'yes': + if command != b'yes': return False return True @@ -161,6 +165,15 @@ class TransportTest(unittest.TestCase): except TypeError: pass + def test_1b_security_options_reset(self): + o = self.tc.get_security_options() + # should not throw any exceptions + o.ciphers = o.ciphers + o.digests = o.digests + o.key_types = o.key_types + o.kex = o.kex + o.compression = o.compression + def test_2_compute_key(self): self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929 self.tc.H = b'\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3' @@ -251,7 +264,7 @@ class TransportTest(unittest.TestCase): chan = self.tc.open_session() schan = self.ts.accept(1.0) try: - chan.exec_command('no') + chan.exec_command(b'command contains \xfc and is not a valid UTF-8 string') self.assertTrue(False) except SSHException: pass @@ -447,9 +460,11 @@ class TransportTest(unittest.TestCase): bytes = self.tc.packetizer._Packetizer__sent_bytes chan.send('x' * 1024) bytes2 = self.tc.packetizer._Packetizer__sent_bytes + block_size = self.tc._cipher_info[self.tc.local_cipher]['block-size'] + mac_size = self.tc._mac_info[self.tc.local_mac]['size'] # tests show this is actually compressed to *52 bytes*! including packet overhead! nice!! :) self.assertTrue(bytes2 - bytes < 1024) - self.assertEqual(52, bytes2 - bytes) + self.assertEqual(16 + block_size + mac_size, bytes2 - bytes) chan.close() schan.close() @@ -792,3 +807,54 @@ class TransportTest(unittest.TestCase): (None, DEFAULT_WINDOW_SIZE), (2**32, MAX_WINDOW_SIZE)]: self.assertEqual(self.tc._sanitize_window_size(val), correct) + + def test_L_handshake_timeout(self): + """ + verify that we can get a hanshake timeout. + """ + # Tweak client Transport instance's Packetizer instance so + # its read_message() sleeps a bit. This helps prevent race conditions + # where the client Transport's timeout timer thread doesn't even have + # time to get scheduled before the main client thread finishes + # handshaking with the server. + # (Doing this on the server's transport *sounds* more 'correct' but + # actually doesn't work nearly as well for whatever reason.) + class SlowPacketizer(Packetizer): + def read_message(self): + time.sleep(1) + return super(SlowPacketizer, self).read_message() + # NOTE: prettttty sure since the replaced .packetizer Packetizer is now + # no longer doing anything with its copy of the socket...everything'll + # be fine. Even tho it's a bit squicky. + self.tc.packetizer = SlowPacketizer(self.tc.sock) + # Continue with regular test red tape. + host_key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + public_host_key = RSAKey(data=host_key.asbytes()) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assertTrue(not event.is_set()) + self.tc.handshake_timeout = 0.000000000001 + self.ts.start_server(event, server) + self.assertRaises(EOFError, self.tc.connect, + hostkey=public_host_key, + username='slowdive', + password='pygmalion') + + def test_M_select_after_close(self): + """ + verify that select works when a channel is already closed. + """ + self.setup_test_server() + chan = self.tc.open_session() + chan.invoke_shell() + schan = self.ts.accept(1.0) + schan.close() + + # give client a moment to receive close notification + time.sleep(0.1) + + r, w, e = select.select([chan], [], [], 0.1) + self.assertEqual([chan], r) + self.assertEqual([], w) + self.assertEqual([], e) |