summaryrefslogtreecommitdiffhomepage
path: root/tests/test_transport.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_transport.py')
-rw-r--r--tests/test_transport.py40
1 files changed, 33 insertions, 7 deletions
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 5fcc7865..b2e8b6f6 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -23,9 +23,9 @@ Some unit tests for the ssh2 protocol in Transport.
import sys, time, threading, unittest
import select
from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \
- SSHException, BadAuthenticationType, InteractiveQuery, util
+ SSHException, BadAuthenticationType, InteractiveQuery, util, ChannelException
from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL
-from paramiko import OPEN_SUCCEEDED
+from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
from loop import LoopSocket
@@ -81,6 +81,8 @@ class NullServer (ServerInterface):
return AUTH_FAILED
def check_channel_request(self, kind, chanid):
+ if kind == 'bogus':
+ return OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
return OPEN_SUCCEEDED
def check_channel_exec_request(self, channel, command):
@@ -189,7 +191,7 @@ class TransportTest (unittest.TestCase):
self.assertEquals(12, self.tc.packetizer.get_mac_size_in())
self.tc.send_ignore(1024)
- self.assert_(self.tc.renegotiate_keys())
+ self.tc.renegotiate_keys()
self.ts.send_ignore(1024)
def test_5_keepalive(self):
@@ -408,7 +410,31 @@ class TransportTest (unittest.TestCase):
chan.close()
self.assertEquals('', f.readline())
- def test_D_exit_status(self):
+ def test_D_channel_exception(self):
+ """
+ verify that ChannelException is thrown for a bad open-channel request.
+ """
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.assert_(not event.isSet())
+ self.ts.start_server(event, server)
+ self.tc.ultra_debug = True
+ self.tc.connect(hostkey=public_host_key)
+ self.tc.auth_password(username='slowdive', password='pygmalion')
+ event.wait(1.0)
+ self.assert_(event.isSet())
+ self.assert_(self.ts.is_active())
+
+ try:
+ chan = self.tc.open_channel('bogus')
+ self.fail('expected exception')
+ except ChannelException, x:
+ self.assert_(x.code == OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED)
+
+ def test_E_exit_status(self):
"""
verify that get_exit_status() works.
"""
@@ -442,7 +468,7 @@ class TransportTest (unittest.TestCase):
self.assertEquals(23, chan.recv_exit_status())
chan.close()
- def test_E_select(self):
+ def test_F_select(self):
"""
verify that select() on a channel works.
"""
@@ -505,7 +531,7 @@ class TransportTest (unittest.TestCase):
chan.close()
- def test_F_renegotiate(self):
+ def test_G_renegotiate(self):
"""
verify that a transport can correctly renegotiate mid-stream.
"""
@@ -541,7 +567,7 @@ class TransportTest (unittest.TestCase):
schan.close()
- def test_G_compression(self):
+ def test_H_compression(self):
"""
verify that zlib compression is basically working.
"""