summaryrefslogtreecommitdiffhomepage
path: root/tests/test_transport.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_transport.py')
-rw-r--r--tests/test_transport.py24
1 files changed, 13 insertions, 11 deletions
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 3e352919..00639d04 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -33,7 +33,7 @@ import unittest
from paramiko import (
Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, SSHException,
- ChannelException, Packetizer,
+ ChannelException, Packetizer,
)
from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL
from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
@@ -43,9 +43,9 @@ from paramiko.common import (
)
from paramiko.py3compat import bytes
from paramiko.message import Message
-from tests import skipUnlessBuiltin
-from tests.loop import LoopSocket
-from tests.util import test_path
+
+from .util import needs_builtin, _support, slow
+from .loop import LoopSocket
LONG_BANNER = """\
@@ -64,7 +64,7 @@ Maybe.
class NullServer (ServerInterface):
paranoid_did_password = False
paranoid_did_public_key = False
- paranoid_key = DSSKey.from_private_key_file(test_path('test_dss.key'))
+ paranoid_key = DSSKey.from_private_key_file(_support('test_dss.key'))
def get_allowed_auths(self, username):
if username == 'slowdive':
@@ -130,7 +130,7 @@ class TransportTest(unittest.TestCase):
self.sockc.close()
def setup_test_server(self, client_options=None, server_options=None):
- host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
@@ -189,7 +189,7 @@ class TransportTest(unittest.TestCase):
loopback sockets. this is hardly "simple" but it's simpler than the
later tests. :)
"""
- host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
event = threading.Event()
@@ -214,7 +214,7 @@ class TransportTest(unittest.TestCase):
"""
verify that a long banner doesn't mess up the handshake.
"""
- host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
event = threading.Event()
@@ -246,6 +246,7 @@ class TransportTest(unittest.TestCase):
self.tc.renegotiate_keys()
self.ts.send_ignore(1024)
+ @slow
def test_5_keepalive(self):
"""
verify that the keepalive will be sent.
@@ -809,6 +810,7 @@ class TransportTest(unittest.TestCase):
(2**32, MAX_WINDOW_SIZE)]:
self.assertEqual(self.tc._sanitize_window_size(val), correct)
+ @slow
def test_L_handshake_timeout(self):
"""
verify that we can get a hanshake timeout.
@@ -829,7 +831,7 @@ class TransportTest(unittest.TestCase):
# be fine. Even tho it's a bit squicky.
self.tc.packetizer = SlowPacketizer(self.tc.sock)
# Continue with regular test red tape.
- host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
event = threading.Event()
@@ -881,7 +883,7 @@ class TransportTest(unittest.TestCase):
expected = text.encode("utf-8")
self.assertEqual(sfile.read(len(expected)), expected)
- @skipUnlessBuiltin('buffer')
+ @needs_builtin('buffer')
def test_channel_send_buffer(self):
"""
verify sending buffer instances to a channel
@@ -904,7 +906,7 @@ class TransportTest(unittest.TestCase):
chan.sendall(buffer(data))
self.assertEqual(sfile.read(len(data)), data)
- @skipUnlessBuiltin('memoryview')
+ @needs_builtin('memoryview')
def test_channel_send_memoryview(self):
"""
verify sending memoryview instances to a channel