summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2023-12-16 16:17:58 -0500
committerJeff Forcier <jeff@bitprophet.org>2023-12-16 16:17:58 -0500
commit75e311d3c0845a316b6e7b3fae2488d86ad5a270 (patch)
tree6702433a8f92b31c6b5ad52786ee3d05ba42c8d0
parent73f079f5a4bbba7f3048dadbe05b24242206745e (diff)
Enforce zero seqno on kexinit
-rw-r--r--paramiko/transport.py18
-rw-r--r--sites/www/changelog.rst3
-rw-r--r--tests/test_transport.py62
3 files changed, 75 insertions, 8 deletions
diff --git a/paramiko/transport.py b/paramiko/transport.py
index 44361b0c..c819d9a6 100644
--- a/paramiko/transport.py
+++ b/paramiko/transport.py
@@ -336,6 +336,7 @@ class Transport(threading.Thread, ClosingContextManager):
disabled_algorithms=None,
server_sig_algs=True,
strict_kex=True,
+ packetizer_class=None,
):
"""
Create a new SSH session over an existing socket, or socket-like
@@ -406,6 +407,9 @@ class Transport(threading.Thread, ClosingContextManager):
Whether to advertise (and implement, if client also advertises
support for) a "strict kex" mode for safer handshaking. Default:
``True``.
+ :param packetizer_class:
+ Which class to use for instantiating the internal packet handler.
+ Default: ``None`` (i.e.: use `Packetizer` as normal).
.. versionchanged:: 1.15
Added the ``default_window_size`` and ``default_max_packet_size``
@@ -418,6 +422,8 @@ class Transport(threading.Thread, ClosingContextManager):
Added the ``server_sig_algs`` kwarg.
.. versionchanged:: 3.4
Added the ``strict_kex`` kwarg.
+ .. versionchanged:: 3.4
+ Added the ``packetizer_class`` kwarg.
"""
self.active = False
self.hostname = None
@@ -467,7 +473,7 @@ class Transport(threading.Thread, ClosingContextManager):
self.sock.settimeout(self._active_check_timeout)
# negotiated crypto parameters
- self.packetizer = Packetizer(sock)
+ self.packetizer = (packetizer_class or Packetizer)(sock)
self.local_version = "SSH-" + self._PROTO_ID + "-" + self._CLIENT_ID
self.remote_version = ""
self.local_cipher = self.remote_cipher = ""
@@ -2428,7 +2434,8 @@ class Transport(threading.Thread, ClosingContextManager):
def _get_latest_kex_init(self):
return self._really_parse_kex_init(
- Message(self._latest_kex_init), ignore_first_byte=True
+ Message(self._latest_kex_init),
+ ignore_first_byte=True,
)
def _parse_kex_init(self, m):
@@ -2490,6 +2497,13 @@ class Transport(threading.Thread, ClosingContextManager):
for i in to_pop:
kex_algo_list.pop(i)
+ # CVE mitigation: expect zeroed-out seqno anytime we are performing kex
+ # init phase, if strict mode was negotiated.
+ if self.agreed_on_strict_kex and m.seqno != 0:
+ raise MessageOrderError(
+ f"Got nonzero seqno ({m.seqno}) during strict KEXINIT!"
+ )
+
# as a server, we pick the first item in the client's list that we
# support.
# as a client, we pick the first item in our list that the server
diff --git a/sites/www/changelog.rst b/sites/www/changelog.rst
index 44d3d445..4915eaca 100644
--- a/sites/www/changelog.rst
+++ b/sites/www/changelog.rst
@@ -2,6 +2,9 @@
Changelog
=========
+- :feature:`-` `Transport` grew a new ``packetizer_class`` kwarg for overriding
+ the packet-handler class used internally. Mostly for testing, but advanced
+ users may find this useful when doing deep hacks.
- :bug:`-` Address `CVE 2023-48795<https://terrapin-attack.com/>`_ (aka the
"Terrapin Attack", a vulnerability found in the SSH protocol re: treatment of
packet sequence numbers) as follows:
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 060a6cae..6cd9398a 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -1055,6 +1055,16 @@ class TransportTest(unittest.TestCase):
# Real fix's behavior
self._expect_unimplemented()
+ def test_can_override_packetizer_used(self):
+ class MyPacketizer(Packetizer):
+ pass
+
+ # control case
+ assert Transport(sock=LoopSocket()).packetizer.__class__ is Packetizer
+ # overridden case
+ tweaked = Transport(sock=LoopSocket(), packetizer_class=MyPacketizer)
+ assert tweaked.packetizer.__class__ is MyPacketizer
+
# TODO: for now this is purely a regression test. It needs actual tests of the
# intentional new behavior too!
@@ -1243,6 +1253,20 @@ class TestExtInfo(unittest.TestCase):
assert tc._agreed_pubkey_algorithm == "rsa-sha2-256"
+class BadSeqPacketizer(Packetizer):
+ def read_message(self):
+ cmd, msg = super().read_message()
+ # Only mess w/ seqno if kexinit.
+ if cmd is MSG_KEXINIT:
+ # NOTE: this is /only/ the copy of the seqno which gets
+ # transmitted up from Packetizer; it's not modifying
+ # Packetizer's own internal seqno. For these tests,
+ # modifying the latter isn't required, and is also harder
+ # to do w/o triggering MAC mismatches.
+ msg.seqno = 17 # arbitrary nonzero int
+ return cmd, msg
+
+
class TestStrictKex:
def test_kex_algos_includes_kex_strict_c(self):
with server() as (tc, _):
@@ -1277,9 +1301,6 @@ class TestStrictKex:
)
)
- def test_sequence_numbers_reset_on_newkeys(self):
- skip()
-
def test_MessageOrderError_raised_on_out_of_order_messages(self):
with raises(MessageOrderError):
with server() as (tc, _):
@@ -1288,12 +1309,41 @@ class TestStrictKex:
tc._expect_packet(MSG_KEXINIT)
tc.open_session()
- def test_SSHException_raised_on_out_of_order_messages_when_not_strict(self):
+ def test_SSHException_raised_on_out_of_order_messages_when_not_strict(
+ self,
+ ):
# This is kind of dumb (either situation is still fatal!) but whatever,
# may as well be strict with our new strict flag...
with raises(SSHException) as info: # would be true either way, but
- with server(client_init=dict(strict_kex=False),
- ) as (tc, _):
+ with server(
+ client_init=dict(strict_kex=False),
+ ) as (tc, _):
tc._expect_packet(MSG_KEXINIT)
tc.open_session()
assert info.type is SSHException # NOT MessageOrderError!
+
+ def test_error_not_raised_when_kexinit_not_seq_0_but_unstrict(self):
+ with server(
+ client_init=dict(
+ # Disable strict kex
+ strict_kex=False,
+ # Give our clientside a packetizer that sets all kexinit
+ # Message objects to have .seqno==17, which would trigger the
+ # new logic if we'd forgotten to wrap it in strict-kex check
+ packetizer_class=BadSeqPacketizer,
+ ),
+ ):
+ pass # kexinit happens at connect...
+
+ def test_MessageOrderError_raised_when_kexinit_not_seq_0_and_strict(self):
+ with raises(MessageOrderError):
+ with server(
+ # Give our clientside a packetizer that sets all kexinit
+ # Message objects to have .seqno==17, which should trigger the
+ # new logic (given we are NOT disabling strict-mode)
+ client_init=dict(packetizer_class=BadSeqPacketizer),
+ ):
+ pass # kexinit happens at connect...
+
+ def test_sequence_numbers_reset_on_newkeys(self):
+ skip()