summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2023-12-15 23:59:12 -0500
committerJeff Forcier <jeff@bitprophet.org>2023-12-15 23:59:12 -0500
commit773a174fb1e40e1d18dbe2625e16337ea401119e (patch)
tree41183afaa445ad1da4c24731826b2fd6a03b7b50
parent4c7f0410c533cdf0df2890512237961f934f5ab9 (diff)
Basic strict-kex-mode agreement mechanics work
-rw-r--r--paramiko/transport.py38
-rw-r--r--tests/test_transport.py43
2 files changed, 78 insertions, 3 deletions
diff --git a/paramiko/transport.py b/paramiko/transport.py
index 14a26333..af2ed744 100644
--- a/paramiko/transport.py
+++ b/paramiko/transport.py
@@ -334,6 +334,7 @@ class Transport(threading.Thread, ClosingContextManager):
gss_deleg_creds=True,
disabled_algorithms=None,
server_sig_algs=True,
+ strict_kex=True,
):
"""
Create a new SSH session over an existing socket, or socket-like
@@ -400,6 +401,10 @@ class Transport(threading.Thread, ClosingContextManager):
Whether to send an extra message to compatible clients, in server
mode, with a list of supported pubkey algorithms. Default:
``True``.
+ :param bool strict_kex:
+ Whether to advertise (and implement, if client also advertises
+ support for) a "strict kex" mode for safer handshaking. Default:
+ ``True``.
.. versionchanged:: 1.15
Added the ``default_window_size`` and ``default_max_packet_size``
@@ -410,10 +415,14 @@ class Transport(threading.Thread, ClosingContextManager):
Added the ``disabled_algorithms`` kwarg.
.. versionchanged:: 2.9
Added the ``server_sig_algs`` kwarg.
+ .. versionchanged:: 3.4
+ Added the ``strict_kex`` kwarg.
"""
self.active = False
self.hostname = None
self.server_extensions = {}
+ self.advertise_strict_kex = strict_kex
+ self.agreed_on_strict_kex = False
# TODO: these two overrides on sock's type should go away sometime, too
# many ways to do it!
@@ -2363,12 +2372,18 @@ class Transport(threading.Thread, ClosingContextManager):
)
else:
available_server_keys = self.preferred_keys
- # Signal support for MSG_EXT_INFO.
+ # Signal support for MSG_EXT_INFO so server will send it to us.
# NOTE: doing this here handily means we don't even consider this
# value when agreeing on real kex algo to use (which is a common
# pitfall when adding this apparently).
kex_algos.append("ext-info-c")
+ # Similar to ext-info, but used in both server modes, so done outside
+ # of above if/else.
+ if self.advertise_strict_kex:
+ which = "s" if self.server_mode else "c"
+ kex_algos.append(f"kex-strict-{which}-v00@openssh.com")
+
m = Message()
m.add_byte(cMSG_KEXINIT)
m.add_bytes(os.urandom(16))
@@ -2448,11 +2463,28 @@ class Transport(threading.Thread, ClosingContextManager):
self._log(DEBUG, "kex follows: {}".format(kex_follows))
self._log(DEBUG, "=== Key exchange agreements ===")
- # Strip out ext-info "kex algo"
+ # Record, and strip out, ext-info and/or strict-kex non-algorithms
self._remote_ext_info = None
+ self._remote_strict_kex = None
+ to_pop = []
for i, algo in enumerate(kex_algo_list):
if algo.startswith("ext-info-"):
- self._remote_ext_info = kex_algo_list.pop(i)
+ self._remote_ext_info = algo
+ to_pop.insert(0, i)
+ elif algo.startswith("kex-strict-"):
+ # NOTE: this is what we are expecting from the /remote/ end.
+ which = "c" if self.server_mode else "s"
+ expected = f"kex-strict-{which}-v00@openssh.com"
+ # Set strict mode if agreed.
+ self.agreed_on_strict_kex = (
+ algo == expected and self.advertise_strict_kex
+ )
+ self._log(
+ DEBUG, f"Strict kex mode: {self.agreed_on_strict_kex}"
+ )
+ to_pop.insert(0, i)
+ for i in to_pop:
+ kex_algo_list.pop(i)
# as a server, we pick the first item in the client's list that we
# support.
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 421c078b..7e238023 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -22,6 +22,7 @@ Some unit tests for the ssh2 protocol in Transport.
from binascii import hexlify
+import itertools
import select
import socket
import time
@@ -68,6 +69,7 @@ from ._util import (
TestServer as NullServer,
)
from ._loop import LoopSocket
+from pytest import skip, mark
LONG_BANNER = """\
@@ -1238,3 +1240,44 @@ class TestExtInfo(unittest.TestCase):
# Client settled on 256 despite itself not having 512 disabled (and
# otherwise, 512 would have been earlier in the preferred list)
assert tc._agreed_pubkey_algorithm == "rsa-sha2-256"
+
+
+class TestStrictKex:
+ def test_kex_algos_includes_kex_strict_c(self):
+ with server() as (tc, _):
+ kex = tc._get_latest_kex_init()
+ assert "kex-strict-c-v00@openssh.com" in kex["kex_algo_list"]
+
+ @mark.parametrize(
+ "server_active,client_active",
+ itertools.product([True, False], repeat=2),
+ )
+ def test_mode_agreement(self, server_active, client_active):
+ with server(
+ server_init=dict(strict_kex=server_active),
+ client_init=dict(strict_kex=client_active),
+ ) as (tc, ts):
+ if server_active and client_active:
+ assert tc.agreed_on_strict_kex is True
+ assert ts.agreed_on_strict_kex is True
+ else:
+ assert tc.agreed_on_strict_kex is False
+ assert ts.agreed_on_strict_kex is False
+
+ def test_mode_advertised_by_default(self):
+ # NOTE: no explicit strict_kex overrides...
+ with server() as (tc, ts):
+ assert all(
+ (
+ tc.advertise_strict_kex,
+ tc.agreed_on_strict_kex,
+ ts.advertise_strict_kex,
+ ts.agreed_on_strict_kex,
+ )
+ )
+
+ def test_sequence_numbers_reset_on_newkeys(self):
+ skip()
+
+ def test_error_raised_on_out_of_order_handshakes(self):
+ skip()