summaryrefslogtreecommitdiffhomepage
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/loop.py2
-rw-r--r--tests/test_client.py2
-rw-r--r--tests/test_kex.py120
-rwxr-xr-xtests/test_sftp.py3
-rw-r--r--tests/test_sftp_big.py18
-rw-r--r--tests/test_ssh_exception.py31
-rw-r--r--tests/test_transport.py27
7 files changed, 191 insertions, 12 deletions
diff --git a/tests/loop.py b/tests/loop.py
index 4f5dc163..e805ad96 100644
--- a/tests/loop.py
+++ b/tests/loop.py
@@ -37,9 +37,11 @@ class LoopSocket (object):
self.__cv = threading.Condition(self.__lock)
self.__timeout = None
self.__mate = None
+ self._closed = False
def close(self):
self.__unlink()
+ self._closed = True
try:
self.__lock.acquire()
self.__in_buffer = bytes()
diff --git a/tests/test_client.py b/tests/test_client.py
index e080221e..f71efd5a 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -75,7 +75,7 @@ class NullServer (paramiko.ServerInterface):
return paramiko.OPEN_SUCCEEDED
def check_channel_exec_request(self, channel, command):
- if command != 'yes':
+ if command != b'yes':
return False
return True
diff --git a/tests/test_kex.py b/tests/test_kex.py
index 56f1b7c7..19804fbf 100644
--- a/tests/test_kex.py
+++ b/tests/test_kex.py
@@ -26,7 +26,7 @@ import unittest
import paramiko.util
from paramiko.kex_group1 import KexGroup1
-from paramiko.kex_gex import KexGex
+from paramiko.kex_gex import KexGex, KexGexSHA256
from paramiko import Message
from paramiko.common import byte_chr
@@ -252,3 +252,121 @@ class KexTest (unittest.TestCase):
self.assertEqual(H, hexlify(transport._H).upper())
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertTrue(transport._activated)
+
+ def test_7_gex_sha256_client(self):
+ transport = FakeTransport()
+ transport.server_mode = False
+ kex = KexGexSHA256(transport)
+ kex.start_kex()
+ x = b'22000004000000080000002000'
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect)
+
+ msg = Message()
+ msg.add_mpint(FakeModulusPack.P)
+ msg.add_mpint(FakeModulusPack.G)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
+ x = b'20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4'
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect)
+
+ msg = Message()
+ msg.add_string('fake-host-key')
+ msg.add_mpint(69)
+ msg.add_string('fake-sig')
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg)
+ H = b'AD1A9365A67B4496F05594AD1BF656E3CDA0851289A4C1AFF549FEAE50896DF4'
+ self.assertEqual(self.K, transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual((b'fake-host-key', b'fake-sig'), transport._verify)
+ self.assertTrue(transport._activated)
+
+ def test_8_gex_sha256_old_client(self):
+ transport = FakeTransport()
+ transport.server_mode = False
+ kex = KexGexSHA256(transport)
+ kex.start_kex(_test_old_style=True)
+ x = b'1E00000800'
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect)
+
+ msg = Message()
+ msg.add_mpint(FakeModulusPack.P)
+ msg.add_mpint(FakeModulusPack.G)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
+ x = b'20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4'
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect)
+
+ msg = Message()
+ msg.add_string('fake-host-key')
+ msg.add_mpint(69)
+ msg.add_string('fake-sig')
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg)
+ H = b'518386608B15891AE5237DEE08DCADDE76A0BCEFCE7F6DB3AD66BC41D256DFE5'
+ self.assertEqual(self.K, transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual((b'fake-host-key', b'fake-sig'), transport._verify)
+ self.assertTrue(transport._activated)
+
+ def test_9_gex_sha256_server(self):
+ transport = FakeTransport()
+ transport.server_mode = True
+ kex = KexGexSHA256(transport)
+ kex.start_kex()
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect)
+
+ msg = Message()
+ msg.add_int(1024)
+ msg.add_int(2048)
+ msg.add_int(4096)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg)
+ x = b'1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102'
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect)
+
+ msg = Message()
+ msg.add_mpint(12345)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg)
+ K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581
+ H = b'CCAC0497CF0ABA1DBF55E1A3995D17F4CC31824B0E8D95CDF8A06F169D050D80'
+ x = b'210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967'
+ self.assertEqual(K, transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertTrue(transport._activated)
+
+ def test_10_gex_sha256_server_with_old_client(self):
+ transport = FakeTransport()
+ transport.server_mode = True
+ kex = KexGexSHA256(transport)
+ kex.start_kex()
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect)
+
+ msg = Message()
+ msg.add_int(2048)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, msg)
+ x = b'1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102'
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect)
+
+ msg = Message()
+ msg.add_mpint(12345)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg)
+ K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581
+ H = b'3DDD2AD840AD095E397BA4D0573972DC60F6461FD38A187CACA6615A5BC8ADBB'
+ x = b'210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967'
+ self.assertEqual(K, transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertTrue(transport._activated)
+
+
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index 6bce1794..ff146ade 100755
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -696,7 +696,8 @@ class SFTPTest (unittest.TestCase):
f.readv([(0, 12)])
with sftp.open(FOLDER + '/zero', 'r') as f:
- f.prefetch()
+ file_size = f.stat().st_size
+ f.prefetch(file_size)
f.read(100)
finally:
sftp.unlink(FOLDER + '/zero')
diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py
index abed27b8..cfad5682 100644
--- a/tests/test_sftp_big.py
+++ b/tests/test_sftp_big.py
@@ -132,7 +132,8 @@ class BigSFTPTest (unittest.TestCase):
start = time.time()
with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
- f.prefetch()
+ file_size = f.stat().st_size
+ f.prefetch(file_size)
# read on odd boundaries to make sure the bytes aren't getting scrambled
n = 0
@@ -171,7 +172,8 @@ class BigSFTPTest (unittest.TestCase):
chunk = 793
for i in range(10):
with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
- f.prefetch()
+ file_size = f.stat().st_size
+ f.prefetch(file_size)
base_offset = (512 * 1024) + 17 * random.randint(1000, 2000)
offsets = [base_offset + j * chunk for j in range(100)]
# randomly seek around and read them out
@@ -245,9 +247,11 @@ class BigSFTPTest (unittest.TestCase):
for i in range(10):
with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f:
- f.prefetch()
+ file_size = f.stat().st_size
+ f.prefetch(file_size)
with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f:
- f.prefetch()
+ file_size = f.stat().st_size
+ f.prefetch(file_size)
for n in range(1024):
data = f.read(1024)
self.assertEqual(data, kblob)
@@ -275,7 +279,8 @@ class BigSFTPTest (unittest.TestCase):
self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
- f.prefetch()
+ file_size = f.stat().st_size
+ f.prefetch(file_size)
data = f.read(1024)
self.assertEqual(data, kblob)
@@ -353,7 +358,8 @@ class BigSFTPTest (unittest.TestCase):
# try to read it too.
with sftp.open('%s/hongry.txt' % FOLDER, 'r', 128 * 1024) as f:
- f.prefetch()
+ file_size = f.stat().st_size
+ f.prefetch(file_size)
total = 0
while total < 1024 * 1024:
total += len(f.read(32 * 1024))
diff --git a/tests/test_ssh_exception.py b/tests/test_ssh_exception.py
new file mode 100644
index 00000000..18f2a97d
--- /dev/null
+++ b/tests/test_ssh_exception.py
@@ -0,0 +1,31 @@
+import pickle
+import unittest
+
+from paramiko.ssh_exception import NoValidConnectionsError
+
+
+class NoValidConnectionsErrorTest (unittest.TestCase):
+
+ def test_pickling(self):
+ # Regression test for https://github.com/paramiko/paramiko/issues/617
+ exc = NoValidConnectionsError({('127.0.0.1', '22'): Exception()})
+ new_exc = pickle.loads(pickle.dumps(exc))
+ self.assertEqual(type(exc), type(new_exc))
+ self.assertEqual(str(exc), str(new_exc))
+ self.assertEqual(exc.args, new_exc.args)
+
+ def test_error_message_for_single_host(self):
+ exc = NoValidConnectionsError({('127.0.0.1', '22'): Exception()})
+ assert "Unable to connect to port 22 on 127.0.0.1" in str(exc)
+
+ def test_error_message_for_two_hosts(self):
+ exc = NoValidConnectionsError({('127.0.0.1', '22'): Exception(),
+ ('::1', '22'): Exception()})
+ assert "Unable to connect to port 22 on 127.0.0.1 or ::1" in str(exc)
+
+ def test_error_message_for_multiple_hosts(self):
+ exc = NoValidConnectionsError({('127.0.0.1', '22'): Exception(),
+ ('::1', '22'): Exception(),
+ ('10.0.0.42', '22'): Exception()})
+ exp = "Unable to connect to port 22 on 10.0.0.42, 127.0.0.1 or ::1"
+ assert exp in str(exc)
diff --git a/tests/test_transport.py b/tests/test_transport.py
index bf4bac5c..d81ad8f3 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -28,6 +28,7 @@ import socket
import time
import threading
import random
+from hashlib import sha1
import unittest
from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \
@@ -77,7 +78,7 @@ class NullServer (ServerInterface):
return OPEN_SUCCEEDED
def check_channel_exec_request(self, channel, command):
- if command != 'yes':
+ if command != b'yes':
return False
return True
@@ -251,7 +252,7 @@ class TransportTest(unittest.TestCase):
chan = self.tc.open_session()
schan = self.ts.accept(1.0)
try:
- chan.exec_command('no')
+ chan.exec_command(b'command contains \xfc and is not a valid UTF-8 string')
self.assertTrue(False)
except SSHException:
pass
@@ -447,9 +448,11 @@ class TransportTest(unittest.TestCase):
bytes = self.tc.packetizer._Packetizer__sent_bytes
chan.send('x' * 1024)
bytes2 = self.tc.packetizer._Packetizer__sent_bytes
+ block_size = self.tc._cipher_info[self.tc.local_cipher]['block-size']
+ mac_size = self.tc._mac_info[self.tc.local_mac]['size']
# tests show this is actually compressed to *52 bytes*! including packet overhead! nice!! :)
self.assertTrue(bytes2 - bytes < 1024)
- self.assertEqual(52, bytes2 - bytes)
+ self.assertEqual(16 + block_size + mac_size, bytes2 - bytes)
chan.close()
schan.close()
@@ -825,3 +828,21 @@ class TransportTest(unittest.TestCase):
hostkey=public_host_key,
username='slowdive',
password='pygmalion')
+
+ def test_M_select_after_close(self):
+ """
+ verify that select works when a channel is already closed.
+ """
+ self.setup_test_server()
+ chan = self.tc.open_session()
+ chan.invoke_shell()
+ schan = self.ts.accept(1.0)
+ schan.close()
+
+ # give client a moment to receive close notification
+ time.sleep(0.1)
+
+ r, w, e = select.select([chan], [], [], 0.1)
+ self.assertEqual([chan], r)
+ self.assertEqual([], w)
+ self.assertEqual([], e)