diff options
author | Jeff Forcier <jeff@bitprophet.org> | 2023-12-16 16:17:58 -0500 |
---|---|---|
committer | Jeff Forcier <jeff@bitprophet.org> | 2023-12-16 16:17:58 -0500 |
commit | 75e311d3c0845a316b6e7b3fae2488d86ad5a270 (patch) | |
tree | 6702433a8f92b31c6b5ad52786ee3d05ba42c8d0 | |
parent | 73f079f5a4bbba7f3048dadbe05b24242206745e (diff) |
Enforce zero seqno on kexinit
-rw-r--r-- | paramiko/transport.py | 18 | ||||
-rw-r--r-- | sites/www/changelog.rst | 3 | ||||
-rw-r--r-- | tests/test_transport.py | 62 |
3 files changed, 75 insertions, 8 deletions
diff --git a/paramiko/transport.py b/paramiko/transport.py index 44361b0c..c819d9a6 100644 --- a/paramiko/transport.py +++ b/paramiko/transport.py @@ -336,6 +336,7 @@ class Transport(threading.Thread, ClosingContextManager): disabled_algorithms=None, server_sig_algs=True, strict_kex=True, + packetizer_class=None, ): """ Create a new SSH session over an existing socket, or socket-like @@ -406,6 +407,9 @@ class Transport(threading.Thread, ClosingContextManager): Whether to advertise (and implement, if client also advertises support for) a "strict kex" mode for safer handshaking. Default: ``True``. + :param packetizer_class: + Which class to use for instantiating the internal packet handler. + Default: ``None`` (i.e.: use `Packetizer` as normal). .. versionchanged:: 1.15 Added the ``default_window_size`` and ``default_max_packet_size`` @@ -418,6 +422,8 @@ class Transport(threading.Thread, ClosingContextManager): Added the ``server_sig_algs`` kwarg. .. versionchanged:: 3.4 Added the ``strict_kex`` kwarg. + .. versionchanged:: 3.4 + Added the ``packetizer_class`` kwarg. """ self.active = False self.hostname = None @@ -467,7 +473,7 @@ class Transport(threading.Thread, ClosingContextManager): self.sock.settimeout(self._active_check_timeout) # negotiated crypto parameters - self.packetizer = Packetizer(sock) + self.packetizer = (packetizer_class or Packetizer)(sock) self.local_version = "SSH-" + self._PROTO_ID + "-" + self._CLIENT_ID self.remote_version = "" self.local_cipher = self.remote_cipher = "" @@ -2428,7 +2434,8 @@ class Transport(threading.Thread, ClosingContextManager): def _get_latest_kex_init(self): return self._really_parse_kex_init( - Message(self._latest_kex_init), ignore_first_byte=True + Message(self._latest_kex_init), + ignore_first_byte=True, ) def _parse_kex_init(self, m): @@ -2490,6 +2497,13 @@ class Transport(threading.Thread, ClosingContextManager): for i in to_pop: kex_algo_list.pop(i) + # CVE mitigation: expect zeroed-out seqno anytime we are performing kex + # init phase, if strict mode was negotiated. + if self.agreed_on_strict_kex and m.seqno != 0: + raise MessageOrderError( + f"Got nonzero seqno ({m.seqno}) during strict KEXINIT!" + ) + # as a server, we pick the first item in the client's list that we # support. # as a client, we pick the first item in our list that the server diff --git a/sites/www/changelog.rst b/sites/www/changelog.rst index 44d3d445..4915eaca 100644 --- a/sites/www/changelog.rst +++ b/sites/www/changelog.rst @@ -2,6 +2,9 @@ Changelog ========= +- :feature:`-` `Transport` grew a new ``packetizer_class`` kwarg for overriding + the packet-handler class used internally. Mostly for testing, but advanced + users may find this useful when doing deep hacks. - :bug:`-` Address `CVE 2023-48795<https://terrapin-attack.com/>`_ (aka the "Terrapin Attack", a vulnerability found in the SSH protocol re: treatment of packet sequence numbers) as follows: diff --git a/tests/test_transport.py b/tests/test_transport.py index 060a6cae..6cd9398a 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -1055,6 +1055,16 @@ class TransportTest(unittest.TestCase): # Real fix's behavior self._expect_unimplemented() + def test_can_override_packetizer_used(self): + class MyPacketizer(Packetizer): + pass + + # control case + assert Transport(sock=LoopSocket()).packetizer.__class__ is Packetizer + # overridden case + tweaked = Transport(sock=LoopSocket(), packetizer_class=MyPacketizer) + assert tweaked.packetizer.__class__ is MyPacketizer + # TODO: for now this is purely a regression test. It needs actual tests of the # intentional new behavior too! @@ -1243,6 +1253,20 @@ class TestExtInfo(unittest.TestCase): assert tc._agreed_pubkey_algorithm == "rsa-sha2-256" +class BadSeqPacketizer(Packetizer): + def read_message(self): + cmd, msg = super().read_message() + # Only mess w/ seqno if kexinit. + if cmd is MSG_KEXINIT: + # NOTE: this is /only/ the copy of the seqno which gets + # transmitted up from Packetizer; it's not modifying + # Packetizer's own internal seqno. For these tests, + # modifying the latter isn't required, and is also harder + # to do w/o triggering MAC mismatches. + msg.seqno = 17 # arbitrary nonzero int + return cmd, msg + + class TestStrictKex: def test_kex_algos_includes_kex_strict_c(self): with server() as (tc, _): @@ -1277,9 +1301,6 @@ class TestStrictKex: ) ) - def test_sequence_numbers_reset_on_newkeys(self): - skip() - def test_MessageOrderError_raised_on_out_of_order_messages(self): with raises(MessageOrderError): with server() as (tc, _): @@ -1288,12 +1309,41 @@ class TestStrictKex: tc._expect_packet(MSG_KEXINIT) tc.open_session() - def test_SSHException_raised_on_out_of_order_messages_when_not_strict(self): + def test_SSHException_raised_on_out_of_order_messages_when_not_strict( + self, + ): # This is kind of dumb (either situation is still fatal!) but whatever, # may as well be strict with our new strict flag... with raises(SSHException) as info: # would be true either way, but - with server(client_init=dict(strict_kex=False), - ) as (tc, _): + with server( + client_init=dict(strict_kex=False), + ) as (tc, _): tc._expect_packet(MSG_KEXINIT) tc.open_session() assert info.type is SSHException # NOT MessageOrderError! + + def test_error_not_raised_when_kexinit_not_seq_0_but_unstrict(self): + with server( + client_init=dict( + # Disable strict kex + strict_kex=False, + # Give our clientside a packetizer that sets all kexinit + # Message objects to have .seqno==17, which would trigger the + # new logic if we'd forgotten to wrap it in strict-kex check + packetizer_class=BadSeqPacketizer, + ), + ): + pass # kexinit happens at connect... + + def test_MessageOrderError_raised_when_kexinit_not_seq_0_and_strict(self): + with raises(MessageOrderError): + with server( + # Give our clientside a packetizer that sets all kexinit + # Message objects to have .seqno==17, which should trigger the + # new logic (given we are NOT disabling strict-mode) + client_init=dict(packetizer_class=BadSeqPacketizer), + ): + pass # kexinit happens at connect... + + def test_sequence_numbers_reset_on_newkeys(self): + skip() |