summaryrefslogtreecommitdiffhomepage
path: root/tests
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2023-12-17 18:47:33 -0500
committerJeff Forcier <jeff@bitprophet.org>2023-12-17 18:47:33 -0500
commit33508c920309860c4a775be70f209c2a400e18ec (patch)
tree21c2796a9f132850200f8a6f4deb7e7ba4337689 /tests
parent96db1e2be856eac66631761bae41167a1ebd2b4e (diff)
Expand MessageOrderError use to handle more packet types
Diffstat (limited to 'tests')
-rw-r--r--tests/_util.py7
-rw-r--r--tests/test_transport.py58
2 files changed, 57 insertions, 8 deletions
diff --git a/tests/_util.py b/tests/_util.py
index f705e934..f0ae1d41 100644
--- a/tests/_util.py
+++ b/tests/_util.py
@@ -346,6 +346,7 @@ def server(
pubkeys=None,
catch_error=False,
transport_factory=None,
+ server_transport_factory=None,
defer=False,
skip_verify=False,
):
@@ -373,6 +374,8 @@ def server(
Necessary for connection_time exception testing.
:param transport_factory:
Like the same-named param in SSHClient: which Transport class to use.
+ :param server_transport_factory:
+ Like ``transport_factory``, but only impacts the server transport.
:param bool defer:
Whether to defer authentication during connecting.
@@ -399,8 +402,10 @@ def server(
sockc.link(socks)
if transport_factory is None:
transport_factory = Transport
+ if server_transport_factory is None:
+ server_transport_factory = transport_factory
tc = transport_factory(sockc, **dict(init, **client_init))
- ts = transport_factory(socks, **dict(init, **server_init))
+ ts = server_transport_factory(socks, **dict(init, **server_init))
if hostkey is None:
hostkey = RSAKey.from_private_key_file(_support("rsa.key"))
diff --git a/tests/test_transport.py b/tests/test_transport.py
index ecf0a184..655ae071 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -52,11 +52,15 @@ from paramiko.common import (
MAX_WINDOW_SIZE,
MIN_PACKET_SIZE,
MIN_WINDOW_SIZE,
+ MSG_CHANNEL_OPEN,
+ MSG_DEBUG,
+ MSG_IGNORE,
MSG_KEXINIT,
+ MSG_UNIMPLEMENTED,
MSG_USERAUTH_SUCCESS,
+ byte_chr,
cMSG_CHANNEL_WINDOW_ADJUST,
cMSG_UNIMPLEMENTED,
- byte_chr,
)
from paramiko.message import Message
@@ -86,6 +90,10 @@ Note: An SSH banner may eventually appear.
Maybe.
"""
+# Faux 'packet type' we do not implement and are unlikely ever to (but which is
+# technically "within spec" re RFC 4251
+MSG_FUGGEDABOUTIT = 253
+
class TransportTest(unittest.TestCase):
# TODO: this can get nuked once ServiceRequestingTransport becomes the
@@ -1302,13 +1310,49 @@ class TestStrictKex:
)
)
- def test_MessageOrderError_raised_on_out_of_order_messages(self):
+ @mark.parametrize(
+ "ptype",
+ (
+ # "normal" but definitely out-of-order message
+ MSG_CHANNEL_OPEN,
+ # Normally ignored, but not in this case
+ MSG_IGNORE,
+ # Normally triggers debug parsing, but not in this case
+ MSG_DEBUG,
+ # Normally ignored, but...you get the idea
+ MSG_UNIMPLEMENTED,
+ # Not real, so would normally trigger us /sending/
+ # MSG_UNIMPLEMENTED, but...
+ MSG_FUGGEDABOUTIT,
+ ),
+ )
+ def test_MessageOrderError_non_kex_messages_in_initial_kex(self, ptype):
+ class AttackTransport(Transport):
+ # Easiest apparent spot on server side which is:
+ # - late enough for both ends to have handshook on strict mode
+ # - early enough to be in the window of opportunity for Terrapin
+ # attack; essentially during actual kex, when the engine is
+ # waiting for things like MSG_KEXECDH_REPLY (for eg curve25519).
+ def _negotiate_keys(self, m):
+ self.clear_to_send_lock.acquire()
+ try:
+ self.clear_to_send.clear()
+ finally:
+ self.clear_to_send_lock.release()
+ if self.local_kex_init is None:
+ # remote side wants to renegotiate
+ self._send_kex_init()
+ self._parse_kex_init(m)
+ # Here, we would normally kick over to kex_engine, but instead
+ # we want the server to send the OOO message.
+ m = Message()
+ m.add_byte(byte_chr(ptype))
+ # rest of packet unnecessary...
+ self._send_message(m)
+
with raises(MessageOrderError):
- with server() as (tc, _):
- # A bit artificial as it's outside kexinit/handshake, but much
- # easier to trigger and still in line with behavior under test
- tc._expect_packet(MSG_KEXINIT)
- tc.open_session()
+ with server(server_transport_factory=AttackTransport) as (tc, _):
+ pass # above should run and except during connect()
def test_SSHException_raised_on_out_of_order_messages_when_not_strict(
self,