diff options
author | Jeff Forcier <jeff@bitprophet.org> | 2018-03-12 09:17:43 -0700 |
---|---|---|
committer | Jeff Forcier <jeff@bitprophet.org> | 2018-03-12 15:50:29 -0700 |
commit | 5925a20a4f61ed8a05eca28ea15625eb1ac91f1c (patch) | |
tree | f9a58dd8efc0b31ddcddac99c4aca5b80d6b407b /tests/test_transport.py | |
parent | cef12f135c4bbbdb883276f36a0d9f6f0844c389 (diff) |
Initial tests proving CVE-2018-7750 / #XXX
Diffstat (limited to 'tests/test_transport.py')
-rw-r--r-- | tests/test_transport.py | 42 |
1 files changed, 40 insertions, 2 deletions
diff --git a/tests/test_transport.py b/tests/test_transport.py index a61cc7a2..99cbc3e0 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -33,7 +33,7 @@ import unittest from paramiko import ( Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, SSHException, - ChannelException, Packetizer, + ChannelException, Packetizer, Channel, ) from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED @@ -91,7 +91,11 @@ class NullServer (ServerInterface): def check_global_request(self, kind, msg): self._global_request = kind - return False + # NOTE: for w/e reason, older impl of this returned False always, even + # tho that's only supposed to occur if the request cannot be served. + # For now, leaving that the default unless test supplies specific + # 'acceptable' request kind + return kind == 'acceptable' def check_channel_x11_request(self, channel, single_connection, auth_protocol, auth_cookie, screen_number): self._x11_single_connection = single_connection @@ -934,3 +938,37 @@ class TransportTest(unittest.TestCase): # sendall() accepts a memoryview instance chan.sendall(memoryview(data)) self.assertEqual(sfile.read(len(data)), data) + + def test_server_rejects_open_channel_without_auth(self): + try: + self.setup_test_server(connect_kwargs={}) + self.tc.open_session() + except ChannelException as e: + assert e.code == OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED + else: + assert False, "Did not raise ChannelException!" + + def test_server_rejects_arbitrary_global_request_without_auth(self): + self.setup_test_server(connect_kwargs={}) + # NOTE: this dummy global request kind would normally pass muster + # from the test server. + self.tc.global_request('acceptable') + # Global requests never raise exceptions, even on failure (not sure why + # this was the original design...ugh.) Best we can do to tell failure + # happened is that the client transport's global_response was set back + # to None; if it had succeeded, it would be the response Message. + err = "Unauthed global response incorrectly succeeded!" + assert self.tc.global_response is None, err + + def test_server_rejects_port_forward_without_auth(self): + # NOTE: at protocol level port forward requests are treated same as a + # regular global request, but Paramiko server implements a special-case + # method for it, so it gets its own test. (plus, THAT actually raises + # an exception on the client side, unlike the general case...) + self.setup_test_server(connect_kwargs={}) + try: + self.tc.request_port_forward('localhost', 1234) + except SSHException as e: + assert "forwarding request denied" in str(e) + else: + assert False, "Did not raise SSHException!" |