summaryrefslogtreecommitdiffhomepage
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/test_transport.py69
1 files changed, 54 insertions, 15 deletions
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 7147b43b..3e51485d 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -73,11 +73,9 @@ class NullServer (ServerInterface):
def check_channel_exec_request(self, channel, command):
if command != 'yes':
return False
- self.exec_channel = channel
return True
def check_channel_shell_request(self, channel):
- self.shell_channel = channel
return True
@@ -125,14 +123,51 @@ class TransportTest (unittest.TestCase):
event = threading.Event()
server = NullServer()
self.assert_(not event.isSet())
+ self.assertEquals(None, self.tc.get_username())
+ self.assertEquals(None, self.ts.get_username())
+ self.assertEquals(False, self.tc.is_authenticated())
+ self.assertEquals(False, self.ts.is_authenticated())
self.ts.start_server(event, server)
self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion')
event.wait(1.0)
self.assert_(event.isSet())
self.assert_(self.ts.is_active())
+ self.assertEquals('slowdive', self.tc.get_username())
+ self.assertEquals('slowdive', self.ts.get_username())
+ self.assertEquals(True, self.tc.is_authenticated())
+ self.assertEquals(True, self.ts.is_authenticated())
- def test_3_bad_auth_type(self):
+ def test_3_special(self):
+ """
+ verify that the client can demand odd handshake settings, and can
+ renegotiate keys in mid-stream.
+ """
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.assert_(not event.isSet())
+ self.ts.start_server(event, server)
+ options = self.tc.get_security_options()
+ options.ciphers = ('aes256-cbc',)
+ options.digests = ('hmac-md5-96',)
+ self.tc.connect(hostkey=public_host_key,
+ username='slowdive', password='pygmalion')
+ event.wait(1.0)
+ self.assert_(event.isSet())
+ self.assert_(self.ts.is_active())
+ self.assertEquals('aes256-cbc', self.tc.local_cipher)
+ self.assertEquals('aes256-cbc', self.tc.remote_cipher)
+ self.assertEquals(12, self.tc.local_mac_len)
+ self.assertEquals(12, self.tc.remote_mac_len)
+
+ self.tc.send_ignore(1024)
+ self.assert_(self.tc.renegotiate_keys())
+ self.ts.send_ignore(1024)
+
+ def test_4_bad_auth_type(self):
"""
verify that we get the right exception when an unsupported auth
type is requested.
@@ -153,7 +188,7 @@ class TransportTest (unittest.TestCase):
self.assertEquals(BadAuthenticationType, etype)
self.assertEquals(['publickey'], evalue.allowed_types)
- def test_4_bad_password(self):
+ def test_5_bad_password(self):
"""
verify that a bad password gets the right exception, and that a retry
with the right password works.
@@ -178,7 +213,7 @@ class TransportTest (unittest.TestCase):
self.assert_(event.isSet())
self.assert_(self.ts.is_active())
- def test_5_multipart_auth(self):
+ def test_6_multipart_auth(self):
"""
verify that multipart auth works.
"""
@@ -200,7 +235,7 @@ class TransportTest (unittest.TestCase):
self.assert_(event.isSet())
self.assert_(self.ts.is_active())
- def test_6_exec_command(self):
+ def test_7_exec_command(self):
"""
verify that exec_command() does something reasonable.
"""
@@ -219,13 +254,15 @@ class TransportTest (unittest.TestCase):
self.assert_(self.ts.is_active())
chan = self.tc.open_session()
+ schan = self.ts.accept(1.0)
self.assert_(not chan.exec_command('no'))
chan = self.tc.open_session()
self.assert_(chan.exec_command('yes'))
- server.exec_channel.send('Hello there.\n')
- server.exec_channel.send_stderr('This is on stderr.\n')
- server.exec_channel.close()
+ schan = self.ts.accept(1.0)
+ schan.send('Hello there.\n')
+ schan.send_stderr('This is on stderr.\n')
+ schan.close()
f = chan.makefile()
self.assertEquals('Hello there.\n', f.readline())
@@ -236,10 +273,11 @@ class TransportTest (unittest.TestCase):
# now try it with combined stdout/stderr
chan = self.tc.open_session()
- chan.exec_command('yes')
- server.exec_channel.send('Hello there.\n')
- server.exec_channel.send_stderr('This is on stderr.\n')
- server.exec_channel.close()
+ self.assert_(chan.exec_command('yes'))
+ schan = self.ts.accept(1.0)
+ schan.send('Hello there.\n')
+ schan.send_stderr('This is on stderr.\n')
+ schan.close()
chan.set_combine_stderr(True)
f = chan.makefile()
@@ -247,7 +285,7 @@ class TransportTest (unittest.TestCase):
self.assertEquals('This is on stderr.\n', f.readline())
self.assertEquals('', f.readline())
- def test_7_invoke_shell(self):
+ def test_8_invoke_shell(self):
"""
verify that invoke_shell() does something reasonable.
"""
@@ -267,8 +305,9 @@ class TransportTest (unittest.TestCase):
chan = self.tc.open_session()
self.assert_(chan.invoke_shell())
+ schan = self.ts.accept(1.0)
chan.send('communist j. cat\n')
- f = server.shell_channel.makefile()
+ f = schan.makefile()
self.assertEquals('communist j. cat\n', f.readline())
chan.close()
self.assertEquals('', f.readline())