summaryrefslogtreecommitdiffhomepage
path: root/tests/test_transport.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_transport.py')
-rw-r--r--tests/test_transport.py111
1 files changed, 56 insertions, 55 deletions
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 2b8ee3bc..ad267e28 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -28,36 +28,32 @@ import socket
import time
import threading
import random
-from hashlib import sha1
import unittest
from mock import Mock
from paramiko import (
- Transport,
- SecurityOptions,
- ServerInterface,
- RSAKey,
- DSSKey,
- SSHException,
+ AuthHandler,
ChannelException,
+ DSSKey,
Packetizer,
- Channel,
- AuthHandler,
+ RSAKey,
+ SSHException,
+ SecurityOptions,
+ ServerInterface,
+ Transport,
)
from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL
from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
from paramiko.common import (
- MSG_KEXINIT,
- cMSG_CHANNEL_WINDOW_ADJUST,
- cMSG_UNIMPLEMENTED,
+ DEFAULT_MAX_PACKET_SIZE,
+ DEFAULT_WINDOW_SIZE,
+ MAX_WINDOW_SIZE,
MIN_PACKET_SIZE,
MIN_WINDOW_SIZE,
- MAX_WINDOW_SIZE,
- DEFAULT_WINDOW_SIZE,
- DEFAULT_MAX_PACKET_SIZE,
- MSG_NAMES,
- MSG_UNIMPLEMENTED,
+ MSG_KEXINIT,
MSG_USERAUTH_SUCCESS,
+ cMSG_CHANNEL_WINDOW_ADJUST,
+ cMSG_UNIMPLEMENTED,
)
from paramiko.py3compat import bytes, byte_chr
from paramiko.message import Message
@@ -185,7 +181,7 @@ class TransportTest(unittest.TestCase):
self.assertTrue(event.is_set())
self.assertTrue(self.ts.is_active())
- def test_1_security_options(self):
+ def test_security_options(self):
o = self.tc.get_security_options()
self.assertEqual(type(o), SecurityOptions)
self.assertTrue(("aes256-cbc", "blowfish-cbc") != o.ciphers)
@@ -202,7 +198,7 @@ class TransportTest(unittest.TestCase):
except TypeError:
pass
- def test_1b_security_options_reset(self):
+ def testb_security_options_reset(self):
o = self.tc.get_security_options()
# should not throw any exceptions
o.ciphers = o.ciphers
@@ -211,17 +207,17 @@ class TransportTest(unittest.TestCase):
o.kex = o.kex
o.compression = o.compression
- def test_2_compute_key(self):
- self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929
- self.tc.H = b"\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3"
+ def test_compute_key(self):
+ self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929 # noqa
+ self.tc.H = b"\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3" # noqa
self.tc.session_id = self.tc.H
key = self.tc._compute_key("C", 32)
self.assertEqual(
- b"207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995",
+ b"207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995", # noqa
hexlify(key).upper(),
)
- def test_3_simple(self):
+ def test_simple(self):
"""
verify that we can establish an ssh link with ourselves across the
loopback sockets. this is hardly "simple" but it's simpler than the
@@ -249,7 +245,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual(True, self.tc.is_authenticated())
self.assertEqual(True, self.ts.is_authenticated())
- def test_3a_long_banner(self):
+ def testa_long_banner(self):
"""
verify that a long banner doesn't mess up the handshake.
"""
@@ -268,7 +264,7 @@ class TransportTest(unittest.TestCase):
self.assertTrue(event.is_set())
self.assertTrue(self.ts.is_active())
- def test_4_special(self):
+ def test_special(self):
"""
verify that the client can demand odd handshake settings, and can
renegotiate keys in mid-stream.
@@ -289,7 +285,7 @@ class TransportTest(unittest.TestCase):
self.ts.send_ignore(1024)
@slow
- def test_5_keepalive(self):
+ def test_keepalive(self):
"""
verify that the keepalive will be sent.
"""
@@ -299,7 +295,7 @@ class TransportTest(unittest.TestCase):
time.sleep(2)
self.assertEqual("keepalive@lag.net", self.server._global_request)
- def test_6_exec_command(self):
+ def test_exec_command(self):
"""
verify that exec_command() does something reasonable.
"""
@@ -343,7 +339,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual("This is on stderr.\n", f.readline())
self.assertEqual("", f.readline())
- def test_6a_channel_can_be_used_as_context_manager(self):
+ def testa_channel_can_be_used_as_context_manager(self):
"""
verify that exec_command() does something reasonable.
"""
@@ -359,7 +355,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual("Hello there.\n", f.readline())
self.assertEqual("", f.readline())
- def test_7_invoke_shell(self):
+ def test_invoke_shell(self):
"""
verify that invoke_shell() does something reasonable.
"""
@@ -373,18 +369,18 @@ class TransportTest(unittest.TestCase):
chan.close()
self.assertEqual("", f.readline())
- def test_8_channel_exception(self):
+ def test_channel_exception(self):
"""
verify that ChannelException is thrown for a bad open-channel request.
"""
self.setup_test_server()
try:
- chan = self.tc.open_channel("bogus")
+ self.tc.open_channel("bogus")
self.fail("expected exception")
except ChannelException as e:
self.assertTrue(e.code == OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED)
- def test_9_exit_status(self):
+ def test_exit_status(self):
"""
verify that get_exit_status() works.
"""
@@ -413,7 +409,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual(23, chan.recv_exit_status())
chan.close()
- def test_A_select(self):
+ def test_select(self):
"""
verify that select() on a channel works.
"""
@@ -468,7 +464,7 @@ class TransportTest(unittest.TestCase):
# ...and now is closed.
self.assertEqual(True, p._closed)
- def test_B_renegotiate(self):
+ def test_renegotiate(self):
"""
verify that a transport can correctly renegotiate mid-stream.
"""
@@ -492,7 +488,7 @@ class TransportTest(unittest.TestCase):
schan.close()
- def test_C_compression(self):
+ def test_compression(self):
"""
verify that zlib compression is basically working.
"""
@@ -510,14 +506,15 @@ class TransportTest(unittest.TestCase):
bytes2 = self.tc.packetizer._Packetizer__sent_bytes
block_size = self.tc._cipher_info[self.tc.local_cipher]["block-size"]
mac_size = self.tc._mac_info[self.tc.local_mac]["size"]
- # tests show this is actually compressed to *52 bytes*! including packet overhead! nice!! :)
+ # tests show this is actually compressed to *52 bytes*! including
+ # packet overhead! nice!! :)
self.assertTrue(bytes2 - bytes < 1024)
self.assertEqual(16 + block_size + mac_size, bytes2 - bytes)
chan.close()
schan.close()
- def test_D_x11(self):
+ def test_x11(self):
"""
verify that an x11 port can be requested and opened.
"""
@@ -555,7 +552,7 @@ class TransportTest(unittest.TestCase):
chan.close()
schan.close()
- def test_E_reverse_port_forwarding(self):
+ def test_reverse_port_forwarding(self):
"""
verify that a client can ask the server to open a reverse port for
forwarding.
@@ -563,7 +560,7 @@ class TransportTest(unittest.TestCase):
self.setup_test_server()
chan = self.tc.open_session()
chan.exec_command("yes")
- schan = self.ts.accept(1.0)
+ self.ts.accept(1.0)
requested = []
@@ -594,7 +591,7 @@ class TransportTest(unittest.TestCase):
self.tc.cancel_port_forward("127.0.0.1", port)
self.assertTrue(self.server._listen is None)
- def test_F_port_forwarding(self):
+ def test_port_forwarding(self):
"""
verify that a client can forward new connections from a locally-
forwarded port.
@@ -602,7 +599,7 @@ class TransportTest(unittest.TestCase):
self.setup_test_server()
chan = self.tc.open_session()
chan.exec_command("yes")
- schan = self.ts.accept(1.0)
+ self.ts.accept(1.0)
# open a port on the "server" that the client will ask to forward to.
greeting_server = socket.socket()
@@ -626,7 +623,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual(b"Hello!\n", cs.recv(7))
cs.close()
- def test_G_stderr_select(self):
+ def test_stderr_select(self):
"""
verify that select() on a channel works even if only stderr is
receiving data.
@@ -665,7 +662,7 @@ class TransportTest(unittest.TestCase):
schan.close()
chan.close()
- def test_H_send_ready(self):
+ def test_send_ready(self):
"""
verify that send_ready() indicates when a send would not block.
"""
@@ -689,9 +686,10 @@ class TransportTest(unittest.TestCase):
chan.close()
self.assertEqual(chan.send_ready(), True)
- def test_I_rekey_deadlock(self):
+ def test_rekey_deadlock(self):
"""
- Regression test for deadlock when in-transit messages are received after MSG_KEXINIT is sent
+ Regression test for deadlock when in-transit messages are received
+ after MSG_KEXINIT is sent
Note: When this test fails, it may leak threads.
"""
@@ -714,12 +712,15 @@ class TransportTest(unittest.TestCase):
# MSG_KEXINIT to the remote host.
#
# On the remote host (using any SSH implementation):
- # 5. The MSG_CHANNEL_DATA is received, and MSG_CHANNEL_WINDOW_ADJUST is sent.
- # 6. The MSG_KEXINIT is received, and a corresponding MSG_KEXINIT is sent.
+ # 5. The MSG_CHANNEL_DATA is received, and MSG_CHANNEL_WINDOW_ADJUST
+ # is sent.
+ # 6. The MSG_KEXINIT is received, and a corresponding MSG_KEXINIT is
+ # sent.
#
# In the main thread:
# 7. The user's program calls Channel.send().
- # 8. Channel.send acquires Channel.lock, then calls Transport._send_user_message().
+ # 8. Channel.send acquires Channel.lock, then calls
+ # Transport._send_user_message().
# 9. Transport._send_user_message waits for Transport.clear_to_send
# to be set (i.e., it waits for re-keying to complete).
# Channel.lock is still held.
@@ -854,7 +855,7 @@ class TransportTest(unittest.TestCase):
schan.close()
chan.close()
- def test_J_sanitze_packet_size(self):
+ def test_sanitze_packet_size(self):
"""
verify that we conform to the rfc of packet and window sizes.
"""
@@ -865,7 +866,7 @@ class TransportTest(unittest.TestCase):
]:
self.assertEqual(self.tc._sanitize_packet_size(val), correct)
- def test_K_sanitze_window_size(self):
+ def test_sanitze_window_size(self):
"""
verify that we conform to the rfc of packet and window sizes.
"""
@@ -877,7 +878,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual(self.tc._sanitize_window_size(val), correct)
@slow
- def test_L_handshake_timeout(self):
+ def test_handshake_timeout(self):
"""
verify that we can get a hanshake timeout.
"""
@@ -914,7 +915,7 @@ class TransportTest(unittest.TestCase):
password="pygmalion",
)
- def test_M_select_after_close(self):
+ def test_select_after_close(self):
"""
verify that select works when a channel is already closed.
"""
@@ -969,11 +970,11 @@ class TransportTest(unittest.TestCase):
# send() accepts buffer instances
sent = 0
while sent < len(data):
- sent += chan.send(buffer(data, sent, 8))
+ sent += chan.send(buffer(data, sent, 8)) # noqa
self.assertEqual(sfile.read(len(data)), data)
# sendall() accepts a buffer instance
- chan.sendall(buffer(data))
+ chan.sendall(buffer(data)) # noqa
self.assertEqual(sfile.read(len(data)), data)
@needs_builtin("memoryview")