summaryrefslogtreecommitdiffhomepage
path: root/tests/test_packetizer.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_packetizer.py')
-rw-r--r--tests/test_packetizer.py35
1 files changed, 21 insertions, 14 deletions
diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py
index 02173292..6920f08e 100644
--- a/tests/test_packetizer.py
+++ b/tests/test_packetizer.py
@@ -27,27 +27,28 @@ from hashlib import sha1
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes
-from tests.loop import LoopSocket
-
from paramiko import Message, Packetizer, util
from paramiko.common import byte_chr, zero_byte
+from .loop import LoopSocket
+
+
x55 = byte_chr(0x55)
x1f = byte_chr(0x1f)
-class PacketizerTest (unittest.TestCase):
+class PacketizerTest(unittest.TestCase):
def test_1_write(self):
rsock = LoopSocket()
wsock = LoopSocket()
rsock.link(wsock)
p = Packetizer(wsock)
- p.set_log(util.get_logger('paramiko.transport'))
+ p.set_log(util.get_logger("paramiko.transport"))
p.set_hexdump(True)
encryptor = Cipher(
algorithms.AES(zero_byte * 16),
modes.CBC(x55 * 16),
- backend=default_backend()
+ backend=default_backend(),
).encryptor()
p.set_outbound_cipher(encryptor, 16, sha1, 12, x1f * 20)
@@ -62,22 +63,27 @@ class PacketizerTest (unittest.TestCase):
data = rsock.recv(100)
# 32 + 12 bytes of MAC = 44
self.assertEqual(44, len(data))
- self.assertEqual(b'\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0', data[:16])
+ self.assertEqual(
+ b"\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0",
+ data[:16],
+ )
def test_2_read(self):
rsock = LoopSocket()
wsock = LoopSocket()
rsock.link(wsock)
p = Packetizer(rsock)
- p.set_log(util.get_logger('paramiko.transport'))
+ p.set_log(util.get_logger("paramiko.transport"))
p.set_hexdump(True)
decryptor = Cipher(
algorithms.AES(zero_byte * 16),
modes.CBC(x55 * 16),
- backend=default_backend()
+ backend=default_backend(),
).decryptor()
p.set_inbound_cipher(decryptor, 16, sha1, 12, x1f * 20)
- wsock.send(b'\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0\x90\xd2\x16\x56\x0d\x71\x73\x61\x38\x7c\x4c\x3d\xfb\x97\x7d\xe2\x6e\x03\xb1\xa0\xc2\x1c\xd6\x41\x41\x4c\xb4\x59')
+ wsock.send(
+ b"\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0\x90\xd2\x16\x56\x0d\x71\x73\x61\x38\x7c\x4c\x3d\xfb\x97\x7d\xe2\x6e\x03\xb1\xa0\xc2\x1c\xd6\x41\x41\x4c\xb4\x59"
+ )
cmd, m = p.read_message()
self.assertEqual(100, cmd)
self.assertEqual(100, m.get_int())
@@ -85,18 +91,18 @@ class PacketizerTest (unittest.TestCase):
self.assertEqual(900, m.get_int())
def test_3_closed(self):
- if sys.platform.startswith("win"): # no SIGALRM on windows
+ if sys.platform.startswith("win"): # no SIGALRM on windows
return
rsock = LoopSocket()
wsock = LoopSocket()
rsock.link(wsock)
p = Packetizer(wsock)
- p.set_log(util.get_logger('paramiko.transport'))
+ p.set_log(util.get_logger("paramiko.transport"))
p.set_hexdump(True)
encryptor = Cipher(
algorithms.AES(zero_byte * 16),
modes.CBC(x55 * 16),
- backend=default_backend()
+ backend=default_backend(),
).encryptor()
p.set_outbound_cipher(encryptor, 16, sha1, 12, x1f * 20)
@@ -115,12 +121,12 @@ class PacketizerTest (unittest.TestCase):
class TimeoutError(Exception):
def __init__(self, error_message):
- if hasattr(errno, 'ETIME'):
+ if hasattr(errno, "ETIME"):
self.message = os.sterror(errno.ETIME)
else:
self.messaage = error_message
- def timeout(seconds=1, error_message='Timer expired'):
+ def timeout(seconds=1, error_message="Timer expired"):
def decorator(func):
def _handle_timeout(signum, frame):
raise TimeoutError(error_message)
@@ -137,5 +143,6 @@ class PacketizerTest (unittest.TestCase):
return wraps(func)(wrapper)
return decorator
+
send = timeout()(p.send_message)
self.assertRaises(EOFError, send, m)