diff options
Diffstat (limited to 'tests/test_transport.py')
-rw-r--r-- | tests/test_transport.py | 46 |
1 files changed, 37 insertions, 9 deletions
diff --git a/tests/test_transport.py b/tests/test_transport.py index bd11487f..5afc2e12 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -22,7 +22,7 @@ Some unit tests for the ssh2 protocol in Transport. """ -import sys, unittest, threading +import sys, time, threading, unittest from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \ SSHException, BadAuthenticationType from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL @@ -77,6 +77,10 @@ class NullServer (ServerInterface): def check_channel_shell_request(self, channel): return True + + def check_global_request(self, kind, msg): + self._global_request = kind + return False class TransportTest (unittest.TestCase): @@ -160,14 +164,38 @@ class TransportTest (unittest.TestCase): self.assert_(self.ts.is_active()) self.assertEquals('aes256-cbc', self.tc.local_cipher) self.assertEquals('aes256-cbc', self.tc.remote_cipher) - self.assertEquals(12, self.tc.local_mac_len) - self.assertEquals(12, self.tc.remote_mac_len) + self.assertEquals(12, self.tc.packetizer.get_mac_size_out()) + self.assertEquals(12, self.tc.packetizer.get_mac_size_in()) self.tc.send_ignore(1024) self.assert_(self.tc.renegotiate_keys()) self.ts.send_ignore(1024) - def test_4_bad_auth_type(self): + def test_4_keepalive(self): + """ + verify that the keepalive will be sent. + """ + self.tc.set_hexdump(True) + + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assert_(not event.isSet()) + self.ts.start_server(event, server) + self.tc.connect(hostkey=public_host_key, + username='slowdive', password='pygmalion') + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + + self.assertEquals(None, getattr(server, '_global_request', None)) + self.tc.set_keepalive(1) + time.sleep(2) + self.assertEquals('keepalive@lag.net', server._global_request) + + def test_5_bad_auth_type(self): """ verify that we get the right exception when an unsupported auth type is requested. @@ -188,7 +216,7 @@ class TransportTest (unittest.TestCase): self.assertEquals(BadAuthenticationType, etype) self.assertEquals(['publickey'], evalue.allowed_types) - def test_5_bad_password(self): + def test_6_bad_password(self): """ verify that a bad password gets the right exception, and that a retry with the right password works. @@ -213,7 +241,7 @@ class TransportTest (unittest.TestCase): self.assert_(event.isSet()) self.assert_(self.ts.is_active()) - def test_6_multipart_auth(self): + def test_7_multipart_auth(self): """ verify that multipart auth works. """ @@ -235,7 +263,7 @@ class TransportTest (unittest.TestCase): self.assert_(event.isSet()) self.assert_(self.ts.is_active()) - def test_7_exec_command(self): + def test_8_exec_command(self): """ verify that exec_command() does something reasonable. """ @@ -285,7 +313,7 @@ class TransportTest (unittest.TestCase): self.assertEquals('This is on stderr.\n', f.readline()) self.assertEquals('', f.readline()) - def test_8_invoke_shell(self): + def test_9_invoke_shell(self): """ verify that invoke_shell() does something reasonable. """ @@ -312,7 +340,7 @@ class TransportTest (unittest.TestCase): chan.close() self.assertEquals('', f.readline()) - def test_9_exit_status(self): + def test_A_exit_status(self): """ verify that get_exit_status() works. """ |