summaryrefslogtreecommitdiffhomepage
path: root/tests/test_transport.py
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2018-03-12 09:17:43 -0700
committerJeff Forcier <jeff@bitprophet.org>2018-03-12 15:50:29 -0700
commit5925a20a4f61ed8a05eca28ea15625eb1ac91f1c (patch)
treef9a58dd8efc0b31ddcddac99c4aca5b80d6b407b /tests/test_transport.py
parentcef12f135c4bbbdb883276f36a0d9f6f0844c389 (diff)
Initial tests proving CVE-2018-7750 / #XXX
Diffstat (limited to 'tests/test_transport.py')
-rw-r--r--tests/test_transport.py42
1 files changed, 40 insertions, 2 deletions
diff --git a/tests/test_transport.py b/tests/test_transport.py
index a61cc7a2..99cbc3e0 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -33,7 +33,7 @@ import unittest
from paramiko import (
Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, SSHException,
- ChannelException, Packetizer,
+ ChannelException, Packetizer, Channel,
)
from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL
from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
@@ -91,7 +91,11 @@ class NullServer (ServerInterface):
def check_global_request(self, kind, msg):
self._global_request = kind
- return False
+ # NOTE: for w/e reason, older impl of this returned False always, even
+ # tho that's only supposed to occur if the request cannot be served.
+ # For now, leaving that the default unless test supplies specific
+ # 'acceptable' request kind
+ return kind == 'acceptable'
def check_channel_x11_request(self, channel, single_connection, auth_protocol, auth_cookie, screen_number):
self._x11_single_connection = single_connection
@@ -934,3 +938,37 @@ class TransportTest(unittest.TestCase):
# sendall() accepts a memoryview instance
chan.sendall(memoryview(data))
self.assertEqual(sfile.read(len(data)), data)
+
+ def test_server_rejects_open_channel_without_auth(self):
+ try:
+ self.setup_test_server(connect_kwargs={})
+ self.tc.open_session()
+ except ChannelException as e:
+ assert e.code == OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
+ else:
+ assert False, "Did not raise ChannelException!"
+
+ def test_server_rejects_arbitrary_global_request_without_auth(self):
+ self.setup_test_server(connect_kwargs={})
+ # NOTE: this dummy global request kind would normally pass muster
+ # from the test server.
+ self.tc.global_request('acceptable')
+ # Global requests never raise exceptions, even on failure (not sure why
+ # this was the original design...ugh.) Best we can do to tell failure
+ # happened is that the client transport's global_response was set back
+ # to None; if it had succeeded, it would be the response Message.
+ err = "Unauthed global response incorrectly succeeded!"
+ assert self.tc.global_response is None, err
+
+ def test_server_rejects_port_forward_without_auth(self):
+ # NOTE: at protocol level port forward requests are treated same as a
+ # regular global request, but Paramiko server implements a special-case
+ # method for it, so it gets its own test. (plus, THAT actually raises
+ # an exception on the client side, unlike the general case...)
+ self.setup_test_server(connect_kwargs={})
+ try:
+ self.tc.request_port_forward('localhost', 1234)
+ except SSHException as e:
+ assert "forwarding request denied" in str(e)
+ else:
+ assert False, "Did not raise SSHException!"