diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/stub_sftp.py | 1 | ||||
-rw-r--r-- | tests/test_buffered_pipe.py | 10 | ||||
-rw-r--r-- | tests/test_client.py | 31 | ||||
-rw-r--r-- | tests/test_gssapi.py | 157 | ||||
-rw-r--r-- | tests/test_kex_gss.py | 133 | ||||
-rw-r--r-- | tests/test_packetizer.py | 46 | ||||
-rw-r--r-- | tests/test_pkey.py | 18 | ||||
-rw-r--r-- | tests/test_ssh_gss.py | 126 | ||||
-rw-r--r-- | tests/test_transport.py | 134 | ||||
-rw-r--r-- | tests/test_util.py | 17 | ||||
-rw-r--r-- | tests/util.py | 10 |
11 files changed, 606 insertions, 77 deletions
diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py index 47644433..24380ba1 100644 --- a/tests/stub_sftp.py +++ b/tests/stub_sftp.py @@ -21,6 +21,7 @@ A stub SFTP server for loopback SFTP testing. """ import os +import sys from paramiko import ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes, \ SFTPHandle, SFTP_OK, AUTH_SUCCESSFUL, OPEN_SUCCEEDED from paramiko.common import o666 diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py index a53081a9..eeb4d0ad 100644 --- a/tests/test_buffered_pipe.py +++ b/tests/test_buffered_pipe.py @@ -22,10 +22,10 @@ Some unit tests for BufferedPipe. import threading import time +import unittest from paramiko.buffered_pipe import BufferedPipe, PipeTimeout from paramiko import pipe - -from tests.util import ParamikoTest +from paramiko.py3compat import b def delay_thread(p): @@ -40,7 +40,7 @@ def close_thread(p): p.close() -class BufferedPipeTest(ParamikoTest): +class BufferedPipeTest(unittest.TestCase): def test_1_buffered_pipe(self): p = BufferedPipe() self.assertTrue(not p.read_ready()) @@ -48,12 +48,12 @@ class BufferedPipeTest(ParamikoTest): self.assertTrue(p.read_ready()) data = p.read(6) self.assertEqual(b'hello.', data) - + p.feed('plus/minus') self.assertEqual(b'plu', p.read(3)) self.assertEqual(b's/m', p.read(3)) self.assertEqual(b'inus', p.read(4)) - + p.close() self.assertTrue(not p.read_ready()) self.assertEqual(b'', p.read(1)) diff --git a/tests/test_client.py b/tests/test_client.py index b3635272..28d1cb46 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -29,6 +29,7 @@ import unittest import weakref import warnings import os +import time from tests.util import test_path import paramiko from paramiko.common import PY2, b @@ -93,7 +94,7 @@ class SSHClientTest (unittest.TestCase): if hasattr(self, attr): getattr(self, attr).close() - def _run(self, allowed_keys=None): + def _run(self, allowed_keys=None, delay=0): if allowed_keys is None: allowed_keys = FINGERPRINTS.keys() self.socks, addr = self.sockl.accept() @@ -101,6 +102,8 @@ class SSHClientTest (unittest.TestCase): host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) self.ts.add_server_key(host_key) server = NullServer(allowed_keys=allowed_keys) + if delay: + time.sleep(delay) self.ts.start_server(self.event, server) def _test_connection(self, **kwargs): @@ -264,6 +267,8 @@ class SSHClientTest (unittest.TestCase): """ # Unclear why this is borked on Py3, but it is, and does not seem worth # pursuing at the moment. + # XXX: It's the release of the references to e.g packetizer that fails + # in py3... if not PY2: return threading.Thread(target=self._run).start() @@ -296,7 +301,7 @@ class SSHClientTest (unittest.TestCase): self.assertTrue(p() is None) - def test_6_client_can_be_used_as_context_manager(self): + def test_client_can_be_used_as_context_manager(self): """ verify that an SSHClient can be used a context manager """ @@ -317,3 +322,25 @@ class SSHClientTest (unittest.TestCase): self.assertTrue(self.tc._transport is not None) self.assertTrue(self.tc._transport is None) + + def test_7_banner_timeout(self): + """ + verify that the SSHClient has a configurable banner timeout. + """ + # Start the thread with a 1 second wait. + threading.Thread(target=self._run, kwargs={'delay': 1}).start() + host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) + public_host_key = paramiko.RSAKey(data=host_key.asbytes()) + + self.tc = paramiko.SSHClient() + self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key) + # Connect with a half second banner timeout. + self.assertRaises( + paramiko.SSHException, + self.tc.connect, + self.addr, + self.port, + username='slowdive', + password='pygmalion', + banner_timeout=0.5 + ) diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py new file mode 100644 index 00000000..0d3df72c --- /dev/null +++ b/tests/test_gssapi.py @@ -0,0 +1,157 @@ +# Copyright (C) 2013-2014 science + computing ag +# Author: Sebastian Deiss <sebastian.deiss@t-online.de> +# +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +Test the used APIs for GSS-API / SSPI authentication +""" + +import unittest +import socket + + +class GSSAPITest(unittest.TestCase): + + def init(hostname=None, srv_mode=False): + global krb5_mech, targ_name, server_mode + krb5_mech = "1.2.840.113554.1.2.2" + targ_name = hostname + server_mode = srv_mode + + init = staticmethod(init) + + def test_1_pyasn1(self): + """ + Test the used methods of pyasn1. + """ + from pyasn1.type.univ import ObjectIdentifier + from pyasn1.codec.der import encoder, decoder + oid = encoder.encode(ObjectIdentifier(krb5_mech)) + mech, __ = decoder.decode(oid) + self.assertEquals(krb5_mech, mech.__str__()) + + def test_2_gssapi_sspi(self): + """ + Test the used methods of python-gssapi or sspi, sspicon from pywin32. + """ + _API = "MIT" + try: + import gssapi + except ImportError: + import sspicon + import sspi + _API = "SSPI" + + c_token = None + gss_ctxt_status = False + mic_msg = b"G'day Mate!" + + if _API == "MIT": + if server_mode: + gss_flags = (gssapi.C_PROT_READY_FLAG, + gssapi.C_INTEG_FLAG, + gssapi.C_MUTUAL_FLAG, + gssapi.C_DELEG_FLAG) + else: + gss_flags = (gssapi.C_PROT_READY_FLAG, + gssapi.C_INTEG_FLAG, + gssapi.C_DELEG_FLAG) + """ + Initialize a GSS-API context. + """ + ctx = gssapi.Context() + ctx.flags = gss_flags + krb5_oid = gssapi.OID.mech_from_string(krb5_mech) + target_name = gssapi.Name("host@" + targ_name, + gssapi.C_NT_HOSTBASED_SERVICE) + gss_ctxt = gssapi.InitContext(peer_name=target_name, + mech_type=krb5_oid, + req_flags=ctx.flags) + if server_mode: + c_token = gss_ctxt.step(c_token) + gss_ctxt_status = gss_ctxt.established + self.assertEquals(False, gss_ctxt_status) + """ + Accept a GSS-API context. + """ + gss_srv_ctxt = gssapi.AcceptContext() + s_token = gss_srv_ctxt.step(c_token) + gss_ctxt_status = gss_srv_ctxt.established + self.assertNotEquals(None, s_token) + self.assertEquals(True, gss_ctxt_status) + """ + Establish the client context + """ + c_token = gss_ctxt.step(s_token) + self.assertEquals(None, c_token) + else: + while not gss_ctxt.established: + c_token = gss_ctxt.step(c_token) + self.assertNotEquals(None, c_token) + """ + Build MIC + """ + mic_token = gss_ctxt.get_mic(mic_msg) + + if server_mode: + """ + Check MIC + """ + status = gss_srv_ctxt.verify_mic(mic_msg, mic_token) + self.assertEquals(0, status) + else: + gss_flags = sspicon.ISC_REQ_INTEGRITY |\ + sspicon.ISC_REQ_MUTUAL_AUTH |\ + sspicon.ISC_REQ_DELEGATE + """ + Initialize a GSS-API context. + """ + target_name = "host/" + socket.getfqdn(targ_name) + gss_ctxt = sspi.ClientAuth("Kerberos", + scflags=gss_flags, + targetspn=target_name) + if server_mode: + error, token = gss_ctxt.authorize(c_token) + c_token = token[0].Buffer + self.assertEquals(0, error) + """ + Accept a GSS-API context. + """ + gss_srv_ctxt = sspi.ServerAuth("Kerberos", spn=target_name) + error, token = gss_srv_ctxt.authorize(c_token) + s_token = token[0].Buffer + """ + Establish the context. + """ + error, token = gss_ctxt.authorize(s_token) + c_token = token[0].Buffer + self.assertEquals(None, c_token) + self.assertEquals(0, error) + """ + Build MIC + """ + mic_token = gss_ctxt.sign(mic_msg) + """ + Check MIC + """ + gss_srv_ctxt.verify(mic_msg, mic_token) + else: + error, token = gss_ctxt.authorize(c_token) + c_token = token[0].Buffer + self.assertNotEquals(0, error) diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py new file mode 100644 index 00000000..b5e277b3 --- /dev/null +++ b/tests/test_kex_gss.py @@ -0,0 +1,133 @@ +# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com> +# Copyright (C) 2013-2014 science + computing ag +# Author: Sebastian Deiss <sebastian.deiss@t-online.de> +# +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +Unit Tests for the GSS-API / SSPI SSHv2 Diffie-Hellman Key Exchange and user +authentication +""" + + +import socket +import threading +import unittest + +import paramiko + + +class NullServer (paramiko.ServerInterface): + + def get_allowed_auths(self, username): + return 'gssapi-keyex' + + def check_auth_gssapi_keyex(self, username, + gss_authenticated=paramiko.AUTH_FAILED, + cc_file=None): + if gss_authenticated == paramiko.AUTH_SUCCESSFUL: + return paramiko.AUTH_SUCCESSFUL + return paramiko.AUTH_FAILED + + def enable_auth_gssapi(self): + UseGSSAPI = True + return UseGSSAPI + + def check_channel_request(self, kind, chanid): + return paramiko.OPEN_SUCCEEDED + + def check_channel_exec_request(self, channel, command): + if command != 'yes': + return False + return True + + +class GSSKexTest(unittest.TestCase): + + def init(username, hostname): + global krb5_principal, targ_name + krb5_principal = username + targ_name = hostname + + init = staticmethod(init) + + def setUp(self): + self.username = krb5_principal + self.hostname = socket.getfqdn(targ_name) + self.sockl = socket.socket() + self.sockl.bind((targ_name, 0)) + self.sockl.listen(1) + self.addr, self.port = self.sockl.getsockname() + self.event = threading.Event() + thread = threading.Thread(target=self._run) + thread.start() + + def tearDown(self): + for attr in "tc ts socks sockl".split(): + if hasattr(self, attr): + getattr(self, attr).close() + + def _run(self): + self.socks, addr = self.sockl.accept() + self.ts = paramiko.Transport(self.socks, True) + host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key') + self.ts.add_server_key(host_key) + self.ts.set_gss_host(targ_name) + try: + self.ts.load_server_moduli() + except: + print ('(Failed to load moduli -- gex will be unsupported.)') + server = NullServer() + self.ts.start_server(self.event, server) + + def test_1_gsskex_and_auth(self): + """ + Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated + Diffie-Hellman Key Exchange and user authentication with the GSS-API + context created during key exchange. + """ + host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = paramiko.RSAKey(data=host_key.asbytes()) + + self.tc = paramiko.SSHClient() + self.tc.get_host_keys().add('[%s]:%d' % (self.hostname, self.port), + 'ssh-rsa', public_host_key) + self.tc.connect(self.hostname, self.port, username=self.username, + gss_auth=True, gss_kex=True) + + self.event.wait(1.0) + self.assert_(self.event.isSet()) + self.assert_(self.ts.is_active()) + self.assertEquals(self.username, self.ts.get_username()) + self.assertEquals(True, self.ts.is_authenticated()) + + stdin, stdout, stderr = self.tc.exec_command('yes') + schan = self.ts.accept(1.0) + + schan.send('Hello there.\n') + schan.send_stderr('This is on stderr.\n') + schan.close() + + self.assertEquals('Hello there.\n', stdout.readline()) + self.assertEquals('', stdout.readline()) + self.assertEquals('This is on stderr.\n', stderr.readline()) + self.assertEquals('', stderr.readline()) + + stdin.close() + stdout.close() + stderr.close() diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py index a8c0f973..8faec03c 100644 --- a/tests/test_packetizer.py +++ b/tests/test_packetizer.py @@ -74,3 +74,49 @@ class PacketizerTest (unittest.TestCase): self.assertEqual(100, m.get_int()) self.assertEqual(1, m.get_int()) self.assertEqual(900, m.get_int()) + + def test_3_closed(self): + rsock = LoopSocket() + wsock = LoopSocket() + rsock.link(wsock) + p = Packetizer(wsock) + p.set_log(util.get_logger('paramiko.transport')) + p.set_hexdump(True) + cipher = AES.new(zero_byte * 16, AES.MODE_CBC, x55 * 16) + p.set_outbound_cipher(cipher, 16, sha1, 12, x1f * 20) + + # message has to be at least 16 bytes long, so we'll have at least one + # block of data encrypted that contains zero random padding bytes + m = Message() + m.add_byte(byte_chr(100)) + m.add_int(100) + m.add_int(1) + m.add_int(900) + wsock.send = lambda x: 0 + from functools import wraps + import errno + import os + import signal + + class TimeoutError(Exception): + pass + + def timeout(seconds=1, error_message=os.strerror(errno.ETIME)): + def decorator(func): + def _handle_timeout(signum, frame): + raise TimeoutError(error_message) + + def wrapper(*args, **kwargs): + signal.signal(signal.SIGALRM, _handle_timeout) + signal.alarm(seconds) + try: + result = func(*args, **kwargs) + finally: + signal.alarm(0) + return result + + return wraps(func)(wrapper) + + return decorator + send = timeout()(p.send_message) + self.assertRaises(EOFError, send, m) diff --git a/tests/test_pkey.py b/tests/test_pkey.py index 1468ee27..f673254f 100644 --- a/tests/test_pkey.py +++ b/tests/test_pkey.py @@ -21,6 +21,7 @@ Some unit tests for public/private key objects. """ import unittest +import os from binascii import hexlify from hashlib import md5 @@ -253,3 +254,20 @@ class KeyTest (unittest.TestCase): msg.rewind() pub = ECDSAKey(data=key.asbytes()) self.assertTrue(pub.verify_ssh_sig(b'ice weasels', msg)) + + def test_salt_size(self): + # Read an existing encrypted private key + file_ = test_path('test_rsa_password.key') + password = 'television' + newfile = file_ + '.new' + newpassword = 'radio' + key = RSAKey(filename=file_, password=password) + # Write out a newly re-encrypted copy with a new password. + # When the bug under test exists, this will ValueError. + try: + key.write_private_key_file(newfile, password=newpassword) + # Verify the inner key data still matches (when no ValueError) + key2 = RSAKey(filename=newfile, password=newpassword) + self.assertEqual(key, key2) + finally: + os.remove(newfile) diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py new file mode 100644 index 00000000..595081b8 --- /dev/null +++ b/tests/test_ssh_gss.py @@ -0,0 +1,126 @@ +# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com> +# Copyright (C) 2013-2014 science + computing ag +# Author: Sebastian Deiss <sebastian.deiss@t-online.de> +# +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +Unit Tests for the GSS-API / SSPI SSHv2 Authentication (gssapi-with-mic) +""" + +import socket +import threading +import unittest + +import paramiko + + +class NullServer (paramiko.ServerInterface): + + def get_allowed_auths(self, username): + return 'gssapi-with-mic' + + def check_auth_gssapi_with_mic(self, username, + gss_authenticated=paramiko.AUTH_FAILED, + cc_file=None): + if gss_authenticated == paramiko.AUTH_SUCCESSFUL: + return paramiko.AUTH_SUCCESSFUL + return paramiko.AUTH_FAILED + + def enable_auth_gssapi(self): + UseGSSAPI = True + GSSAPICleanupCredentials = True + return UseGSSAPI + + def check_channel_request(self, kind, chanid): + return paramiko.OPEN_SUCCEEDED + + def check_channel_exec_request(self, channel, command): + if command != 'yes': + return False + return True + + +class GSSAuthTest(unittest.TestCase): + + def init(username, hostname): + global krb5_principal, targ_name + krb5_principal = username + targ_name = hostname + + init = staticmethod(init) + + def setUp(self): + self.username = krb5_principal + self.hostname = socket.getfqdn(targ_name) + self.sockl = socket.socket() + self.sockl.bind((targ_name, 0)) + self.sockl.listen(1) + self.addr, self.port = self.sockl.getsockname() + self.event = threading.Event() + thread = threading.Thread(target=self._run) + thread.start() + + def tearDown(self): + for attr in "tc ts socks sockl".split(): + if hasattr(self, attr): + getattr(self, attr).close() + + def _run(self): + self.socks, addr = self.sockl.accept() + self.ts = paramiko.Transport(self.socks) + host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key') + self.ts.add_server_key(host_key) + server = NullServer() + self.ts.start_server(self.event, server) + + def test_1_gss_auth(self): + """ + Verify that Paramiko can handle SSHv2 GSS-API / SSPI authentication + (gssapi-with-mic) in client and server mode. + """ + host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = paramiko.RSAKey(data=host_key.asbytes()) + + self.tc = paramiko.SSHClient() + self.tc.get_host_keys().add('[%s]:%d' % (self.hostname, self.port), + 'ssh-rsa', public_host_key) + self.tc.connect(self.hostname, self.port, username=self.username, + gss_auth=True) + + self.event.wait(1.0) + self.assert_(self.event.isSet()) + self.assert_(self.ts.is_active()) + self.assertEquals(self.username, self.ts.get_username()) + self.assertEquals(True, self.ts.is_authenticated()) + + stdin, stdout, stderr = self.tc.exec_command('yes') + schan = self.ts.accept(1.0) + + schan.send('Hello there.\n') + schan.send_stderr('This is on stderr.\n') + schan.close() + + self.assertEquals('Hello there.\n', stdout.readline()) + self.assertEquals('', stdout.readline()) + self.assertEquals('This is on stderr.\n', stderr.readline()) + self.assertEquals('', stderr.readline()) + + stdin.close() + stdout.close() + stderr.close() diff --git a/tests/test_transport.py b/tests/test_transport.py index ae0f5e01..50b1d86b 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -28,16 +28,19 @@ import socket import time import threading import random +import unittest from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \ SSHException, ChannelException from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED -from paramiko.common import MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST +from paramiko.common import MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST, \ + MIN_PACKET_SIZE, MAX_WINDOW_SIZE, \ + DEFAULT_WINDOW_SIZE, DEFAULT_MAX_PACKET_SIZE from paramiko.py3compat import bytes from paramiko.message import Message from tests.loop import LoopSocket -from tests.util import ParamikoTest, test_path +from tests.util import test_path LONG_BANNER = """\ @@ -57,7 +60,7 @@ class NullServer (ServerInterface): paranoid_did_password = False paranoid_did_public_key = False paranoid_key = DSSKey.from_private_key_file(test_path('test_dss.key')) - + def get_allowed_auths(self, username): if username == 'slowdive': return 'publickey,password' @@ -80,24 +83,24 @@ class NullServer (ServerInterface): def check_channel_shell_request(self, channel): return True - + def check_global_request(self, kind, msg): self._global_request = kind return False - + def check_channel_x11_request(self, channel, single_connection, auth_protocol, auth_cookie, screen_number): self._x11_single_connection = single_connection self._x11_auth_protocol = auth_protocol self._x11_auth_cookie = auth_cookie self._x11_screen_number = screen_number return True - + def check_port_forward_request(self, addr, port): self._listen = socket.socket() self._listen.bind(('127.0.0.1', 0)) self._listen.listen(1) return self._listen.getsockname()[1] - + def cancel_port_forward_request(self, addr, port): self._listen.close() self._listen = None @@ -107,7 +110,7 @@ class NullServer (ServerInterface): return OPEN_SUCCEEDED -class TransportTest(ParamikoTest): +class TransportTest(unittest.TestCase): def setUp(self): self.socks = LoopSocket() self.sockc = LoopSocket() @@ -125,12 +128,12 @@ class TransportTest(ParamikoTest): host_key = RSAKey.from_private_key_file(test_path('test_rsa.key')) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) - + if client_options is not None: client_options(self.tc.get_security_options()) if server_options is not None: server_options(self.ts.get_security_options()) - + event = threading.Event() self.server = NullServer() self.assertTrue(not event.isSet()) @@ -157,7 +160,7 @@ class TransportTest(ParamikoTest): self.assertTrue(False) except TypeError: pass - + def test_2_compute_key(self): self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929 self.tc.H = b'\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3' @@ -210,7 +213,7 @@ class TransportTest(ParamikoTest): event.wait(1.0) self.assertTrue(event.isSet()) self.assertTrue(self.ts.is_active()) - + def test_4_special(self): """ verify that the client can demand odd handshake settings, and can @@ -224,7 +227,7 @@ class TransportTest(ParamikoTest): self.assertEqual('aes256-cbc', self.tc.remote_cipher) self.assertEqual(12, self.tc.packetizer.get_mac_size_out()) self.assertEqual(12, self.tc.packetizer.get_mac_size_in()) - + self.tc.send_ignore(1024) self.tc.renegotiate_keys() self.ts.send_ignore(1024) @@ -238,7 +241,7 @@ class TransportTest(ParamikoTest): self.tc.set_keepalive(1) time.sleep(2) self.assertEqual('keepalive@lag.net', self.server._global_request) - + def test_6_exec_command(self): """ verify that exec_command() does something reasonable. @@ -252,7 +255,7 @@ class TransportTest(ParamikoTest): self.assertTrue(False) except SSHException: pass - + chan = self.tc.open_session() chan.exec_command('yes') schan = self.ts.accept(1.0) @@ -266,7 +269,7 @@ class TransportTest(ParamikoTest): f = chan.makefile_stderr() self.assertEqual('This is on stderr.\n', f.readline()) self.assertEqual('', f.readline()) - + # now try it with combined stdout/stderr chan = self.tc.open_session() chan.exec_command('yes') @@ -275,7 +278,7 @@ class TransportTest(ParamikoTest): schan.send_stderr('This is on stderr.\n') schan.close() - chan.set_combine_stderr(True) + chan.set_combine_stderr(True) f = chan.makefile() self.assertEqual('Hello there.\n', f.readline()) self.assertEqual('This is on stderr.\n', f.readline()) @@ -338,7 +341,7 @@ class TransportTest(ParamikoTest): schan.shutdown_write() schan.send_exit_status(23) schan.close() - + f = chan.makefile() self.assertEqual('Hello there.\n', f.readline()) self.assertEqual('', f.readline()) @@ -360,14 +363,14 @@ class TransportTest(ParamikoTest): chan.invoke_shell() schan = self.ts.accept(1.0) - # nothing should be ready + # nothing should be ready r, w, e = select.select([chan], [], [], 0.1) self.assertEqual([], r) self.assertEqual([], w) self.assertEqual([], e) - + schan.send('hello\n') - + # something should be ready now (give it 1 second to appear) for i in range(10): r, w, e = select.select([chan], [], [], 0.1) @@ -379,7 +382,7 @@ class TransportTest(ParamikoTest): self.assertEqual([], e) self.assertEqual(b'hello\n', chan.recv(6)) - + # and, should be dead again now r, w, e = select.select([chan], [], [], 0.1) self.assertEqual([], r) @@ -387,7 +390,7 @@ class TransportTest(ParamikoTest): self.assertEqual([], e) schan.close() - + # detect eof? for i in range(10): r, w, e = select.select([chan], [], [], 0.1) @@ -398,14 +401,14 @@ class TransportTest(ParamikoTest): self.assertEqual([], w) self.assertEqual([], e) self.assertEqual(bytes(), chan.recv(16)) - + # make sure the pipe is still open for now... p = chan._pipe self.assertEqual(False, p._closed) chan.close() # ...and now is closed. self.assertEqual(True, p._closed) - + def test_B_renegotiate(self): """ verify that a transport can correctly renegotiate mid-stream. @@ -420,7 +423,7 @@ class TransportTest(ParamikoTest): for i in range(20): chan.send('x' * 1024) chan.close() - + # allow a few seconds for the rekeying to complete for i in range(50): if self.tc.H != self.tc.session_id: @@ -459,28 +462,28 @@ class TransportTest(ParamikoTest): chan = self.tc.open_session() chan.exec_command('yes') schan = self.ts.accept(1.0) - + requested = [] def handler(c, addr_port): addr, port = addr_port requested.append((addr, port)) self.tc._queue_incoming_channel(c) - + self.assertEqual(None, getattr(self.server, '_x11_screen_number', None)) cookie = chan.request_x11(0, single_connection=True, handler=handler) self.assertEqual(0, self.server._x11_screen_number) self.assertEqual('MIT-MAGIC-COOKIE-1', self.server._x11_auth_protocol) self.assertEqual(cookie, self.server._x11_auth_cookie) self.assertEqual(True, self.server._x11_single_connection) - + x11_server = self.ts.open_x11_channel(('localhost', 6093)) x11_client = self.tc.accept() self.assertEqual('localhost', requested[0][0]) self.assertEqual(6093, requested[0][1]) - + x11_server.send('hello') self.assertEqual(b'hello', x11_client.recv(5)) - + x11_server.close() x11_client.close() chan.close() @@ -495,13 +498,13 @@ class TransportTest(ParamikoTest): chan = self.tc.open_session() chan.exec_command('yes') schan = self.ts.accept(1.0) - + requested = [] def handler(c, origin_addr_port, server_addr_port): requested.append(origin_addr_port) requested.append(server_addr_port) self.tc._queue_incoming_channel(c) - + port = self.tc.request_port_forward('127.0.0.1', 0, handler) self.assertEqual(port, self.server._listen.getsockname()[1]) @@ -510,14 +513,14 @@ class TransportTest(ParamikoTest): ss, _ = self.server._listen.accept() sch = self.ts.open_forwarded_tcpip_channel(ss.getsockname(), ss.getpeername()) cch = self.tc.accept() - + sch.send('hello') self.assertEqual(b'hello', cch.recv(5)) sch.close() cch.close() ss.close() cs.close() - + # now cancel it. self.tc.cancel_port_forward('127.0.0.1', port) self.assertTrue(self.server._listen is None) @@ -531,7 +534,7 @@ class TransportTest(ParamikoTest): chan = self.tc.open_session() chan.exec_command('yes') schan = self.ts.accept(1.0) - + # open a port on the "server" that the client will ask to forward to. greeting_server = socket.socket() greeting_server.bind(('127.0.0.1', 0)) @@ -542,13 +545,13 @@ class TransportTest(ParamikoTest): sch = self.ts.accept(1.0) cch = socket.socket() cch.connect(self.server._tcpip_dest) - + ss, _ = greeting_server.accept() ss.send(b'Hello!\n') ss.close() sch.send(cch.recv(8192)) sch.close() - + self.assertEqual(b'Hello!\n', cs.recv(7)) cs.close() @@ -562,14 +565,14 @@ class TransportTest(ParamikoTest): chan.invoke_shell() schan = self.ts.accept(1.0) - # nothing should be ready + # nothing should be ready r, w, e = select.select([chan], [], [], 0.1) self.assertEqual([], r) self.assertEqual([], w) self.assertEqual([], e) - + schan.send_stderr('hello\n') - + # something should be ready now (give it 1 second to appear) for i in range(10): r, w, e = select.select([chan], [], [], 0.1) @@ -581,7 +584,7 @@ class TransportTest(ParamikoTest): self.assertEqual([], e) self.assertEqual(b'hello\n', chan.recv_stderr(6)) - + # and, should be dead again now r, w, e = select.select([chan], [], [], 0.1) self.assertEqual([], r) @@ -603,12 +606,13 @@ class TransportTest(ParamikoTest): self.assertEqual(chan.send_ready(), True) total = 0 K = '*' * 1024 - while total < 1024 * 1024: + limit = 1+(64 * 2 ** 15) + while total < limit: chan.send(K) total += len(K) if not chan.send_ready(): break - self.assertTrue(total < 1024 * 1024) + self.assertTrue(total < limit) schan.close() chan.close() @@ -617,10 +621,10 @@ class TransportTest(ParamikoTest): def test_I_rekey_deadlock(self): """ Regression test for deadlock when in-transit messages are received after MSG_KEXINIT is sent - + Note: When this test fails, it may leak threads. """ - + # Test for an obscure deadlocking bug that can occur if we receive # certain messages while initiating a key exchange. # @@ -637,7 +641,7 @@ class TransportTest(ParamikoTest): # NeedRekeyException. # 4. In response to NeedRekeyException, the transport thread sends # MSG_KEXINIT to the remote host. - # + # # On the remote host (using any SSH implementation): # 5. The MSG_CHANNEL_DATA is received, and MSG_CHANNEL_WINDOW_ADJUST is sent. # 6. The MSG_KEXINIT is received, and a corresponding MSG_KEXINIT is sent. @@ -672,7 +676,7 @@ class TransportTest(ParamikoTest): self.done_event = done_event self.watchdog_event = threading.Event() self.last = None - + def run(self): try: for i in range(1, 1+self.iterations): @@ -684,7 +688,7 @@ class TransportTest(ParamikoTest): finally: self.done_event.set() self.watchdog_event.set() - + class ReceiveThread(threading.Thread): def __init__(self, chan, done_event): threading.Thread.__init__(self, None, None, self.__class__.__name__) @@ -692,7 +696,7 @@ class TransportTest(ParamikoTest): self.chan = chan self.done_event = done_event self.watchdog_event = threading.Event() - + def run(self): try: while not self.done_event.isSet(): @@ -705,10 +709,10 @@ class TransportTest(ParamikoTest): finally: self.done_event.set() self.watchdog_event.set() - + self.setup_test_server() self.ts.packetizer.REKEY_BYTES = 2048 - + chan = self.tc.open_session() chan.exec_command('yes') schan = self.ts.accept(1.0) @@ -730,7 +734,7 @@ class TransportTest(ParamikoTest): self._send_message(m2) return _negotiate_keys(self, m) self.tc._handler_table[MSG_KEXINIT] = _negotiate_keys_wrapper - + # Parameters for the test iterations = 500 # The deadlock does not happen every time, but it # should after many iterations. @@ -742,12 +746,12 @@ class TransportTest(ParamikoTest): # Start the sending thread st = SendThread(schan, iterations, done_event) st.start() - + # Start the receiving thread rt = ReceiveThread(chan, done_event) rt.start() - # Act as a watchdog timer, checking + # Act as a watchdog timer, checking deadlocked = False while not deadlocked and not done_event.isSet(): for event in (st.watchdog_event, rt.watchdog_event): @@ -758,7 +762,7 @@ class TransportTest(ParamikoTest): deadlocked = True break event.clear() - + # Tell the threads to stop (if they haven't already stopped). Note # that if one or more threads are deadlocked, they might hang around # forever (until the process exits). @@ -770,3 +774,21 @@ class TransportTest(ParamikoTest): # Close the channels schan.close() chan.close() + + def test_J_sanitze_packet_size(self): + """ + verify that we conform to the rfc of packet and window sizes. + """ + for val, correct in [(32767, MIN_PACKET_SIZE), + (None, DEFAULT_MAX_PACKET_SIZE), + (2**32, MAX_WINDOW_SIZE)]: + self.assertEqual(self.tc._sanitize_packet_size(val), correct) + + def test_K_sanitze_window_size(self): + """ + verify that we conform to the rfc of packet and window sizes. + """ + for val, correct in [(32767, MIN_PACKET_SIZE), + (None, DEFAULT_WINDOW_SIZE), + (2**32, MAX_WINDOW_SIZE)]: + self.assertEqual(self.tc._sanitize_window_size(val), correct) diff --git a/tests/test_util.py b/tests/test_util.py index 394ea553..35e15765 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -24,13 +24,12 @@ from binascii import hexlify import errno import os from hashlib import sha1 +import unittest import paramiko.util from paramiko.util import lookup_ssh_host_config as host_config from paramiko.py3compat import StringIO, byte_ord -from tests.util import ParamikoTest - test_config_file = """\ Host * User robey @@ -41,7 +40,12 @@ Host *.example.com \tUser bjork Port=3333 Host * - \t \t Crazy something dumb +""" + +dont_strip_whitespace_please = "\t \t Crazy something dumb " + +test_config_file += dont_strip_whitespace_please +test_config_file += """ Host spoo.example.com Crazy something else """ @@ -60,7 +64,7 @@ BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\ from paramiko import * -class UtilTest(ParamikoTest): +class UtilTest(unittest.TestCase): def test_1_import(self): """ verify that all the classes can be imported from paramiko. @@ -334,6 +338,11 @@ IdentityFile something_%l_using_fqdn config = paramiko.util.parse_ssh_config(StringIO(test_config)) assert config.lookup('meh') # will die during lookup() if bug regresses + def test_clamp_value(self): + self.assertEqual(32768, paramiko.util.clamp_value(32767, 32768, 32769)) + self.assertEqual(32767, paramiko.util.clamp_value(32767, 32765, 32769)) + self.assertEqual(32769, paramiko.util.clamp_value(32767, 32770, 32769)) + def test_13_config_dos_crlf_succeeds(self): config_file = StringIO("host abcqwerty\r\nHostName 127.0.0.1\r\n") config = paramiko.SSHConfig() diff --git a/tests/util.py b/tests/util.py index 66d2696c..b546a7e1 100644 --- a/tests/util.py +++ b/tests/util.py @@ -1,17 +1,7 @@ import os -import unittest root_path = os.path.dirname(os.path.realpath(__file__)) - -class ParamikoTest(unittest.TestCase): - # for Python 2.3 and below - if not hasattr(unittest.TestCase, 'assertTrue'): - assertTrue = unittest.TestCase.failUnless - if not hasattr(unittest.TestCase, 'assertFalse'): - assertFalse = unittest.TestCase.failIf - - def test_path(filename): return os.path.join(root_path, filename) |