summaryrefslogtreecommitdiffhomepage
path: root/tests/test_transport.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_transport.py')
-rw-r--r--tests/test_transport.py46
1 files changed, 37 insertions, 9 deletions
diff --git a/tests/test_transport.py b/tests/test_transport.py
index bd11487f..5afc2e12 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -22,7 +22,7 @@
Some unit tests for the ssh2 protocol in Transport.
"""
-import sys, unittest, threading
+import sys, time, threading, unittest
from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \
SSHException, BadAuthenticationType
from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL
@@ -77,6 +77,10 @@ class NullServer (ServerInterface):
def check_channel_shell_request(self, channel):
return True
+
+ def check_global_request(self, kind, msg):
+ self._global_request = kind
+ return False
class TransportTest (unittest.TestCase):
@@ -160,14 +164,38 @@ class TransportTest (unittest.TestCase):
self.assert_(self.ts.is_active())
self.assertEquals('aes256-cbc', self.tc.local_cipher)
self.assertEquals('aes256-cbc', self.tc.remote_cipher)
- self.assertEquals(12, self.tc.local_mac_len)
- self.assertEquals(12, self.tc.remote_mac_len)
+ self.assertEquals(12, self.tc.packetizer.get_mac_size_out())
+ self.assertEquals(12, self.tc.packetizer.get_mac_size_in())
self.tc.send_ignore(1024)
self.assert_(self.tc.renegotiate_keys())
self.ts.send_ignore(1024)
- def test_4_bad_auth_type(self):
+ def test_4_keepalive(self):
+ """
+ verify that the keepalive will be sent.
+ """
+ self.tc.set_hexdump(True)
+
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.assert_(not event.isSet())
+ self.ts.start_server(event, server)
+ self.tc.connect(hostkey=public_host_key,
+ username='slowdive', password='pygmalion')
+ event.wait(1.0)
+ self.assert_(event.isSet())
+ self.assert_(self.ts.is_active())
+
+ self.assertEquals(None, getattr(server, '_global_request', None))
+ self.tc.set_keepalive(1)
+ time.sleep(2)
+ self.assertEquals('keepalive@lag.net', server._global_request)
+
+ def test_5_bad_auth_type(self):
"""
verify that we get the right exception when an unsupported auth
type is requested.
@@ -188,7 +216,7 @@ class TransportTest (unittest.TestCase):
self.assertEquals(BadAuthenticationType, etype)
self.assertEquals(['publickey'], evalue.allowed_types)
- def test_5_bad_password(self):
+ def test_6_bad_password(self):
"""
verify that a bad password gets the right exception, and that a retry
with the right password works.
@@ -213,7 +241,7 @@ class TransportTest (unittest.TestCase):
self.assert_(event.isSet())
self.assert_(self.ts.is_active())
- def test_6_multipart_auth(self):
+ def test_7_multipart_auth(self):
"""
verify that multipart auth works.
"""
@@ -235,7 +263,7 @@ class TransportTest (unittest.TestCase):
self.assert_(event.isSet())
self.assert_(self.ts.is_active())
- def test_7_exec_command(self):
+ def test_8_exec_command(self):
"""
verify that exec_command() does something reasonable.
"""
@@ -285,7 +313,7 @@ class TransportTest (unittest.TestCase):
self.assertEquals('This is on stderr.\n', f.readline())
self.assertEquals('', f.readline())
- def test_8_invoke_shell(self):
+ def test_9_invoke_shell(self):
"""
verify that invoke_shell() does something reasonable.
"""
@@ -312,7 +340,7 @@ class TransportTest (unittest.TestCase):
chan.close()
self.assertEquals('', f.readline())
- def test_9_exit_status(self):
+ def test_A_exit_status(self):
"""
verify that get_exit_status() works.
"""