summaryrefslogtreecommitdiffhomepage
path: root/tests/test_transport.py
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2018-03-12 16:22:51 -0700
committerJeff Forcier <jeff@bitprophet.org>2018-03-12 16:22:51 -0700
commit83c26d51b68120bc08aa63513665cf8b36b77c63 (patch)
tree3351bf21264e3d0203659f218691affc41c35b83 /tests/test_transport.py
parent184ddbc7e08e7918a381cb517d191be45304ae3a (diff)
parent2d3f14c6da96d351fb9728b4558ad8d2916bcd48 (diff)
Merge branch '2.2' into 2.3
Diffstat (limited to 'tests/test_transport.py')
-rw-r--r--tests/test_transport.py55
1 files changed, 50 insertions, 5 deletions
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 3e352919..99cbc3e0 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -33,7 +33,7 @@ import unittest
from paramiko import (
Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, SSHException,
- ChannelException, Packetizer,
+ ChannelException, Packetizer, Channel,
)
from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL
from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
@@ -91,7 +91,11 @@ class NullServer (ServerInterface):
def check_global_request(self, kind, msg):
self._global_request = kind
- return False
+ # NOTE: for w/e reason, older impl of this returned False always, even
+ # tho that's only supposed to occur if the request cannot be served.
+ # For now, leaving that the default unless test supplies specific
+ # 'acceptable' request kind
+ return kind == 'acceptable'
def check_channel_x11_request(self, channel, single_connection, auth_protocol, auth_cookie, screen_number):
self._x11_single_connection = single_connection
@@ -129,7 +133,9 @@ class TransportTest(unittest.TestCase):
self.socks.close()
self.sockc.close()
- def setup_test_server(self, client_options=None, server_options=None):
+ def setup_test_server(
+ self, client_options=None, server_options=None, connect_kwargs=None,
+ ):
host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
@@ -143,8 +149,13 @@ class TransportTest(unittest.TestCase):
self.server = NullServer()
self.assertTrue(not event.is_set())
self.ts.start_server(event, self.server)
- self.tc.connect(hostkey=public_host_key,
- username='slowdive', password='pygmalion')
+ if connect_kwargs is None:
+ connect_kwargs = dict(
+ hostkey=public_host_key,
+ username='slowdive',
+ password='pygmalion',
+ )
+ self.tc.connect(**connect_kwargs)
event.wait(1.0)
self.assertTrue(event.is_set())
self.assertTrue(self.ts.is_active())
@@ -927,3 +938,37 @@ class TransportTest(unittest.TestCase):
# sendall() accepts a memoryview instance
chan.sendall(memoryview(data))
self.assertEqual(sfile.read(len(data)), data)
+
+ def test_server_rejects_open_channel_without_auth(self):
+ try:
+ self.setup_test_server(connect_kwargs={})
+ self.tc.open_session()
+ except ChannelException as e:
+ assert e.code == OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
+ else:
+ assert False, "Did not raise ChannelException!"
+
+ def test_server_rejects_arbitrary_global_request_without_auth(self):
+ self.setup_test_server(connect_kwargs={})
+ # NOTE: this dummy global request kind would normally pass muster
+ # from the test server.
+ self.tc.global_request('acceptable')
+ # Global requests never raise exceptions, even on failure (not sure why
+ # this was the original design...ugh.) Best we can do to tell failure
+ # happened is that the client transport's global_response was set back
+ # to None; if it had succeeded, it would be the response Message.
+ err = "Unauthed global response incorrectly succeeded!"
+ assert self.tc.global_response is None, err
+
+ def test_server_rejects_port_forward_without_auth(self):
+ # NOTE: at protocol level port forward requests are treated same as a
+ # regular global request, but Paramiko server implements a special-case
+ # method for it, so it gets its own test. (plus, THAT actually raises
+ # an exception on the client side, unlike the general case...)
+ self.setup_test_server(connect_kwargs={})
+ try:
+ self.tc.request_port_forward('localhost', 1234)
+ except SSHException as e:
+ assert "forwarding request denied" in str(e)
+ else:
+ assert False, "Did not raise SSHException!"