diff options
Diffstat (limited to 'tests/test_transport.py')
-rw-r--r-- | tests/test_transport.py | 198 |
1 files changed, 19 insertions, 179 deletions
diff --git a/tests/test_transport.py b/tests/test_transport.py index d7704af6..ee00830a 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -22,8 +22,6 @@ Some unit tests for the ssh2 protocol in Transport. from binascii import hexlify -from contextlib import contextmanager -import pytest import select import socket import time @@ -35,18 +33,15 @@ from unittest.mock import Mock from paramiko import ( AuthHandler, ChannelException, - DSSKey, Packetizer, RSAKey, SSHException, AuthenticationException, IncompatiblePeer, SecurityOptions, - ServerInterface, Transport, ) -from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL -from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED +from paramiko import OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED from paramiko.common import ( DEFAULT_MAX_PACKET_SIZE, DEFAULT_WINDOW_SIZE, @@ -61,7 +56,18 @@ from paramiko.common import ( ) from paramiko.message import Message -from ._util import needs_builtin, _support, requires_sha1_signing, slow +from ._util import ( + needs_builtin, + _support, + requires_sha1_signing, + slow, + server, + _disable_sha2, + _disable_sha1, + _disable_sha2_pubkey, + _disable_sha1_pubkey, + TestServer as NullServer, +) from ._loop import LoopSocket @@ -78,79 +84,6 @@ Maybe. """ -class NullServer(ServerInterface): - paranoid_did_password = False - paranoid_did_public_key = False - paranoid_key = DSSKey.from_private_key_file(_support("dss.key")) - - def __init__(self, allowed_keys=None): - self.allowed_keys = allowed_keys if allowed_keys is not None else [] - - def get_allowed_auths(self, username): - if username == "slowdive": - return "publickey,password" - return "publickey" - - def check_auth_password(self, username, password): - if (username == "slowdive") and (password == "pygmalion"): - return AUTH_SUCCESSFUL - return AUTH_FAILED - - def check_auth_publickey(self, username, key): - if key in self.allowed_keys: - return AUTH_SUCCESSFUL - return AUTH_FAILED - - def check_channel_request(self, kind, chanid): - if kind == "bogus": - return OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED - return OPEN_SUCCEEDED - - def check_channel_exec_request(self, channel, command): - if command != b"yes": - return False - return True - - def check_channel_shell_request(self, channel): - return True - - def check_global_request(self, kind, msg): - self._global_request = kind - # NOTE: for w/e reason, older impl of this returned False always, even - # tho that's only supposed to occur if the request cannot be served. - # For now, leaving that the default unless test supplies specific - # 'acceptable' request kind - return kind == "acceptable" - - def check_channel_x11_request( - self, - channel, - single_connection, - auth_protocol, - auth_cookie, - screen_number, - ): - self._x11_single_connection = single_connection - self._x11_auth_protocol = auth_protocol - self._x11_auth_cookie = auth_cookie - self._x11_screen_number = screen_number - return True - - def check_port_forward_request(self, addr, port): - self._listen = socket.socket() - self._listen.bind(("127.0.0.1", 0)) - self._listen.listen(1) - return self._listen.getsockname()[1] - - def cancel_port_forward_request(self, addr, port): - self._listen.close() - self._listen = None - - def check_channel_direct_tcpip_request(self, chanid, origin, destination): - self._tcpip_dest = destination - return OPEN_SUCCEEDED - - class TransportTest(unittest.TestCase): def setUp(self): self.socks = LoopSocket() @@ -1190,103 +1123,6 @@ class AlgorithmDisablingTests(unittest.TestCase): assert "zlib" not in compressions -@contextmanager -def server( - hostkey=None, - init=None, - server_init=None, - client_init=None, - connect=None, - pubkeys=None, - catch_error=False, - transport_factory=None, -): - """ - SSH server contextmanager for testing. - - :param hostkey: - Host key to use for the server; if None, loads - ``rsa.key``. - :param init: - Default `Transport` constructor kwargs to use for both sides. - :param server_init: - Extends and/or overrides ``init`` for server transport only. - :param client_init: - Extends and/or overrides ``init`` for client transport only. - :param connect: - Kwargs to use for ``connect()`` on the client. - :param pubkeys: - List of public keys for auth. - :param catch_error: - Whether to capture connection errors & yield from contextmanager. - Necessary for connection_time exception testing. - :param transport_factory: - Like the same-named param in SSHClient: which Transport class to use. - """ - if init is None: - init = {} - if server_init is None: - server_init = {} - if client_init is None: - client_init = {} - if connect is None: - connect = dict(username="slowdive", password="pygmalion") - socks = LoopSocket() - sockc = LoopSocket() - sockc.link(socks) - if transport_factory is None: - transport_factory = Transport - tc = transport_factory(sockc, **dict(init, **client_init)) - ts = transport_factory(socks, **dict(init, **server_init)) - - if hostkey is None: - hostkey = RSAKey.from_private_key_file(_support("rsa.key")) - ts.add_server_key(hostkey) - event = threading.Event() - server = NullServer(allowed_keys=pubkeys) - assert not event.is_set() - assert not ts.is_active() - assert tc.get_username() is None - assert ts.get_username() is None - assert not tc.is_authenticated() - assert not ts.is_authenticated() - - err = None - # Trap errors and yield instead of raising right away; otherwise callers - # cannot usefully deal with problems at connect time which stem from errors - # in the server side. - try: - ts.start_server(event, server) - tc.connect(**connect) - - event.wait(1.0) - assert event.is_set() - assert ts.is_active() - assert tc.is_active() - - except Exception as e: - if not catch_error: - raise - err = e - - yield (tc, ts, err) if catch_error else (tc, ts) - - tc.close() - ts.close() - socks.close() - sockc.close() - - -_disable_sha2 = dict( - disabled_algorithms=dict(keys=["rsa-sha2-256", "rsa-sha2-512"]) -) -_disable_sha1 = dict(disabled_algorithms=dict(keys=["ssh-rsa"])) -_disable_sha2_pubkey = dict( - disabled_algorithms=dict(pubkeys=["rsa-sha2-256", "rsa-sha2-512"]) -) -_disable_sha1_pubkey = dict(disabled_algorithms=dict(pubkeys=["ssh-rsa"])) - - class TestSHA2SignatureKeyExchange(unittest.TestCase): # NOTE: these all rely on the default server() hostkey being RSA # NOTE: these rely on both sides being properly implemented re: agreed-upon @@ -1351,7 +1187,10 @@ class TestSHA2SignatureKeyExchange(unittest.TestCase): # the entire preferred-hostkeys structure when given an explicit key as # a client.) hostkey = RSAKey.from_private_key_file(_support("rsa.key")) - with server(hostkey=hostkey, connect=dict(hostkey=hostkey)) as (tc, _): + connect = dict( + hostkey=hostkey, username="slowdive", password="pygmalion" + ) + with server(hostkey=hostkey, connect=connect) as (tc, _): assert tc.host_key_type == "rsa-sha2-512" @@ -1442,7 +1281,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase): server_init = dict(_disable_sha2_pubkey, server_sig_algs=False) with server( pubkeys=[privkey], - connect=dict(pkey=privkey), + connect=dict(username="slowdive", pkey=privkey), server_init=server_init, catch_error=True, ) as (tc, ts, err): @@ -1455,6 +1294,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase): privkey = RSAKey.from_private_key_file(_support("rsa.key")) with server( pubkeys=[privkey], + # TODO: why is this passing without a username? connect=dict(pkey=privkey), init=dict( disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-256"]) |