summaryrefslogtreecommitdiffhomepage
path: root/tests/test_transport.py
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2023-12-15 23:59:12 -0500
committerJeff Forcier <jeff@bitprophet.org>2023-12-15 23:59:12 -0500
commit773a174fb1e40e1d18dbe2625e16337ea401119e (patch)
tree41183afaa445ad1da4c24731826b2fd6a03b7b50 /tests/test_transport.py
parent4c7f0410c533cdf0df2890512237961f934f5ab9 (diff)
Basic strict-kex-mode agreement mechanics work
Diffstat (limited to 'tests/test_transport.py')
-rw-r--r--tests/test_transport.py43
1 files changed, 43 insertions, 0 deletions
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 421c078b..7e238023 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -22,6 +22,7 @@ Some unit tests for the ssh2 protocol in Transport.
from binascii import hexlify
+import itertools
import select
import socket
import time
@@ -68,6 +69,7 @@ from ._util import (
TestServer as NullServer,
)
from ._loop import LoopSocket
+from pytest import skip, mark
LONG_BANNER = """\
@@ -1238,3 +1240,44 @@ class TestExtInfo(unittest.TestCase):
# Client settled on 256 despite itself not having 512 disabled (and
# otherwise, 512 would have been earlier in the preferred list)
assert tc._agreed_pubkey_algorithm == "rsa-sha2-256"
+
+
+class TestStrictKex:
+ def test_kex_algos_includes_kex_strict_c(self):
+ with server() as (tc, _):
+ kex = tc._get_latest_kex_init()
+ assert "kex-strict-c-v00@openssh.com" in kex["kex_algo_list"]
+
+ @mark.parametrize(
+ "server_active,client_active",
+ itertools.product([True, False], repeat=2),
+ )
+ def test_mode_agreement(self, server_active, client_active):
+ with server(
+ server_init=dict(strict_kex=server_active),
+ client_init=dict(strict_kex=client_active),
+ ) as (tc, ts):
+ if server_active and client_active:
+ assert tc.agreed_on_strict_kex is True
+ assert ts.agreed_on_strict_kex is True
+ else:
+ assert tc.agreed_on_strict_kex is False
+ assert ts.agreed_on_strict_kex is False
+
+ def test_mode_advertised_by_default(self):
+ # NOTE: no explicit strict_kex overrides...
+ with server() as (tc, ts):
+ assert all(
+ (
+ tc.advertise_strict_kex,
+ tc.agreed_on_strict_kex,
+ ts.advertise_strict_kex,
+ ts.agreed_on_strict_kex,
+ )
+ )
+
+ def test_sequence_numbers_reset_on_newkeys(self):
+ skip()
+
+ def test_error_raised_on_out_of_order_handshakes(self):
+ skip()