summaryrefslogtreecommitdiffhomepage
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/test_buffered_pipe.py9
-rw-r--r--tests/test_client.py164
-rwxr-xr-xtests/test_file.py10
-rw-r--r--tests/test_packetizer.py46
-rwxr-xr-xtests/test_sftp.py24
-rw-r--r--tests/test_transport.py134
-rw-r--r--tests/test_util.py123
-rw-r--r--tests/util.py10
8 files changed, 395 insertions, 125 deletions
diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py
index a53081a9..3b8f97ad 100644
--- a/tests/test_buffered_pipe.py
+++ b/tests/test_buffered_pipe.py
@@ -22,11 +22,10 @@ Some unit tests for BufferedPipe.
import threading
import time
+import unittest
from paramiko.buffered_pipe import BufferedPipe, PipeTimeout
from paramiko import pipe
-from tests.util import ParamikoTest
-
def delay_thread(p):
p.feed('a')
@@ -40,7 +39,7 @@ def close_thread(p):
p.close()
-class BufferedPipeTest(ParamikoTest):
+class BufferedPipeTest(unittest.TestCase):
def test_1_buffered_pipe(self):
p = BufferedPipe()
self.assertTrue(not p.read_ready())
@@ -48,12 +47,12 @@ class BufferedPipeTest(ParamikoTest):
self.assertTrue(p.read_ready())
data = p.read(6)
self.assertEqual(b'hello.', data)
-
+
p.feed('plus/minus')
self.assertEqual(b'plu', p.read(3))
self.assertEqual(b's/m', p.read(3))
self.assertEqual(b'inus', p.read(4))
-
+
p.close()
self.assertTrue(not p.read_ready())
self.assertEqual(b'', p.read(1))
diff --git a/tests/test_client.py b/tests/test_client.py
index 7e5c80b4..6fda7f5e 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -27,12 +27,25 @@ import unittest
import weakref
import warnings
import os
+import time
from tests.util import test_path
import paramiko
-from paramiko.common import PY2
+from paramiko.common import PY2, b
+from paramiko.ssh_exception import SSHException
+
+
+FINGERPRINTS = {
+ 'ssh-dss': b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c',
+ 'ssh-rsa': b'\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5',
+ 'ecdsa-sha2-nistp256': b'\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60',
+}
class NullServer (paramiko.ServerInterface):
+ def __init__(self, *args, **kwargs):
+ # Allow tests to enable/disable specific key types
+ self.__allowed_keys = kwargs.pop('allowed_keys', [])
+ super(NullServer, self).__init__(*args, **kwargs)
def get_allowed_auths(self, username):
if username == 'slowdive':
@@ -45,7 +58,14 @@ class NullServer (paramiko.ServerInterface):
return paramiko.AUTH_FAILED
def check_auth_publickey(self, username, key):
- if (key.get_name() == 'ssh-dss') and key.get_fingerprint() == b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c':
+ try:
+ expected = FINGERPRINTS[key.get_name()]
+ except KeyError:
+ return paramiko.AUTH_FAILED
+ if (
+ key.get_name() in self.__allowed_keys and
+ key.get_fingerprint() == expected
+ ):
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED
@@ -72,32 +92,46 @@ class SSHClientTest (unittest.TestCase):
if hasattr(self, attr):
getattr(self, attr).close()
- def _run(self):
+ def _run(self, allowed_keys=None, delay=0):
+ if allowed_keys is None:
+ allowed_keys = FINGERPRINTS.keys()
self.socks, addr = self.sockl.accept()
self.ts = paramiko.Transport(self.socks)
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
self.ts.add_server_key(host_key)
- server = NullServer()
+ server = NullServer(allowed_keys=allowed_keys)
+ if delay:
+ time.sleep(delay)
self.ts.start_server(self.event, server)
- def test_1_client(self):
+ def _test_connection(self, **kwargs):
"""
- verify that the SSHClient stuff works too.
+ (Most) kwargs get passed directly into SSHClient.connect().
+
+ The exception is ``allowed_keys`` which is stripped and handed to the
+ ``NullServer`` used for testing.
"""
- threading.Thread(target=self._run).start()
+ run_kwargs = {'allowed_keys': kwargs.pop('allowed_keys', None)}
+ # Server setup
+ threading.Thread(target=self._run, kwargs=run_kwargs).start()
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
+ # Client setup
self.tc = paramiko.SSHClient()
self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
- self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
+ # Actual connection
+ self.tc.connect(self.addr, self.port, username='slowdive', **kwargs)
+
+ # Authentication successful?
self.event.wait(1.0)
self.assertTrue(self.event.isSet())
self.assertTrue(self.ts.is_active())
self.assertEqual('slowdive', self.ts.get_username())
self.assertEqual(True, self.ts.is_authenticated())
+ # Command execution functions?
stdin, stdout, stderr = self.tc.exec_command('yes')
schan = self.ts.accept(1.0)
@@ -110,61 +144,71 @@ class SSHClientTest (unittest.TestCase):
self.assertEqual('This is on stderr.\n', stderr.readline())
self.assertEqual('', stderr.readline())
+ # Cleanup
stdin.close()
stdout.close()
stderr.close()
+ def test_1_client(self):
+ """
+ verify that the SSHClient stuff works too.
+ """
+ self._test_connection(password='pygmalion')
+
def test_2_client_dsa(self):
"""
verify that SSHClient works with a DSA key.
"""
- threading.Thread(target=self._run).start()
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
- public_host_key = paramiko.RSAKey(data=host_key.asbytes())
+ self._test_connection(key_filename=test_path('test_dss.key'))
- self.tc = paramiko.SSHClient()
- self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
- self.tc.connect(self.addr, self.port, username='slowdive', key_filename=test_path('test_dss.key'))
-
- self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
- self.assertTrue(self.ts.is_active())
- self.assertEqual('slowdive', self.ts.get_username())
- self.assertEqual(True, self.ts.is_authenticated())
-
- stdin, stdout, stderr = self.tc.exec_command('yes')
- schan = self.ts.accept(1.0)
-
- schan.send('Hello there.\n')
- schan.send_stderr('This is on stderr.\n')
- schan.close()
-
- self.assertEqual('Hello there.\n', stdout.readline())
- self.assertEqual('', stdout.readline())
- self.assertEqual('This is on stderr.\n', stderr.readline())
- self.assertEqual('', stderr.readline())
+ def test_client_rsa(self):
+ """
+ verify that SSHClient works with an RSA key.
+ """
+ self._test_connection(key_filename=test_path('test_rsa.key'))
- stdin.close()
- stdout.close()
- stderr.close()
+ def test_2_5_client_ecdsa(self):
+ """
+ verify that SSHClient works with an ECDSA key.
+ """
+ self._test_connection(key_filename=test_path('test_ecdsa.key'))
def test_3_multiple_key_files(self):
"""
verify that SSHClient accepts and tries multiple key files.
"""
- threading.Thread(target=self._run).start()
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
- public_host_key = paramiko.RSAKey(data=host_key.asbytes())
-
- self.tc = paramiko.SSHClient()
- self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
- self.tc.connect(self.addr, self.port, username='slowdive', key_filename=[test_path('test_rsa.key'), test_path('test_dss.key')])
-
- self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
- self.assertTrue(self.ts.is_active())
- self.assertEqual('slowdive', self.ts.get_username())
- self.assertEqual(True, self.ts.is_authenticated())
+ # This is dumb :(
+ types_ = {
+ 'rsa': 'ssh-rsa',
+ 'dss': 'ssh-dss',
+ 'ecdsa': 'ecdsa-sha2-nistp256',
+ }
+ # Various combos of attempted & valid keys
+ # TODO: try every possible combo using itertools functions
+ for attempt, accept in (
+ (['rsa', 'dss'], ['dss']), # Original test #3
+ (['dss', 'rsa'], ['dss']), # Ordering matters sometimes, sadly
+ (['dss', 'rsa', 'ecdsa'], ['dss']), # Try ECDSA but fail
+ (['rsa', 'ecdsa'], ['ecdsa']), # ECDSA success
+ ):
+ self._test_connection(
+ key_filename=[
+ test_path('test_{0}.key'.format(x)) for x in attempt
+ ],
+ allowed_keys=[types_[x] for x in accept],
+ )
+
+ def test_multiple_key_files_failure(self):
+ """
+ Expect failure when multiple keys in play and none are accepted
+ """
+ # Until #387 is fixed we have to catch a high-up exception since
+ # various platforms trigger different errors here >_<
+ self.assertRaises(SSHException,
+ self._test_connection,
+ key_filename=[test_path('test_rsa.key')],
+ allowed_keys=['ecdsa-sha2-nistp256'],
+ )
def test_4_auto_add_policy(self):
"""
@@ -221,6 +265,8 @@ class SSHClientTest (unittest.TestCase):
"""
# Unclear why this is borked on Py3, but it is, and does not seem worth
# pursuing at the moment.
+ # XXX: It's the release of the references to e.g packetizer that fails
+ # in py3...
if not PY2:
return
threading.Thread(target=self._run).start()
@@ -252,3 +298,25 @@ class SSHClientTest (unittest.TestCase):
gc.collect()
self.assertTrue(p() is None)
+
+ def test_7_banner_timeout(self):
+ """
+ verify that the SSHClient has a configurable banner timeout.
+ """
+ # Start the thread with a 1 second wait.
+ threading.Thread(target=self._run, kwargs={'delay': 1}).start()
+ host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ public_host_key = paramiko.RSAKey(data=host_key.asbytes())
+
+ self.tc = paramiko.SSHClient()
+ self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
+ # Connect with a half second banner timeout.
+ self.assertRaises(
+ paramiko.SSHException,
+ self.tc.connect,
+ self.addr,
+ self.port,
+ username='slowdive',
+ password='pygmalion',
+ banner_timeout=0.5
+ )
diff --git a/tests/test_file.py b/tests/test_file.py
index c6edd7af..22a34aca 100755
--- a/tests/test_file.py
+++ b/tests/test_file.py
@@ -23,6 +23,7 @@ Some unit tests for the BufferedFile abstraction.
import unittest
from paramiko.file import BufferedFile
from paramiko.common import linefeed_byte, crlf, cr_byte
+import sys
class LoopbackFile (BufferedFile):
@@ -151,6 +152,15 @@ class BufferedFileTest (unittest.TestCase):
b'need to close them again.\n')
f.close()
+ def test_8_buffering(self):
+ """
+ verify that buffered objects can be written
+ """
+ if sys.version_info[0] == 2:
+ f = LoopbackFile('r+', 16)
+ f.write(buffer(b'Too small.'))
+ f.close()
+
if __name__ == '__main__':
from unittest import main
main()
diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py
index a8c0f973..8faec03c 100644
--- a/tests/test_packetizer.py
+++ b/tests/test_packetizer.py
@@ -74,3 +74,49 @@ class PacketizerTest (unittest.TestCase):
self.assertEqual(100, m.get_int())
self.assertEqual(1, m.get_int())
self.assertEqual(900, m.get_int())
+
+ def test_3_closed(self):
+ rsock = LoopSocket()
+ wsock = LoopSocket()
+ rsock.link(wsock)
+ p = Packetizer(wsock)
+ p.set_log(util.get_logger('paramiko.transport'))
+ p.set_hexdump(True)
+ cipher = AES.new(zero_byte * 16, AES.MODE_CBC, x55 * 16)
+ p.set_outbound_cipher(cipher, 16, sha1, 12, x1f * 20)
+
+ # message has to be at least 16 bytes long, so we'll have at least one
+ # block of data encrypted that contains zero random padding bytes
+ m = Message()
+ m.add_byte(byte_chr(100))
+ m.add_int(100)
+ m.add_int(1)
+ m.add_int(900)
+ wsock.send = lambda x: 0
+ from functools import wraps
+ import errno
+ import os
+ import signal
+
+ class TimeoutError(Exception):
+ pass
+
+ def timeout(seconds=1, error_message=os.strerror(errno.ETIME)):
+ def decorator(func):
+ def _handle_timeout(signum, frame):
+ raise TimeoutError(error_message)
+
+ def wrapper(*args, **kwargs):
+ signal.signal(signal.SIGALRM, _handle_timeout)
+ signal.alarm(seconds)
+ try:
+ result = func(*args, **kwargs)
+ finally:
+ signal.alarm(0)
+ return result
+
+ return wraps(func)(wrapper)
+
+ return decorator
+ send = timeout()(p.send_message)
+ self.assertRaises(EOFError, send, m)
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index 2b6aa3b6..1ae9781d 100755
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -279,8 +279,8 @@ class SFTPTest (unittest.TestCase):
def test_7_listdir(self):
"""
- verify that a folder can be created, a bunch of files can be placed in it,
- and those files show up in sftp.listdir.
+ verify that a folder can be created, a bunch of files can be placed in
+ it, and those files show up in sftp.listdir.
"""
try:
sftp.open(FOLDER + '/duck.txt', 'w').close()
@@ -298,6 +298,26 @@ class SFTPTest (unittest.TestCase):
sftp.remove(FOLDER + '/fish.txt')
sftp.remove(FOLDER + '/tertiary.py')
+ def test_7_5_listdir_iter(self):
+ """
+ listdir_iter version of above test
+ """
+ try:
+ sftp.open(FOLDER + '/duck.txt', 'w').close()
+ sftp.open(FOLDER + '/fish.txt', 'w').close()
+ sftp.open(FOLDER + '/tertiary.py', 'w').close()
+
+ x = [x.filename for x in sftp.listdir_iter(FOLDER)]
+ self.assertEqual(len(x), 3)
+ self.assertTrue('duck.txt' in x)
+ self.assertTrue('fish.txt' in x)
+ self.assertTrue('tertiary.py' in x)
+ self.assertTrue('random' not in x)
+ finally:
+ sftp.remove(FOLDER + '/duck.txt')
+ sftp.remove(FOLDER + '/fish.txt')
+ sftp.remove(FOLDER + '/tertiary.py')
+
def test_8_setstat(self):
"""
verify that the setstat functions (chown, chmod, utime, truncate) work.
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 485a18e8..344d64b8 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -26,16 +26,19 @@ import socket
import time
import threading
import random
+import unittest
from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \
SSHException, ChannelException
from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL
from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
-from paramiko.common import MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST
+from paramiko.common import MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST, \
+ MIN_PACKET_SIZE, MAX_WINDOW_SIZE, \
+ DEFAULT_WINDOW_SIZE, DEFAULT_MAX_PACKET_SIZE
from paramiko.py3compat import bytes
from paramiko.message import Message
from tests.loop import LoopSocket
-from tests.util import ParamikoTest, test_path
+from tests.util import test_path
LONG_BANNER = """\
@@ -55,7 +58,7 @@ class NullServer (ServerInterface):
paranoid_did_password = False
paranoid_did_public_key = False
paranoid_key = DSSKey.from_private_key_file(test_path('test_dss.key'))
-
+
def get_allowed_auths(self, username):
if username == 'slowdive':
return 'publickey,password'
@@ -78,24 +81,24 @@ class NullServer (ServerInterface):
def check_channel_shell_request(self, channel):
return True
-
+
def check_global_request(self, kind, msg):
self._global_request = kind
return False
-
+
def check_channel_x11_request(self, channel, single_connection, auth_protocol, auth_cookie, screen_number):
self._x11_single_connection = single_connection
self._x11_auth_protocol = auth_protocol
self._x11_auth_cookie = auth_cookie
self._x11_screen_number = screen_number
return True
-
+
def check_port_forward_request(self, addr, port):
self._listen = socket.socket()
self._listen.bind(('127.0.0.1', 0))
self._listen.listen(1)
return self._listen.getsockname()[1]
-
+
def cancel_port_forward_request(self, addr, port):
self._listen.close()
self._listen = None
@@ -105,7 +108,7 @@ class NullServer (ServerInterface):
return OPEN_SUCCEEDED
-class TransportTest(ParamikoTest):
+class TransportTest(unittest.TestCase):
def setUp(self):
self.socks = LoopSocket()
self.sockc = LoopSocket()
@@ -123,12 +126,12 @@ class TransportTest(ParamikoTest):
host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
-
+
if client_options is not None:
client_options(self.tc.get_security_options())
if server_options is not None:
server_options(self.ts.get_security_options())
-
+
event = threading.Event()
self.server = NullServer()
self.assertTrue(not event.isSet())
@@ -155,7 +158,7 @@ class TransportTest(ParamikoTest):
self.assertTrue(False)
except TypeError:
pass
-
+
def test_2_compute_key(self):
self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929
self.tc.H = b'\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3'
@@ -208,7 +211,7 @@ class TransportTest(ParamikoTest):
event.wait(1.0)
self.assertTrue(event.isSet())
self.assertTrue(self.ts.is_active())
-
+
def test_4_special(self):
"""
verify that the client can demand odd handshake settings, and can
@@ -222,7 +225,7 @@ class TransportTest(ParamikoTest):
self.assertEqual('aes256-cbc', self.tc.remote_cipher)
self.assertEqual(12, self.tc.packetizer.get_mac_size_out())
self.assertEqual(12, self.tc.packetizer.get_mac_size_in())
-
+
self.tc.send_ignore(1024)
self.tc.renegotiate_keys()
self.ts.send_ignore(1024)
@@ -236,7 +239,7 @@ class TransportTest(ParamikoTest):
self.tc.set_keepalive(1)
time.sleep(2)
self.assertEqual('keepalive@lag.net', self.server._global_request)
-
+
def test_6_exec_command(self):
"""
verify that exec_command() does something reasonable.
@@ -250,7 +253,7 @@ class TransportTest(ParamikoTest):
self.assertTrue(False)
except SSHException:
pass
-
+
chan = self.tc.open_session()
chan.exec_command('yes')
schan = self.ts.accept(1.0)
@@ -264,7 +267,7 @@ class TransportTest(ParamikoTest):
f = chan.makefile_stderr()
self.assertEqual('This is on stderr.\n', f.readline())
self.assertEqual('', f.readline())
-
+
# now try it with combined stdout/stderr
chan = self.tc.open_session()
chan.exec_command('yes')
@@ -273,7 +276,7 @@ class TransportTest(ParamikoTest):
schan.send_stderr('This is on stderr.\n')
schan.close()
- chan.set_combine_stderr(True)
+ chan.set_combine_stderr(True)
f = chan.makefile()
self.assertEqual('Hello there.\n', f.readline())
self.assertEqual('This is on stderr.\n', f.readline())
@@ -320,7 +323,7 @@ class TransportTest(ParamikoTest):
schan.shutdown_write()
schan.send_exit_status(23)
schan.close()
-
+
f = chan.makefile()
self.assertEqual('Hello there.\n', f.readline())
self.assertEqual('', f.readline())
@@ -342,14 +345,14 @@ class TransportTest(ParamikoTest):
chan.invoke_shell()
schan = self.ts.accept(1.0)
- # nothing should be ready
+ # nothing should be ready
r, w, e = select.select([chan], [], [], 0.1)
self.assertEqual([], r)
self.assertEqual([], w)
self.assertEqual([], e)
-
+
schan.send('hello\n')
-
+
# something should be ready now (give it 1 second to appear)
for i in range(10):
r, w, e = select.select([chan], [], [], 0.1)
@@ -361,7 +364,7 @@ class TransportTest(ParamikoTest):
self.assertEqual([], e)
self.assertEqual(b'hello\n', chan.recv(6))
-
+
# and, should be dead again now
r, w, e = select.select([chan], [], [], 0.1)
self.assertEqual([], r)
@@ -369,7 +372,7 @@ class TransportTest(ParamikoTest):
self.assertEqual([], e)
schan.close()
-
+
# detect eof?
for i in range(10):
r, w, e = select.select([chan], [], [], 0.1)
@@ -380,14 +383,14 @@ class TransportTest(ParamikoTest):
self.assertEqual([], w)
self.assertEqual([], e)
self.assertEqual(bytes(), chan.recv(16))
-
+
# make sure the pipe is still open for now...
p = chan._pipe
self.assertEqual(False, p._closed)
chan.close()
# ...and now is closed.
self.assertEqual(True, p._closed)
-
+
def test_B_renegotiate(self):
"""
verify that a transport can correctly renegotiate mid-stream.
@@ -402,7 +405,7 @@ class TransportTest(ParamikoTest):
for i in range(20):
chan.send('x' * 1024)
chan.close()
-
+
# allow a few seconds for the rekeying to complete
for i in range(50):
if self.tc.H != self.tc.session_id:
@@ -441,28 +444,28 @@ class TransportTest(ParamikoTest):
chan = self.tc.open_session()
chan.exec_command('yes')
schan = self.ts.accept(1.0)
-
+
requested = []
def handler(c, addr_port):
addr, port = addr_port
requested.append((addr, port))
self.tc._queue_incoming_channel(c)
-
+
self.assertEqual(None, getattr(self.server, '_x11_screen_number', None))
cookie = chan.request_x11(0, single_connection=True, handler=handler)
self.assertEqual(0, self.server._x11_screen_number)
self.assertEqual('MIT-MAGIC-COOKIE-1', self.server._x11_auth_protocol)
self.assertEqual(cookie, self.server._x11_auth_cookie)
self.assertEqual(True, self.server._x11_single_connection)
-
+
x11_server = self.ts.open_x11_channel(('localhost', 6093))
x11_client = self.tc.accept()
self.assertEqual('localhost', requested[0][0])
self.assertEqual(6093, requested[0][1])
-
+
x11_server.send('hello')
self.assertEqual(b'hello', x11_client.recv(5))
-
+
x11_server.close()
x11_client.close()
chan.close()
@@ -477,13 +480,13 @@ class TransportTest(ParamikoTest):
chan = self.tc.open_session()
chan.exec_command('yes')
schan = self.ts.accept(1.0)
-
+
requested = []
def handler(c, origin_addr_port, server_addr_port):
requested.append(origin_addr_port)
requested.append(server_addr_port)
self.tc._queue_incoming_channel(c)
-
+
port = self.tc.request_port_forward('127.0.0.1', 0, handler)
self.assertEqual(port, self.server._listen.getsockname()[1])
@@ -492,14 +495,14 @@ class TransportTest(ParamikoTest):
ss, _ = self.server._listen.accept()
sch = self.ts.open_forwarded_tcpip_channel(ss.getsockname(), ss.getpeername())
cch = self.tc.accept()
-
+
sch.send('hello')
self.assertEqual(b'hello', cch.recv(5))
sch.close()
cch.close()
ss.close()
cs.close()
-
+
# now cancel it.
self.tc.cancel_port_forward('127.0.0.1', port)
self.assertTrue(self.server._listen is None)
@@ -513,7 +516,7 @@ class TransportTest(ParamikoTest):
chan = self.tc.open_session()
chan.exec_command('yes')
schan = self.ts.accept(1.0)
-
+
# open a port on the "server" that the client will ask to forward to.
greeting_server = socket.socket()
greeting_server.bind(('127.0.0.1', 0))
@@ -524,13 +527,13 @@ class TransportTest(ParamikoTest):
sch = self.ts.accept(1.0)
cch = socket.socket()
cch.connect(self.server._tcpip_dest)
-
+
ss, _ = greeting_server.accept()
ss.send(b'Hello!\n')
ss.close()
sch.send(cch.recv(8192))
sch.close()
-
+
self.assertEqual(b'Hello!\n', cs.recv(7))
cs.close()
@@ -544,14 +547,14 @@ class TransportTest(ParamikoTest):
chan.invoke_shell()
schan = self.ts.accept(1.0)
- # nothing should be ready
+ # nothing should be ready
r, w, e = select.select([chan], [], [], 0.1)
self.assertEqual([], r)
self.assertEqual([], w)
self.assertEqual([], e)
-
+
schan.send_stderr('hello\n')
-
+
# something should be ready now (give it 1 second to appear)
for i in range(10):
r, w, e = select.select([chan], [], [], 0.1)
@@ -563,7 +566,7 @@ class TransportTest(ParamikoTest):
self.assertEqual([], e)
self.assertEqual(b'hello\n', chan.recv_stderr(6))
-
+
# and, should be dead again now
r, w, e = select.select([chan], [], [], 0.1)
self.assertEqual([], r)
@@ -585,12 +588,13 @@ class TransportTest(ParamikoTest):
self.assertEqual(chan.send_ready(), True)
total = 0
K = '*' * 1024
- while total < 1024 * 1024:
+ limit = 1+(64 * 2 ** 15)
+ while total < limit:
chan.send(K)
total += len(K)
if not chan.send_ready():
break
- self.assertTrue(total < 1024 * 1024)
+ self.assertTrue(total < limit)
schan.close()
chan.close()
@@ -599,10 +603,10 @@ class TransportTest(ParamikoTest):
def test_I_rekey_deadlock(self):
"""
Regression test for deadlock when in-transit messages are received after MSG_KEXINIT is sent
-
+
Note: When this test fails, it may leak threads.
"""
-
+
# Test for an obscure deadlocking bug that can occur if we receive
# certain messages while initiating a key exchange.
#
@@ -619,7 +623,7 @@ class TransportTest(ParamikoTest):
# NeedRekeyException.
# 4. In response to NeedRekeyException, the transport thread sends
# MSG_KEXINIT to the remote host.
- #
+ #
# On the remote host (using any SSH implementation):
# 5. The MSG_CHANNEL_DATA is received, and MSG_CHANNEL_WINDOW_ADJUST is sent.
# 6. The MSG_KEXINIT is received, and a corresponding MSG_KEXINIT is sent.
@@ -654,7 +658,7 @@ class TransportTest(ParamikoTest):
self.done_event = done_event
self.watchdog_event = threading.Event()
self.last = None
-
+
def run(self):
try:
for i in range(1, 1+self.iterations):
@@ -666,7 +670,7 @@ class TransportTest(ParamikoTest):
finally:
self.done_event.set()
self.watchdog_event.set()
-
+
class ReceiveThread(threading.Thread):
def __init__(self, chan, done_event):
threading.Thread.__init__(self, None, None, self.__class__.__name__)
@@ -674,7 +678,7 @@ class TransportTest(ParamikoTest):
self.chan = chan
self.done_event = done_event
self.watchdog_event = threading.Event()
-
+
def run(self):
try:
while not self.done_event.isSet():
@@ -687,10 +691,10 @@ class TransportTest(ParamikoTest):
finally:
self.done_event.set()
self.watchdog_event.set()
-
+
self.setup_test_server()
self.ts.packetizer.REKEY_BYTES = 2048
-
+
chan = self.tc.open_session()
chan.exec_command('yes')
schan = self.ts.accept(1.0)
@@ -712,7 +716,7 @@ class TransportTest(ParamikoTest):
self._send_message(m2)
return _negotiate_keys(self, m)
self.tc._handler_table[MSG_KEXINIT] = _negotiate_keys_wrapper
-
+
# Parameters for the test
iterations = 500 # The deadlock does not happen every time, but it
# should after many iterations.
@@ -724,12 +728,12 @@ class TransportTest(ParamikoTest):
# Start the sending thread
st = SendThread(schan, iterations, done_event)
st.start()
-
+
# Start the receiving thread
rt = ReceiveThread(chan, done_event)
rt.start()
- # Act as a watchdog timer, checking
+ # Act as a watchdog timer, checking
deadlocked = False
while not deadlocked and not done_event.isSet():
for event in (st.watchdog_event, rt.watchdog_event):
@@ -740,7 +744,7 @@ class TransportTest(ParamikoTest):
deadlocked = True
break
event.clear()
-
+
# Tell the threads to stop (if they haven't already stopped). Note
# that if one or more threads are deadlocked, they might hang around
# forever (until the process exits).
@@ -752,3 +756,21 @@ class TransportTest(ParamikoTest):
# Close the channels
schan.close()
chan.close()
+
+ def test_J_sanitze_packet_size(self):
+ """
+ verify that we conform to the rfc of packet and window sizes.
+ """
+ for val, correct in [(32767, MIN_PACKET_SIZE),
+ (None, DEFAULT_MAX_PACKET_SIZE),
+ (2**32, MAX_WINDOW_SIZE)]:
+ self.assertEqual(self.tc._sanitize_packet_size(val), correct)
+
+ def test_K_sanitze_window_size(self):
+ """
+ verify that we conform to the rfc of packet and window sizes.
+ """
+ for val, correct in [(32767, MIN_PACKET_SIZE),
+ (None, DEFAULT_WINDOW_SIZE),
+ (2**32, MAX_WINDOW_SIZE)]:
+ self.assertEqual(self.tc._sanitize_window_size(val), correct)
diff --git a/tests/test_util.py b/tests/test_util.py
index 69c75518..35e15765 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -24,13 +24,12 @@ from binascii import hexlify
import errno
import os
from hashlib import sha1
+import unittest
import paramiko.util
from paramiko.util import lookup_ssh_host_config as host_config
from paramiko.py3compat import StringIO, byte_ord
-from tests.util import ParamikoTest
-
test_config_file = """\
Host *
User robey
@@ -41,7 +40,12 @@ Host *.example.com
\tUser bjork
Port=3333
Host *
- \t \t Crazy something dumb
+"""
+
+dont_strip_whitespace_please = "\t \t Crazy something dumb "
+
+test_config_file += dont_strip_whitespace_please
+test_config_file += """
Host spoo.example.com
Crazy something else
"""
@@ -60,7 +64,7 @@ BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\
from paramiko import *
-class UtilTest(ParamikoTest):
+class UtilTest(unittest.TestCase):
def test_1_import(self):
"""
verify that all the classes can be imported from paramiko.
@@ -333,3 +337,114 @@ IdentityFile something_%l_using_fqdn
"""
config = paramiko.util.parse_ssh_config(StringIO(test_config))
assert config.lookup('meh') # will die during lookup() if bug regresses
+
+ def test_clamp_value(self):
+ self.assertEqual(32768, paramiko.util.clamp_value(32767, 32768, 32769))
+ self.assertEqual(32767, paramiko.util.clamp_value(32767, 32765, 32769))
+ self.assertEqual(32769, paramiko.util.clamp_value(32767, 32770, 32769))
+
+ def test_13_config_dos_crlf_succeeds(self):
+ config_file = StringIO("host abcqwerty\r\nHostName 127.0.0.1\r\n")
+ config = paramiko.SSHConfig()
+ config.parse(config_file)
+ self.assertEqual(config.lookup("abcqwerty")["hostname"], "127.0.0.1")
+
+ def test_quoted_host_names(self):
+ test_config_file = """\
+Host "param pam" param "pam"
+ Port 1111
+
+Host "param2"
+ Port 2222
+
+Host param3 parara
+ Port 3333
+
+Host param4 "p a r" "p" "par" para
+ Port 4444
+"""
+ res = {
+ 'param pam': {'hostname': 'param pam', 'port': '1111'},
+ 'param': {'hostname': 'param', 'port': '1111'},
+ 'pam': {'hostname': 'pam', 'port': '1111'},
+
+ 'param2': {'hostname': 'param2', 'port': '2222'},
+
+ 'param3': {'hostname': 'param3', 'port': '3333'},
+ 'parara': {'hostname': 'parara', 'port': '3333'},
+
+ 'param4': {'hostname': 'param4', 'port': '4444'},
+ 'p a r': {'hostname': 'p a r', 'port': '4444'},
+ 'p': {'hostname': 'p', 'port': '4444'},
+ 'par': {'hostname': 'par', 'port': '4444'},
+ 'para': {'hostname': 'para', 'port': '4444'},
+ }
+ f = StringIO(test_config_file)
+ config = paramiko.util.parse_ssh_config(f)
+ for host, values in res.items():
+ self.assertEquals(
+ paramiko.util.lookup_ssh_host_config(host, config),
+ values
+ )
+
+ def test_quoted_params_in_config(self):
+ test_config_file = """\
+Host "param pam" param "pam"
+ IdentityFile id_rsa
+
+Host "param2"
+ IdentityFile "test rsa key"
+
+Host param3 parara
+ IdentityFile id_rsa
+ IdentityFile "test rsa key"
+"""
+ res = {
+ 'param pam': {'hostname': 'param pam', 'identityfile': ['id_rsa']},
+ 'param': {'hostname': 'param', 'identityfile': ['id_rsa']},
+ 'pam': {'hostname': 'pam', 'identityfile': ['id_rsa']},
+
+ 'param2': {'hostname': 'param2', 'identityfile': ['test rsa key']},
+
+ 'param3': {'hostname': 'param3', 'identityfile': ['id_rsa', 'test rsa key']},
+ 'parara': {'hostname': 'parara', 'identityfile': ['id_rsa', 'test rsa key']},
+ }
+ f = StringIO(test_config_file)
+ config = paramiko.util.parse_ssh_config(f)
+ for host, values in res.items():
+ self.assertEquals(
+ paramiko.util.lookup_ssh_host_config(host, config),
+ values
+ )
+
+ def test_quoted_host_in_config(self):
+ conf = SSHConfig()
+ correct_data = {
+ 'param': ['param'],
+ '"param"': ['param'],
+
+ 'param pam': ['param', 'pam'],
+ '"param" "pam"': ['param', 'pam'],
+ '"param" pam': ['param', 'pam'],
+ 'param "pam"': ['param', 'pam'],
+
+ 'param "pam" p': ['param', 'pam', 'p'],
+ '"param" pam "p"': ['param', 'pam', 'p'],
+
+ '"pa ram"': ['pa ram'],
+ '"pa ram" pam': ['pa ram', 'pam'],
+ 'param "p a m"': ['param', 'p a m'],
+ }
+ incorrect_data = [
+ 'param"',
+ '"param',
+ 'param "pam',
+ 'param "pam" "p a',
+ ]
+ for host, values in correct_data.items():
+ self.assertEquals(
+ conf._get_hosts(host),
+ values
+ )
+ for host in incorrect_data:
+ self.assertRaises(Exception, conf._get_hosts, host)
diff --git a/tests/util.py b/tests/util.py
index 66d2696c..b546a7e1 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -1,17 +1,7 @@
import os
-import unittest
root_path = os.path.dirname(os.path.realpath(__file__))
-
-class ParamikoTest(unittest.TestCase):
- # for Python 2.3 and below
- if not hasattr(unittest.TestCase, 'assertTrue'):
- assertTrue = unittest.TestCase.failUnless
- if not hasattr(unittest.TestCase, 'assertFalse'):
- assertFalse = unittest.TestCase.failIf
-
-
def test_path(filename):
return os.path.join(root_path, filename)