summaryrefslogtreecommitdiffhomepage
path: root/tests/test_transport.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_transport.py')
-rw-r--r--tests/test_transport.py331
1 files changed, 197 insertions, 134 deletions
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 99cbc3e0..c05d6781 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -32,20 +32,32 @@ from hashlib import sha1
import unittest
from paramiko import (
- Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, SSHException,
- ChannelException, Packetizer, Channel,
+ Transport,
+ SecurityOptions,
+ ServerInterface,
+ RSAKey,
+ DSSKey,
+ SSHException,
+ ChannelException,
+ Packetizer,
+ Channel,
)
from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL
from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
from paramiko.common import (
- MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST, MIN_PACKET_SIZE, MIN_WINDOW_SIZE,
- MAX_WINDOW_SIZE, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_PACKET_SIZE,
+ MSG_KEXINIT,
+ cMSG_CHANNEL_WINDOW_ADJUST,
+ MIN_PACKET_SIZE,
+ MIN_WINDOW_SIZE,
+ MAX_WINDOW_SIZE,
+ DEFAULT_WINDOW_SIZE,
+ DEFAULT_MAX_PACKET_SIZE,
)
from paramiko.py3compat import bytes
from paramiko.message import Message
-from tests import skipUnlessBuiltin
-from tests.loop import LoopSocket
-from tests.util import test_path
+
+from .util import needs_builtin, _support, slow
+from .loop import LoopSocket
LONG_BANNER = """\
@@ -61,28 +73,28 @@ Maybe.
"""
-class NullServer (ServerInterface):
+class NullServer(ServerInterface):
paranoid_did_password = False
paranoid_did_public_key = False
- paranoid_key = DSSKey.from_private_key_file(test_path('test_dss.key'))
+ paranoid_key = DSSKey.from_private_key_file(_support("test_dss.key"))
def get_allowed_auths(self, username):
- if username == 'slowdive':
- return 'publickey,password'
- return 'publickey'
+ if username == "slowdive":
+ return "publickey,password"
+ return "publickey"
def check_auth_password(self, username, password):
- if (username == 'slowdive') and (password == 'pygmalion'):
+ if (username == "slowdive") and (password == "pygmalion"):
return AUTH_SUCCESSFUL
return AUTH_FAILED
def check_channel_request(self, kind, chanid):
- if kind == 'bogus':
+ if kind == "bogus":
return OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
return OPEN_SUCCEEDED
def check_channel_exec_request(self, channel, command):
- if command != b'yes':
+ if command != b"yes":
return False
return True
@@ -95,9 +107,16 @@ class NullServer (ServerInterface):
# tho that's only supposed to occur if the request cannot be served.
# For now, leaving that the default unless test supplies specific
# 'acceptable' request kind
- return kind == 'acceptable'
-
- def check_channel_x11_request(self, channel, single_connection, auth_protocol, auth_cookie, screen_number):
+ return kind == "acceptable"
+
+ def check_channel_x11_request(
+ self,
+ channel,
+ single_connection,
+ auth_protocol,
+ auth_cookie,
+ screen_number,
+ ):
self._x11_single_connection = single_connection
self._x11_auth_protocol = auth_protocol
self._x11_auth_cookie = auth_cookie
@@ -106,7 +125,7 @@ class NullServer (ServerInterface):
def check_port_forward_request(self, addr, port):
self._listen = socket.socket()
- self._listen.bind(('127.0.0.1', 0))
+ self._listen.bind(("127.0.0.1", 0))
self._listen.listen(1)
return self._listen.getsockname()[1]
@@ -120,6 +139,7 @@ class NullServer (ServerInterface):
class TransportTest(unittest.TestCase):
+
def setUp(self):
self.socks = LoopSocket()
self.sockc = LoopSocket()
@@ -134,9 +154,9 @@ class TransportTest(unittest.TestCase):
self.sockc.close()
def setup_test_server(
- self, client_options=None, server_options=None, connect_kwargs=None,
+ self, client_options=None, server_options=None, connect_kwargs=None
):
- host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = RSAKey.from_private_key_file(_support("test_rsa.key"))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
@@ -152,8 +172,8 @@ class TransportTest(unittest.TestCase):
if connect_kwargs is None:
connect_kwargs = dict(
hostkey=public_host_key,
- username='slowdive',
- password='pygmalion',
+ username="slowdive",
+ password="pygmalion",
)
self.tc.connect(**connect_kwargs)
event.wait(1.0)
@@ -163,11 +183,11 @@ class TransportTest(unittest.TestCase):
def test_1_security_options(self):
o = self.tc.get_security_options()
self.assertEqual(type(o), SecurityOptions)
- self.assertTrue(('aes256-cbc', 'blowfish-cbc') != o.ciphers)
- o.ciphers = ('aes256-cbc', 'blowfish-cbc')
- self.assertEqual(('aes256-cbc', 'blowfish-cbc'), o.ciphers)
+ self.assertTrue(("aes256-cbc", "blowfish-cbc") != o.ciphers)
+ o.ciphers = ("aes256-cbc", "blowfish-cbc")
+ self.assertEqual(("aes256-cbc", "blowfish-cbc"), o.ciphers)
try:
- o.ciphers = ('aes256-cbc', 'made-up-cipher')
+ o.ciphers = ("aes256-cbc", "made-up-cipher")
self.assertTrue(False)
except ValueError:
pass
@@ -187,12 +207,18 @@ class TransportTest(unittest.TestCase):
o.compression = o.compression
def test_2_compute_key(self):
- self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929
- self.tc.H = b'\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3'
+ self.tc.K = (
+ 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929
+ )
+ self.tc.H = (
+ b"\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3"
+ )
self.tc.session_id = self.tc.H
- key = self.tc._compute_key('C', 32)
- self.assertEqual(b'207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995',
- hexlify(key).upper())
+ key = self.tc._compute_key("C", 32)
+ self.assertEqual(
+ b"207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995",
+ hexlify(key).upper(),
+ )
def test_3_simple(self):
"""
@@ -200,7 +226,7 @@ class TransportTest(unittest.TestCase):
loopback sockets. this is hardly "simple" but it's simpler than the
later tests. :)
"""
- host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = RSAKey.from_private_key_file(_support("test_rsa.key"))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
event = threading.Event()
@@ -211,13 +237,14 @@ class TransportTest(unittest.TestCase):
self.assertEqual(False, self.tc.is_authenticated())
self.assertEqual(False, self.ts.is_authenticated())
self.ts.start_server(event, server)
- self.tc.connect(hostkey=public_host_key,
- username='slowdive', password='pygmalion')
+ self.tc.connect(
+ hostkey=public_host_key, username="slowdive", password="pygmalion"
+ )
event.wait(1.0)
self.assertTrue(event.is_set())
self.assertTrue(self.ts.is_active())
- self.assertEqual('slowdive', self.tc.get_username())
- self.assertEqual('slowdive', self.ts.get_username())
+ self.assertEqual("slowdive", self.tc.get_username())
+ self.assertEqual("slowdive", self.ts.get_username())
self.assertEqual(True, self.tc.is_authenticated())
self.assertEqual(True, self.ts.is_authenticated())
@@ -225,7 +252,7 @@ class TransportTest(unittest.TestCase):
"""
verify that a long banner doesn't mess up the handshake.
"""
- host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = RSAKey.from_private_key_file(_support("test_rsa.key"))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
event = threading.Event()
@@ -233,8 +260,9 @@ class TransportTest(unittest.TestCase):
self.assertTrue(not event.is_set())
self.socks.send(LONG_BANNER)
self.ts.start_server(event, server)
- self.tc.connect(hostkey=public_host_key,
- username='slowdive', password='pygmalion')
+ self.tc.connect(
+ hostkey=public_host_key, username="slowdive", password="pygmalion"
+ )
event.wait(1.0)
self.assertTrue(event.is_set())
self.assertTrue(self.ts.is_active())
@@ -244,12 +272,14 @@ class TransportTest(unittest.TestCase):
verify that the client can demand odd handshake settings, and can
renegotiate keys in mid-stream.
"""
+
def force_algorithms(options):
- options.ciphers = ('aes256-cbc',)
- options.digests = ('hmac-md5-96',)
+ options.ciphers = ("aes256-cbc",)
+ options.digests = ("hmac-md5-96",)
+
self.setup_test_server(client_options=force_algorithms)
- self.assertEqual('aes256-cbc', self.tc.local_cipher)
- self.assertEqual('aes256-cbc', self.tc.remote_cipher)
+ self.assertEqual("aes256-cbc", self.tc.local_cipher)
+ self.assertEqual("aes256-cbc", self.tc.remote_cipher)
self.assertEqual(12, self.tc.packetizer.get_mac_size_out())
self.assertEqual(12, self.tc.packetizer.get_mac_size_in())
@@ -257,15 +287,16 @@ class TransportTest(unittest.TestCase):
self.tc.renegotiate_keys()
self.ts.send_ignore(1024)
+ @slow
def test_5_keepalive(self):
"""
verify that the keepalive will be sent.
"""
self.setup_test_server()
- self.assertEqual(None, getattr(self.server, '_global_request', None))
+ self.assertEqual(None, getattr(self.server, "_global_request", None))
self.tc.set_keepalive(1)
time.sleep(2)
- self.assertEqual('keepalive@lag.net', self.server._global_request)
+ self.assertEqual("keepalive@lag.net", self.server._global_request)
def test_6_exec_command(self):
"""
@@ -276,39 +307,41 @@ class TransportTest(unittest.TestCase):
chan = self.tc.open_session()
schan = self.ts.accept(1.0)
try:
- chan.exec_command(b'command contains \xfc and is not a valid UTF-8 string')
+ chan.exec_command(
+ b"command contains \xfc and is not a valid UTF-8 string"
+ )
self.assertTrue(False)
except SSHException:
pass
chan = self.tc.open_session()
- chan.exec_command('yes')
+ chan.exec_command("yes")
schan = self.ts.accept(1.0)
- schan.send('Hello there.\n')
- schan.send_stderr('This is on stderr.\n')
+ schan.send("Hello there.\n")
+ schan.send_stderr("This is on stderr.\n")
schan.close()
f = chan.makefile()
- self.assertEqual('Hello there.\n', f.readline())
- self.assertEqual('', f.readline())
+ self.assertEqual("Hello there.\n", f.readline())
+ self.assertEqual("", f.readline())
f = chan.makefile_stderr()
- self.assertEqual('This is on stderr.\n', f.readline())
- self.assertEqual('', f.readline())
+ self.assertEqual("This is on stderr.\n", f.readline())
+ self.assertEqual("", f.readline())
# now try it with combined stdout/stderr
chan = self.tc.open_session()
- chan.exec_command('yes')
+ chan.exec_command("yes")
schan = self.ts.accept(1.0)
- schan.send('Hello there.\n')
- schan.send_stderr('This is on stderr.\n')
+ schan.send("Hello there.\n")
+ schan.send_stderr("This is on stderr.\n")
schan.close()
chan.set_combine_stderr(True)
f = chan.makefile()
- self.assertEqual('Hello there.\n', f.readline())
- self.assertEqual('This is on stderr.\n', f.readline())
- self.assertEqual('', f.readline())
-
+ self.assertEqual("Hello there.\n", f.readline())
+ self.assertEqual("This is on stderr.\n", f.readline())
+ self.assertEqual("", f.readline())
+
def test_6a_channel_can_be_used_as_context_manager(self):
"""
verify that exec_command() does something reasonable.
@@ -317,13 +350,13 @@ class TransportTest(unittest.TestCase):
with self.tc.open_session() as chan:
with self.ts.accept(1.0) as schan:
- chan.exec_command('yes')
- schan.send('Hello there.\n')
+ chan.exec_command("yes")
+ schan.send("Hello there.\n")
schan.close()
f = chan.makefile()
- self.assertEqual('Hello there.\n', f.readline())
- self.assertEqual('', f.readline())
+ self.assertEqual("Hello there.\n", f.readline())
+ self.assertEqual("", f.readline())
def test_7_invoke_shell(self):
"""
@@ -333,11 +366,11 @@ class TransportTest(unittest.TestCase):
chan = self.tc.open_session()
chan.invoke_shell()
schan = self.ts.accept(1.0)
- chan.send('communist j. cat\n')
+ chan.send("communist j. cat\n")
f = schan.makefile()
- self.assertEqual('communist j. cat\n', f.readline())
+ self.assertEqual("communist j. cat\n", f.readline())
chan.close()
- self.assertEqual('', f.readline())
+ self.assertEqual("", f.readline())
def test_8_channel_exception(self):
"""
@@ -345,8 +378,8 @@ class TransportTest(unittest.TestCase):
"""
self.setup_test_server()
try:
- chan = self.tc.open_channel('bogus')
- self.fail('expected exception')
+ chan = self.tc.open_channel("bogus")
+ self.fail("expected exception")
except ChannelException as e:
self.assertTrue(e.code == OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED)
@@ -358,8 +391,8 @@ class TransportTest(unittest.TestCase):
chan = self.tc.open_session()
schan = self.ts.accept(1.0)
- chan.exec_command('yes')
- schan.send('Hello there.\n')
+ chan.exec_command("yes")
+ schan.send("Hello there.\n")
self.assertTrue(not chan.exit_status_ready())
# trigger an EOF
schan.shutdown_read()
@@ -368,8 +401,8 @@ class TransportTest(unittest.TestCase):
schan.close()
f = chan.makefile()
- self.assertEqual('Hello there.\n', f.readline())
- self.assertEqual('', f.readline())
+ self.assertEqual("Hello there.\n", f.readline())
+ self.assertEqual("", f.readline())
count = 0
while not chan.exit_status_ready():
time.sleep(0.1)
@@ -394,7 +427,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual([], w)
self.assertEqual([], e)
- schan.send('hello\n')
+ schan.send("hello\n")
# something should be ready now (give it 1 second to appear)
for i in range(10):
@@ -406,7 +439,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual([], w)
self.assertEqual([], e)
- self.assertEqual(b'hello\n', chan.recv(6))
+ self.assertEqual(b"hello\n", chan.recv(6))
# and, should be dead again now
r, w, e = select.select([chan], [], [], 0.1)
@@ -441,12 +474,12 @@ class TransportTest(unittest.TestCase):
self.setup_test_server()
self.tc.packetizer.REKEY_BYTES = 16384
chan = self.tc.open_session()
- chan.exec_command('yes')
+ chan.exec_command("yes")
schan = self.ts.accept(1.0)
self.assertEqual(self.tc.H, self.tc.session_id)
for i in range(20):
- chan.send('x' * 1024)
+ chan.send("x" * 1024)
chan.close()
# allow a few seconds for the rekeying to complete
@@ -462,18 +495,20 @@ class TransportTest(unittest.TestCase):
"""
verify that zlib compression is basically working.
"""
+
def force_compression(o):
- o.compression = ('zlib',)
+ o.compression = ("zlib",)
+
self.setup_test_server(force_compression, force_compression)
chan = self.tc.open_session()
- chan.exec_command('yes')
+ chan.exec_command("yes")
schan = self.ts.accept(1.0)
bytes = self.tc.packetizer._Packetizer__sent_bytes
- chan.send('x' * 1024)
+ chan.send("x" * 1024)
bytes2 = self.tc.packetizer._Packetizer__sent_bytes
- block_size = self.tc._cipher_info[self.tc.local_cipher]['block-size']
- mac_size = self.tc._mac_info[self.tc.local_mac]['size']
+ block_size = self.tc._cipher_info[self.tc.local_cipher]["block-size"]
+ mac_size = self.tc._mac_info[self.tc.local_mac]["size"]
# tests show this is actually compressed to *52 bytes*! including packet overhead! nice!! :)
self.assertTrue(bytes2 - bytes < 1024)
self.assertEqual(16 + block_size + mac_size, bytes2 - bytes)
@@ -487,29 +522,32 @@ class TransportTest(unittest.TestCase):
"""
self.setup_test_server()
chan = self.tc.open_session()
- chan.exec_command('yes')
+ chan.exec_command("yes")
schan = self.ts.accept(1.0)
requested = []
+
def handler(c, addr_port):
addr, port = addr_port
requested.append((addr, port))
self.tc._queue_incoming_channel(c)
- self.assertEqual(None, getattr(self.server, '_x11_screen_number', None))
+ self.assertEqual(
+ None, getattr(self.server, "_x11_screen_number", None)
+ )
cookie = chan.request_x11(0, single_connection=True, handler=handler)
self.assertEqual(0, self.server._x11_screen_number)
- self.assertEqual('MIT-MAGIC-COOKIE-1', self.server._x11_auth_protocol)
+ self.assertEqual("MIT-MAGIC-COOKIE-1", self.server._x11_auth_protocol)
self.assertEqual(cookie, self.server._x11_auth_cookie)
self.assertEqual(True, self.server._x11_single_connection)
- x11_server = self.ts.open_x11_channel(('localhost', 6093))
+ x11_server = self.ts.open_x11_channel(("localhost", 6093))
x11_client = self.tc.accept()
- self.assertEqual('localhost', requested[0][0])
+ self.assertEqual("localhost", requested[0][0])
self.assertEqual(6093, requested[0][1])
- x11_server.send('hello')
- self.assertEqual(b'hello', x11_client.recv(5))
+ x11_server.send("hello")
+ self.assertEqual(b"hello", x11_client.recv(5))
x11_server.close()
x11_client.close()
@@ -523,33 +561,36 @@ class TransportTest(unittest.TestCase):
"""
self.setup_test_server()
chan = self.tc.open_session()
- chan.exec_command('yes')
+ chan.exec_command("yes")
schan = self.ts.accept(1.0)
requested = []
+
def handler(c, origin_addr_port, server_addr_port):
requested.append(origin_addr_port)
requested.append(server_addr_port)
self.tc._queue_incoming_channel(c)
- port = self.tc.request_port_forward('127.0.0.1', 0, handler)
+ port = self.tc.request_port_forward("127.0.0.1", 0, handler)
self.assertEqual(port, self.server._listen.getsockname()[1])
cs = socket.socket()
- cs.connect(('127.0.0.1', port))
+ cs.connect(("127.0.0.1", port))
ss, _ = self.server._listen.accept()
- sch = self.ts.open_forwarded_tcpip_channel(ss.getsockname(), ss.getpeername())
+ sch = self.ts.open_forwarded_tcpip_channel(
+ ss.getsockname(), ss.getpeername()
+ )
cch = self.tc.accept()
- sch.send('hello')
- self.assertEqual(b'hello', cch.recv(5))
+ sch.send("hello")
+ self.assertEqual(b"hello", cch.recv(5))
sch.close()
cch.close()
ss.close()
cs.close()
# now cancel it.
- self.tc.cancel_port_forward('127.0.0.1', port)
+ self.tc.cancel_port_forward("127.0.0.1", port)
self.assertTrue(self.server._listen is None)
def test_F_port_forwarding(self):
@@ -559,27 +600,29 @@ class TransportTest(unittest.TestCase):
"""
self.setup_test_server()
chan = self.tc.open_session()
- chan.exec_command('yes')
+ chan.exec_command("yes")
schan = self.ts.accept(1.0)
# open a port on the "server" that the client will ask to forward to.
greeting_server = socket.socket()
- greeting_server.bind(('127.0.0.1', 0))
+ greeting_server.bind(("127.0.0.1", 0))
greeting_server.listen(1)
greeting_port = greeting_server.getsockname()[1]
- cs = self.tc.open_channel('direct-tcpip', ('127.0.0.1', greeting_port), ('', 9000))
+ cs = self.tc.open_channel(
+ "direct-tcpip", ("127.0.0.1", greeting_port), ("", 9000)
+ )
sch = self.ts.accept(1.0)
cch = socket.socket()
cch.connect(self.server._tcpip_dest)
ss, _ = greeting_server.accept()
- ss.send(b'Hello!\n')
+ ss.send(b"Hello!\n")
ss.close()
sch.send(cch.recv(8192))
sch.close()
- self.assertEqual(b'Hello!\n', cs.recv(7))
+ self.assertEqual(b"Hello!\n", cs.recv(7))
cs.close()
def test_G_stderr_select(self):
@@ -598,7 +641,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual([], w)
self.assertEqual([], e)
- schan.send_stderr('hello\n')
+ schan.send_stderr("hello\n")
# something should be ready now (give it 1 second to appear)
for i in range(10):
@@ -610,7 +653,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual([], w)
self.assertEqual([], e)
- self.assertEqual(b'hello\n', chan.recv_stderr(6))
+ self.assertEqual(b"hello\n", chan.recv_stderr(6))
# and, should be dead again now
r, w, e = select.select([chan], [], [], 0.1)
@@ -632,8 +675,8 @@ class TransportTest(unittest.TestCase):
self.assertEqual(chan.send_ready(), True)
total = 0
- K = '*' * 1024
- limit = 1+(64 * 2 ** 15)
+ K = "*" * 1024
+ limit = 1 + (64 * 2 ** 15)
while total < limit:
chan.send(K)
total += len(K)
@@ -695,8 +738,11 @@ class TransportTest(unittest.TestCase):
# expires, a deadlock is assumed.
class SendThread(threading.Thread):
+
def __init__(self, chan, iterations, done_event):
- threading.Thread.__init__(self, None, None, self.__class__.__name__)
+ threading.Thread.__init__(
+ self, None, None, self.__class__.__name__
+ )
self.setDaemon(True)
self.chan = chan
self.iterations = iterations
@@ -706,19 +752,22 @@ class TransportTest(unittest.TestCase):
def run(self):
try:
- for i in range(1, 1+self.iterations):
+ for i in range(1, 1 + self.iterations):
if self.done_event.is_set():
break
self.watchdog_event.set()
- #print i, "SEND"
+ # print i, "SEND"
self.chan.send("x" * 2048)
finally:
self.done_event.set()
self.watchdog_event.set()
class ReceiveThread(threading.Thread):
+
def __init__(self, chan, done_event):
- threading.Thread.__init__(self, None, None, self.__class__.__name__)
+ threading.Thread.__init__(
+ self, None, None, self.__class__.__name__
+ )
self.setDaemon(True)
self.chan = chan
self.done_event = done_event
@@ -741,30 +790,34 @@ class TransportTest(unittest.TestCase):
self.ts.packetizer.REKEY_BYTES = 2048
chan = self.tc.open_session()
- chan.exec_command('yes')
+ chan.exec_command("yes")
schan = self.ts.accept(1.0)
# Monkey patch the client's Transport._handler_table so that the client
# sends MSG_CHANNEL_WINDOW_ADJUST whenever it receives an initial
# MSG_KEXINIT. This is used to simulate the effect of network latency
# on a real MSG_CHANNEL_WINDOW_ADJUST message.
- self.tc._handler_table = self.tc._handler_table.copy() # copy per-class dictionary
+ self.tc._handler_table = (
+ self.tc._handler_table.copy()
+ ) # copy per-class dictionary
_negotiate_keys = self.tc._handler_table[MSG_KEXINIT]
+
def _negotiate_keys_wrapper(self, m):
- if self.local_kex_init is None: # Remote side sent KEXINIT
+ if self.local_kex_init is None: # Remote side sent KEXINIT
# Simulate in-transit MSG_CHANNEL_WINDOW_ADJUST by sending it
# before responding to the incoming MSG_KEXINIT.
m2 = Message()
m2.add_byte(cMSG_CHANNEL_WINDOW_ADJUST)
m2.add_int(chan.remote_chanid)
- m2.add_int(1) # bytes to add
+ m2.add_int(1) # bytes to add
self._send_message(m2)
return _negotiate_keys(self, m)
+
self.tc._handler_table[MSG_KEXINIT] = _negotiate_keys_wrapper
# Parameters for the test
- iterations = 500 # The deadlock does not happen every time, but it
- # should after many iterations.
+ iterations = 500 # The deadlock does not happen every time, but it
+ # should after many iterations.
timeout = 5
# This event is set when the test is completed
@@ -806,20 +859,25 @@ class TransportTest(unittest.TestCase):
"""
verify that we conform to the rfc of packet and window sizes.
"""
- for val, correct in [(4095, MIN_PACKET_SIZE),
- (None, DEFAULT_MAX_PACKET_SIZE),
- (2**32, MAX_WINDOW_SIZE)]:
+ for val, correct in [
+ (4095, MIN_PACKET_SIZE),
+ (None, DEFAULT_MAX_PACKET_SIZE),
+ (2 ** 32, MAX_WINDOW_SIZE),
+ ]:
self.assertEqual(self.tc._sanitize_packet_size(val), correct)
def test_K_sanitze_window_size(self):
"""
verify that we conform to the rfc of packet and window sizes.
"""
- for val, correct in [(32767, MIN_WINDOW_SIZE),
- (None, DEFAULT_WINDOW_SIZE),
- (2**32, MAX_WINDOW_SIZE)]:
+ for val, correct in [
+ (32767, MIN_WINDOW_SIZE),
+ (None, DEFAULT_WINDOW_SIZE),
+ (2 ** 32, MAX_WINDOW_SIZE),
+ ]:
self.assertEqual(self.tc._sanitize_window_size(val), correct)
+ @slow
def test_L_handshake_timeout(self):
"""
verify that we can get a hanshake timeout.
@@ -832,15 +890,17 @@ class TransportTest(unittest.TestCase):
# (Doing this on the server's transport *sounds* more 'correct' but
# actually doesn't work nearly as well for whatever reason.)
class SlowPacketizer(Packetizer):
+
def read_message(self):
time.sleep(1)
return super(SlowPacketizer, self).read_message()
+
# NOTE: prettttty sure since the replaced .packetizer Packetizer is now
# no longer doing anything with its copy of the socket...everything'll
# be fine. Even tho it's a bit squicky.
self.tc.packetizer = SlowPacketizer(self.tc.sock)
# Continue with regular test red tape.
- host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = RSAKey.from_private_key_file(_support("test_rsa.key"))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
event = threading.Event()
@@ -848,10 +908,13 @@ class TransportTest(unittest.TestCase):
self.assertTrue(not event.is_set())
self.tc.handshake_timeout = 0.000000000001
self.ts.start_server(event, server)
- self.assertRaises(EOFError, self.tc.connect,
- hostkey=public_host_key,
- username='slowdive',
- password='pygmalion')
+ self.assertRaises(
+ EOFError,
+ self.tc.connect,
+ hostkey=public_host_key,
+ username="slowdive",
+ password="pygmalion",
+ )
def test_M_select_after_close(self):
"""
@@ -892,13 +955,13 @@ class TransportTest(unittest.TestCase):
expected = text.encode("utf-8")
self.assertEqual(sfile.read(len(expected)), expected)
- @skipUnlessBuiltin('buffer')
+ @needs_builtin("buffer")
def test_channel_send_buffer(self):
"""
verify sending buffer instances to a channel
"""
self.setup_test_server()
- data = 3 * b'some test data\n whole'
+ data = 3 * b"some test data\n whole"
with self.tc.open_session() as chan:
schan = self.ts.accept(1.0)
if schan is None:
@@ -915,13 +978,13 @@ class TransportTest(unittest.TestCase):
chan.sendall(buffer(data))
self.assertEqual(sfile.read(len(data)), data)
- @skipUnlessBuiltin('memoryview')
+ @needs_builtin("memoryview")
def test_channel_send_memoryview(self):
"""
verify sending memoryview instances to a channel
"""
self.setup_test_server()
- data = 3 * b'some test data\n whole'
+ data = 3 * b"some test data\n whole"
with self.tc.open_session() as chan:
schan = self.ts.accept(1.0)
if schan is None:
@@ -932,7 +995,7 @@ class TransportTest(unittest.TestCase):
sent = 0
view = memoryview(data)
while sent < len(view):
- sent += chan.send(view[sent:sent+8])
+ sent += chan.send(view[sent : sent + 8])
self.assertEqual(sfile.read(len(data)), data)
# sendall() accepts a memoryview instance
@@ -952,7 +1015,7 @@ class TransportTest(unittest.TestCase):
self.setup_test_server(connect_kwargs={})
# NOTE: this dummy global request kind would normally pass muster
# from the test server.
- self.tc.global_request('acceptable')
+ self.tc.global_request("acceptable")
# Global requests never raise exceptions, even on failure (not sure why
# this was the original design...ugh.) Best we can do to tell failure
# happened is that the client transport's global_response was set back
@@ -967,7 +1030,7 @@ class TransportTest(unittest.TestCase):
# an exception on the client side, unlike the general case...)
self.setup_test_server(connect_kwargs={})
try:
- self.tc.request_port_forward('localhost', 1234)
+ self.tc.request_port_forward("localhost", 1234)
except SSHException as e:
assert "forwarding request denied" in str(e)
else: