diff options
author | Alex Gaynor <alex.gaynor@gmail.com> | 2015-11-04 10:10:16 -0500 |
---|---|---|
committer | Alex Gaynor <alex.gaynor@gmail.com> | 2015-11-04 10:10:16 -0500 |
commit | 30a7afcdc74443c74f2ebb344a1fbf235c23bf57 (patch) | |
tree | 8d4acdf8a79ae564cfe16e9336ea8455a86f73af /tests | |
parent | 9e67e71377cea9adaaeab6dbb024f2af94ba7999 (diff) | |
parent | 4565fb517ceb54aa994ff96380b2c8f24df43968 (diff) |
Merge branch 'master' into switch-to-cryptography
Conflicts:
.travis.yml
paramiko/ecdsakey.py
paramiko/transport.py
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_client.py | 2 | ||||
-rw-r--r-- | tests/test_hostkeys.py | 1 | ||||
-rw-r--r-- | tests/test_kex.py | 120 | ||||
-rw-r--r-- | tests/test_message.py | 8 | ||||
-rwxr-xr-x | tests/test_sftp.py | 5 | ||||
-rw-r--r-- | tests/test_transport.py | 26 |
6 files changed, 153 insertions, 9 deletions
diff --git a/tests/test_client.py b/tests/test_client.py index cef7b28d..54a2fb9b 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -78,7 +78,7 @@ class NullServer (paramiko.ServerInterface): return paramiko.OPEN_SUCCEEDED def check_channel_exec_request(self, channel, command): - if command != 'yes': + if command != b'yes': return False return True diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py index 0ee1bbf0..2bdcad9c 100644 --- a/tests/test_hostkeys.py +++ b/tests/test_hostkeys.py @@ -31,6 +31,7 @@ test_hosts_file = """\ secure.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA1PD6U2/TVxET6lkpKhOk5r\ 9q/kAYG6sP9f5zuUYP8i7FOFp/6ncCEbbtg/lB+A3iidyxoSWl+9jtoyyDOOVX4UIDV9G11Ml8om3\ D+jrpI9cycZHqilK0HmxDeCuxbwyMuaCygU9gS2qoRvNLWZk70OpIKSSpBo0Wl3/XUmz9uhc= +broken.example.com ssh-rsa AAAA happy.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA8bP1ZA7DCZDB9J0s50l31M\ BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\ 5ymME3bQ4J/k1IKxCtz/bAlAqFgKoc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+Pc2M= diff --git a/tests/test_kex.py b/tests/test_kex.py index 56f1b7c7..19804fbf 100644 --- a/tests/test_kex.py +++ b/tests/test_kex.py @@ -26,7 +26,7 @@ import unittest import paramiko.util from paramiko.kex_group1 import KexGroup1 -from paramiko.kex_gex import KexGex +from paramiko.kex_gex import KexGex, KexGexSHA256 from paramiko import Message from paramiko.common import byte_chr @@ -252,3 +252,121 @@ class KexTest (unittest.TestCase): self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) + + def test_7_gex_sha256_client(self): + transport = FakeTransport() + transport.server_mode = False + kex = KexGexSHA256(transport) + kex.start_kex() + x = b'22000004000000080000002000' + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect) + + msg = Message() + msg.add_mpint(FakeModulusPack.P) + msg.add_mpint(FakeModulusPack.G) + msg.rewind() + kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg) + x = b'20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4' + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect) + + msg = Message() + msg.add_string('fake-host-key') + msg.add_mpint(69) + msg.add_string('fake-sig') + msg.rewind() + kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg) + H = b'AD1A9365A67B4496F05594AD1BF656E3CDA0851289A4C1AFF549FEAE50896DF4' + self.assertEqual(self.K, transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual((b'fake-host-key', b'fake-sig'), transport._verify) + self.assertTrue(transport._activated) + + def test_8_gex_sha256_old_client(self): + transport = FakeTransport() + transport.server_mode = False + kex = KexGexSHA256(transport) + kex.start_kex(_test_old_style=True) + x = b'1E00000800' + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect) + + msg = Message() + msg.add_mpint(FakeModulusPack.P) + msg.add_mpint(FakeModulusPack.G) + msg.rewind() + kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg) + x = b'20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4' + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect) + + msg = Message() + msg.add_string('fake-host-key') + msg.add_mpint(69) + msg.add_string('fake-sig') + msg.rewind() + kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg) + H = b'518386608B15891AE5237DEE08DCADDE76A0BCEFCE7F6DB3AD66BC41D256DFE5' + self.assertEqual(self.K, transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual((b'fake-host-key', b'fake-sig'), transport._verify) + self.assertTrue(transport._activated) + + def test_9_gex_sha256_server(self): + transport = FakeTransport() + transport.server_mode = True + kex = KexGexSHA256(transport) + kex.start_kex() + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect) + + msg = Message() + msg.add_int(1024) + msg.add_int(2048) + msg.add_int(4096) + msg.rewind() + kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg) + x = b'1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102' + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect) + + msg = Message() + msg.add_mpint(12345) + msg.rewind() + kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg) + K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 + H = b'CCAC0497CF0ABA1DBF55E1A3995D17F4CC31824B0E8D95CDF8A06F169D050D80' + x = b'210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967' + self.assertEqual(K, transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertTrue(transport._activated) + + def test_10_gex_sha256_server_with_old_client(self): + transport = FakeTransport() + transport.server_mode = True + kex = KexGexSHA256(transport) + kex.start_kex() + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect) + + msg = Message() + msg.add_int(2048) + msg.rewind() + kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, msg) + x = b'1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102' + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect) + + msg = Message() + msg.add_mpint(12345) + msg.rewind() + kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg) + K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 + H = b'3DDD2AD840AD095E397BA4D0573972DC60F6461FD38A187CACA6615A5BC8ADBB' + x = b'210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967' + self.assertEqual(K, transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertTrue(transport._activated) + + diff --git a/tests/test_message.py b/tests/test_message.py index f308c037..f18cae90 100644 --- a/tests/test_message.py +++ b/tests/test_message.py @@ -92,12 +92,12 @@ class MessageTest (unittest.TestCase): def test_4_misc(self): msg = Message(self.__d) - self.assertEqual(msg.get_int(), 5) - self.assertEqual(msg.get_int(), 0x1122334455) - self.assertEqual(msg.get_int(), 0xf00000000000000000) + self.assertEqual(msg.get_adaptive_int(), 5) + self.assertEqual(msg.get_adaptive_int(), 0x1122334455) + self.assertEqual(msg.get_adaptive_int(), 0xf00000000000000000) self.assertEqual(msg.get_so_far(), self.__d[:29]) self.assertEqual(msg.get_remainder(), self.__d[29:]) msg.rewind() - self.assertEqual(msg.get_int(), 5) + self.assertEqual(msg.get_adaptive_int(), 5) self.assertEqual(msg.get_so_far(), self.__d[:4]) self.assertEqual(msg.get_remainder(), self.__d[4:]) diff --git a/tests/test_sftp.py b/tests/test_sftp.py index cb8f7f84..aa450f59 100755 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -811,6 +811,11 @@ class SFTPTest (unittest.TestCase): sftp.remove('%s/nonutf8data' % FOLDER) + def test_sftp_attributes_empty_str(self): + sftp_attributes = SFTPAttributes() + self.assertEqual(str(sftp_attributes), "?--------- 1 0 0 0 (unknown date) ?") + + if __name__ == '__main__': SFTPTest.init_loopback() # logging is required by test_N_file_with_percent diff --git a/tests/test_transport.py b/tests/test_transport.py index 5cf9a867..a93d8b63 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -28,6 +28,7 @@ import socket import time import threading import random +from hashlib import sha1 import unittest from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \ @@ -77,7 +78,7 @@ class NullServer (ServerInterface): return OPEN_SUCCEEDED def check_channel_exec_request(self, channel, command): - if command != 'yes': + if command != b'yes': return False return True @@ -251,7 +252,7 @@ class TransportTest(unittest.TestCase): chan = self.tc.open_session() schan = self.ts.accept(1.0) try: - chan.exec_command('no') + chan.exec_command(b'command contains \xfc and is not a valid UTF-8 string') self.assertTrue(False) except SSHException: pass @@ -447,9 +448,11 @@ class TransportTest(unittest.TestCase): bytes = self.tc.packetizer._Packetizer__sent_bytes chan.send('x' * 1024) bytes2 = self.tc.packetizer._Packetizer__sent_bytes + block_size = self.tc._cipher_info[self.tc.local_cipher]['block-size'] + mac_size = self.tc._mac_info[self.tc.local_mac]['size'] # tests show this is actually compressed to *52 bytes*! including packet overhead! nice!! :) self.assertTrue(bytes2 - bytes < 1024) - self.assertEqual(52, bytes2 - bytes) + self.assertEqual(16 + block_size + mac_size, bytes2 - bytes) chan.close() schan.close() @@ -792,3 +795,20 @@ class TransportTest(unittest.TestCase): (None, DEFAULT_WINDOW_SIZE), (2**32, MAX_WINDOW_SIZE)]: self.assertEqual(self.tc._sanitize_window_size(val), correct) + + def test_L_handshake_timeout(self): + """ + verify that we can get a hanshake timeout. + """ + host_key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + public_host_key = RSAKey(data=host_key.asbytes()) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assertTrue(not event.is_set()) + self.tc.handshake_timeout = 0.000000000001 + self.ts.start_server(event, server) + self.assertRaises(EOFError, self.tc.connect, + hostkey=public_host_key, + username='slowdive', + password='pygmalion') |