summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--tests/test_buffered_pipe.py9
-rw-r--r--tests/test_transport.py107
-rw-r--r--tests/test_util.py5
-rw-r--r--tests/util.py10
4 files changed, 60 insertions, 71 deletions
diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py
index a53081a9..3b8f97ad 100644
--- a/tests/test_buffered_pipe.py
+++ b/tests/test_buffered_pipe.py
@@ -22,11 +22,10 @@ Some unit tests for BufferedPipe.
import threading
import time
+import unittest
from paramiko.buffered_pipe import BufferedPipe, PipeTimeout
from paramiko import pipe
-from tests.util import ParamikoTest
-
def delay_thread(p):
p.feed('a')
@@ -40,7 +39,7 @@ def close_thread(p):
p.close()
-class BufferedPipeTest(ParamikoTest):
+class BufferedPipeTest(unittest.TestCase):
def test_1_buffered_pipe(self):
p = BufferedPipe()
self.assertTrue(not p.read_ready())
@@ -48,12 +47,12 @@ class BufferedPipeTest(ParamikoTest):
self.assertTrue(p.read_ready())
data = p.read(6)
self.assertEqual(b'hello.', data)
-
+
p.feed('plus/minus')
self.assertEqual(b'plu', p.read(3))
self.assertEqual(b's/m', p.read(3))
self.assertEqual(b'inus', p.read(4))
-
+
p.close()
self.assertTrue(not p.read_ready())
self.assertEqual(b'', p.read(1))
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 485a18e8..5c77a321 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -26,6 +26,7 @@ import socket
import time
import threading
import random
+import unittest
from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \
SSHException, ChannelException
@@ -35,7 +36,7 @@ from paramiko.common import MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST
from paramiko.py3compat import bytes
from paramiko.message import Message
from tests.loop import LoopSocket
-from tests.util import ParamikoTest, test_path
+from tests.util import test_path
LONG_BANNER = """\
@@ -55,7 +56,7 @@ class NullServer (ServerInterface):
paranoid_did_password = False
paranoid_did_public_key = False
paranoid_key = DSSKey.from_private_key_file(test_path('test_dss.key'))
-
+
def get_allowed_auths(self, username):
if username == 'slowdive':
return 'publickey,password'
@@ -78,24 +79,24 @@ class NullServer (ServerInterface):
def check_channel_shell_request(self, channel):
return True
-
+
def check_global_request(self, kind, msg):
self._global_request = kind
return False
-
+
def check_channel_x11_request(self, channel, single_connection, auth_protocol, auth_cookie, screen_number):
self._x11_single_connection = single_connection
self._x11_auth_protocol = auth_protocol
self._x11_auth_cookie = auth_cookie
self._x11_screen_number = screen_number
return True
-
+
def check_port_forward_request(self, addr, port):
self._listen = socket.socket()
self._listen.bind(('127.0.0.1', 0))
self._listen.listen(1)
return self._listen.getsockname()[1]
-
+
def cancel_port_forward_request(self, addr, port):
self._listen.close()
self._listen = None
@@ -105,7 +106,7 @@ class NullServer (ServerInterface):
return OPEN_SUCCEEDED
-class TransportTest(ParamikoTest):
+class TransportTest(unittest.TestCase):
def setUp(self):
self.socks = LoopSocket()
self.sockc = LoopSocket()
@@ -123,12 +124,12 @@ class TransportTest(ParamikoTest):
host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
-
+
if client_options is not None:
client_options(self.tc.get_security_options())
if server_options is not None:
server_options(self.ts.get_security_options())
-
+
event = threading.Event()
self.server = NullServer()
self.assertTrue(not event.isSet())
@@ -155,7 +156,7 @@ class TransportTest(ParamikoTest):
self.assertTrue(False)
except TypeError:
pass
-
+
def test_2_compute_key(self):
self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929
self.tc.H = b'\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3'
@@ -208,7 +209,7 @@ class TransportTest(ParamikoTest):
event.wait(1.0)
self.assertTrue(event.isSet())
self.assertTrue(self.ts.is_active())
-
+
def test_4_special(self):
"""
verify that the client can demand odd handshake settings, and can
@@ -222,7 +223,7 @@ class TransportTest(ParamikoTest):
self.assertEqual('aes256-cbc', self.tc.remote_cipher)
self.assertEqual(12, self.tc.packetizer.get_mac_size_out())
self.assertEqual(12, self.tc.packetizer.get_mac_size_in())
-
+
self.tc.send_ignore(1024)
self.tc.renegotiate_keys()
self.ts.send_ignore(1024)
@@ -236,7 +237,7 @@ class TransportTest(ParamikoTest):
self.tc.set_keepalive(1)
time.sleep(2)
self.assertEqual('keepalive@lag.net', self.server._global_request)
-
+
def test_6_exec_command(self):
"""
verify that exec_command() does something reasonable.
@@ -250,7 +251,7 @@ class TransportTest(ParamikoTest):
self.assertTrue(False)
except SSHException:
pass
-
+
chan = self.tc.open_session()
chan.exec_command('yes')
schan = self.ts.accept(1.0)
@@ -264,7 +265,7 @@ class TransportTest(ParamikoTest):
f = chan.makefile_stderr()
self.assertEqual('This is on stderr.\n', f.readline())
self.assertEqual('', f.readline())
-
+
# now try it with combined stdout/stderr
chan = self.tc.open_session()
chan.exec_command('yes')
@@ -273,7 +274,7 @@ class TransportTest(ParamikoTest):
schan.send_stderr('This is on stderr.\n')
schan.close()
- chan.set_combine_stderr(True)
+ chan.set_combine_stderr(True)
f = chan.makefile()
self.assertEqual('Hello there.\n', f.readline())
self.assertEqual('This is on stderr.\n', f.readline())
@@ -320,7 +321,7 @@ class TransportTest(ParamikoTest):
schan.shutdown_write()
schan.send_exit_status(23)
schan.close()
-
+
f = chan.makefile()
self.assertEqual('Hello there.\n', f.readline())
self.assertEqual('', f.readline())
@@ -342,14 +343,14 @@ class TransportTest(ParamikoTest):
chan.invoke_shell()
schan = self.ts.accept(1.0)
- # nothing should be ready
+ # nothing should be ready
r, w, e = select.select([chan], [], [], 0.1)
self.assertEqual([], r)
self.assertEqual([], w)
self.assertEqual([], e)
-
+
schan.send('hello\n')
-
+
# something should be ready now (give it 1 second to appear)
for i in range(10):
r, w, e = select.select([chan], [], [], 0.1)
@@ -361,7 +362,7 @@ class TransportTest(ParamikoTest):
self.assertEqual([], e)
self.assertEqual(b'hello\n', chan.recv(6))
-
+
# and, should be dead again now
r, w, e = select.select([chan], [], [], 0.1)
self.assertEqual([], r)
@@ -369,7 +370,7 @@ class TransportTest(ParamikoTest):
self.assertEqual([], e)
schan.close()
-
+
# detect eof?
for i in range(10):
r, w, e = select.select([chan], [], [], 0.1)
@@ -380,14 +381,14 @@ class TransportTest(ParamikoTest):
self.assertEqual([], w)
self.assertEqual([], e)
self.assertEqual(bytes(), chan.recv(16))
-
+
# make sure the pipe is still open for now...
p = chan._pipe
self.assertEqual(False, p._closed)
chan.close()
# ...and now is closed.
self.assertEqual(True, p._closed)
-
+
def test_B_renegotiate(self):
"""
verify that a transport can correctly renegotiate mid-stream.
@@ -402,7 +403,7 @@ class TransportTest(ParamikoTest):
for i in range(20):
chan.send('x' * 1024)
chan.close()
-
+
# allow a few seconds for the rekeying to complete
for i in range(50):
if self.tc.H != self.tc.session_id:
@@ -441,28 +442,28 @@ class TransportTest(ParamikoTest):
chan = self.tc.open_session()
chan.exec_command('yes')
schan = self.ts.accept(1.0)
-
+
requested = []
def handler(c, addr_port):
addr, port = addr_port
requested.append((addr, port))
self.tc._queue_incoming_channel(c)
-
+
self.assertEqual(None, getattr(self.server, '_x11_screen_number', None))
cookie = chan.request_x11(0, single_connection=True, handler=handler)
self.assertEqual(0, self.server._x11_screen_number)
self.assertEqual('MIT-MAGIC-COOKIE-1', self.server._x11_auth_protocol)
self.assertEqual(cookie, self.server._x11_auth_cookie)
self.assertEqual(True, self.server._x11_single_connection)
-
+
x11_server = self.ts.open_x11_channel(('localhost', 6093))
x11_client = self.tc.accept()
self.assertEqual('localhost', requested[0][0])
self.assertEqual(6093, requested[0][1])
-
+
x11_server.send('hello')
self.assertEqual(b'hello', x11_client.recv(5))
-
+
x11_server.close()
x11_client.close()
chan.close()
@@ -477,13 +478,13 @@ class TransportTest(ParamikoTest):
chan = self.tc.open_session()
chan.exec_command('yes')
schan = self.ts.accept(1.0)
-
+
requested = []
def handler(c, origin_addr_port, server_addr_port):
requested.append(origin_addr_port)
requested.append(server_addr_port)
self.tc._queue_incoming_channel(c)
-
+
port = self.tc.request_port_forward('127.0.0.1', 0, handler)
self.assertEqual(port, self.server._listen.getsockname()[1])
@@ -492,14 +493,14 @@ class TransportTest(ParamikoTest):
ss, _ = self.server._listen.accept()
sch = self.ts.open_forwarded_tcpip_channel(ss.getsockname(), ss.getpeername())
cch = self.tc.accept()
-
+
sch.send('hello')
self.assertEqual(b'hello', cch.recv(5))
sch.close()
cch.close()
ss.close()
cs.close()
-
+
# now cancel it.
self.tc.cancel_port_forward('127.0.0.1', port)
self.assertTrue(self.server._listen is None)
@@ -513,7 +514,7 @@ class TransportTest(ParamikoTest):
chan = self.tc.open_session()
chan.exec_command('yes')
schan = self.ts.accept(1.0)
-
+
# open a port on the "server" that the client will ask to forward to.
greeting_server = socket.socket()
greeting_server.bind(('127.0.0.1', 0))
@@ -524,13 +525,13 @@ class TransportTest(ParamikoTest):
sch = self.ts.accept(1.0)
cch = socket.socket()
cch.connect(self.server._tcpip_dest)
-
+
ss, _ = greeting_server.accept()
ss.send(b'Hello!\n')
ss.close()
sch.send(cch.recv(8192))
sch.close()
-
+
self.assertEqual(b'Hello!\n', cs.recv(7))
cs.close()
@@ -544,14 +545,14 @@ class TransportTest(ParamikoTest):
chan.invoke_shell()
schan = self.ts.accept(1.0)
- # nothing should be ready
+ # nothing should be ready
r, w, e = select.select([chan], [], [], 0.1)
self.assertEqual([], r)
self.assertEqual([], w)
self.assertEqual([], e)
-
+
schan.send_stderr('hello\n')
-
+
# something should be ready now (give it 1 second to appear)
for i in range(10):
r, w, e = select.select([chan], [], [], 0.1)
@@ -563,7 +564,7 @@ class TransportTest(ParamikoTest):
self.assertEqual([], e)
self.assertEqual(b'hello\n', chan.recv_stderr(6))
-
+
# and, should be dead again now
r, w, e = select.select([chan], [], [], 0.1)
self.assertEqual([], r)
@@ -599,10 +600,10 @@ class TransportTest(ParamikoTest):
def test_I_rekey_deadlock(self):
"""
Regression test for deadlock when in-transit messages are received after MSG_KEXINIT is sent
-
+
Note: When this test fails, it may leak threads.
"""
-
+
# Test for an obscure deadlocking bug that can occur if we receive
# certain messages while initiating a key exchange.
#
@@ -619,7 +620,7 @@ class TransportTest(ParamikoTest):
# NeedRekeyException.
# 4. In response to NeedRekeyException, the transport thread sends
# MSG_KEXINIT to the remote host.
- #
+ #
# On the remote host (using any SSH implementation):
# 5. The MSG_CHANNEL_DATA is received, and MSG_CHANNEL_WINDOW_ADJUST is sent.
# 6. The MSG_KEXINIT is received, and a corresponding MSG_KEXINIT is sent.
@@ -654,7 +655,7 @@ class TransportTest(ParamikoTest):
self.done_event = done_event
self.watchdog_event = threading.Event()
self.last = None
-
+
def run(self):
try:
for i in range(1, 1+self.iterations):
@@ -666,7 +667,7 @@ class TransportTest(ParamikoTest):
finally:
self.done_event.set()
self.watchdog_event.set()
-
+
class ReceiveThread(threading.Thread):
def __init__(self, chan, done_event):
threading.Thread.__init__(self, None, None, self.__class__.__name__)
@@ -674,7 +675,7 @@ class TransportTest(ParamikoTest):
self.chan = chan
self.done_event = done_event
self.watchdog_event = threading.Event()
-
+
def run(self):
try:
while not self.done_event.isSet():
@@ -687,10 +688,10 @@ class TransportTest(ParamikoTest):
finally:
self.done_event.set()
self.watchdog_event.set()
-
+
self.setup_test_server()
self.ts.packetizer.REKEY_BYTES = 2048
-
+
chan = self.tc.open_session()
chan.exec_command('yes')
schan = self.ts.accept(1.0)
@@ -712,7 +713,7 @@ class TransportTest(ParamikoTest):
self._send_message(m2)
return _negotiate_keys(self, m)
self.tc._handler_table[MSG_KEXINIT] = _negotiate_keys_wrapper
-
+
# Parameters for the test
iterations = 500 # The deadlock does not happen every time, but it
# should after many iterations.
@@ -724,12 +725,12 @@ class TransportTest(ParamikoTest):
# Start the sending thread
st = SendThread(schan, iterations, done_event)
st.start()
-
+
# Start the receiving thread
rt = ReceiveThread(chan, done_event)
rt.start()
- # Act as a watchdog timer, checking
+ # Act as a watchdog timer, checking
deadlocked = False
while not deadlocked and not done_event.isSet():
for event in (st.watchdog_event, rt.watchdog_event):
@@ -740,7 +741,7 @@ class TransportTest(ParamikoTest):
deadlocked = True
break
event.clear()
-
+
# Tell the threads to stop (if they haven't already stopped). Note
# that if one or more threads are deadlocked, they might hang around
# forever (until the process exits).
diff --git a/tests/test_util.py b/tests/test_util.py
index ff2eb10c..2827ded6 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -24,13 +24,12 @@ from binascii import hexlify
import errno
import os
from hashlib import sha1
+import unittest
import paramiko.util
from paramiko.util import lookup_ssh_host_config as host_config
from paramiko.py3compat import StringIO, byte_ord
-from tests.util import ParamikoTest
-
test_config_file = """\
Host *
User robey
@@ -65,7 +64,7 @@ BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\
from paramiko import *
-class UtilTest(ParamikoTest):
+class UtilTest(unittest.TestCase):
def test_1_import(self):
"""
verify that all the classes can be imported from paramiko.
diff --git a/tests/util.py b/tests/util.py
index 66d2696c..b546a7e1 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -1,17 +1,7 @@
import os
-import unittest
root_path = os.path.dirname(os.path.realpath(__file__))
-
-class ParamikoTest(unittest.TestCase):
- # for Python 2.3 and below
- if not hasattr(unittest.TestCase, 'assertTrue'):
- assertTrue = unittest.TestCase.failUnless
- if not hasattr(unittest.TestCase, 'assertFalse'):
- assertFalse = unittest.TestCase.failIf
-
-
def test_path(filename):
return os.path.join(root_path, filename)