diff options
-rw-r--r-- | tests/test_transport.py | 141 |
1 files changed, 136 insertions, 5 deletions
diff --git a/tests/test_transport.py b/tests/test_transport.py index b55160a7..7147b43b 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -25,21 +25,61 @@ Some unit tests for the ssh2 protocol in Transport. import sys, unittest, threading from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \ SSHException, BadAuthenticationType -from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL +from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL +from paramiko import OPEN_SUCCEEDED from loop import LoopSocket class NullServer (ServerInterface): + paranoid_did_password = False + paranoid_did_public_key = False + paranoid_key = DSSKey.from_private_key_file('tests/test_dss.key') + def get_allowed_auths(self, username): if username == 'slowdive': return 'publickey,password' + if username == 'paranoid': + if not self.paranoid_did_password and not self.paranoid_did_public_key: + return 'publickey,password' + elif self.paranoid_did_password: + return 'publickey' + else: + return 'password' return 'publickey' def check_auth_password(self, username, password): if (username == 'slowdive') and (password == 'pygmalion'): return AUTH_SUCCESSFUL + if (username == 'paranoid') and (password == 'paranoid'): + # 2-part auth (even openssh doesn't support this) + self.paranoid_did_password = True + if self.paranoid_did_public_key: + return AUTH_SUCCESSFUL + return AUTH_PARTIALLY_SUCCESSFUL return AUTH_FAILED + def check_auth_publickey(self, username, key): + if (username == 'paranoid') and (key == self.paranoid_key): + # 2-part auth + self.paranoid_did_public_key = True + if self.paranoid_did_password: + return AUTH_SUCCESSFUL + return AUTH_PARTIALLY_SUCCESSFUL + return AUTH_FAILED + + def check_channel_request(self, kind, chanid): + return OPEN_SUCCEEDED + + def check_channel_exec_request(self, channel, command): + if command != 'yes': + return False + self.exec_channel = channel + return True + + def check_channel_shell_request(self, channel): + self.shell_channel = channel + return True + class TransportTest (unittest.TestCase): @@ -86,7 +126,6 @@ class TransportTest (unittest.TestCase): server = NullServer() self.assert_(not event.isSet()) self.ts.start_server(event, server) - self.tc.ultra_debug = True self.tc.connect(hostkey=public_host_key, username='slowdive', password='pygmalion') event.wait(1.0) @@ -105,7 +144,6 @@ class TransportTest (unittest.TestCase): server = NullServer() self.assert_(not event.isSet()) self.ts.start_server(event, server) - self.tc.ultra_debug = True try: self.tc.connect(hostkey=public_host_key, username='unknown', password='error') @@ -139,5 +177,98 @@ class TransportTest (unittest.TestCase): event.wait(1.0) self.assert_(event.isSet()) self.assert_(self.ts.is_active()) - - + + def test_5_multipart_auth(self): + """ + verify that multipart auth works. + """ + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assert_(not event.isSet()) + self.ts.start_server(event, server) + self.tc.ultra_debug = True + self.tc.connect(hostkey=public_host_key) + remain = self.tc.auth_password(username='paranoid', password='paranoid') + self.assertEquals(['publickey'], remain) + key = DSSKey.from_private_key_file('tests/test_dss.key') + remain = self.tc.auth_publickey(username='paranoid', key=key) + self.assertEquals([], remain) + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + + def test_6_exec_command(self): + """ + verify that exec_command() does something reasonable. + """ + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assert_(not event.isSet()) + self.ts.start_server(event, server) + self.tc.ultra_debug = True + self.tc.connect(hostkey=public_host_key) + self.tc.auth_password(username='slowdive', password='pygmalion') + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + + chan = self.tc.open_session() + self.assert_(not chan.exec_command('no')) + + chan = self.tc.open_session() + self.assert_(chan.exec_command('yes')) + server.exec_channel.send('Hello there.\n') + server.exec_channel.send_stderr('This is on stderr.\n') + server.exec_channel.close() + + f = chan.makefile() + self.assertEquals('Hello there.\n', f.readline()) + self.assertEquals('', f.readline()) + f = chan.makefile_stderr() + self.assertEquals('This is on stderr.\n', f.readline()) + self.assertEquals('', f.readline()) + + # now try it with combined stdout/stderr + chan = self.tc.open_session() + chan.exec_command('yes') + server.exec_channel.send('Hello there.\n') + server.exec_channel.send_stderr('This is on stderr.\n') + server.exec_channel.close() + + chan.set_combine_stderr(True) + f = chan.makefile() + self.assertEquals('Hello there.\n', f.readline()) + self.assertEquals('This is on stderr.\n', f.readline()) + self.assertEquals('', f.readline()) + + def test_7_invoke_shell(self): + """ + verify that invoke_shell() does something reasonable. + """ + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assert_(not event.isSet()) + self.ts.start_server(event, server) + self.tc.ultra_debug = True + self.tc.connect(hostkey=public_host_key) + self.tc.auth_password(username='slowdive', password='pygmalion') + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + + chan = self.tc.open_session() + self.assert_(chan.invoke_shell()) + chan.send('communist j. cat\n') + f = server.shell_channel.makefile() + self.assertEquals('communist j. cat\n', f.readline()) + chan.close() + self.assertEquals('', f.readline()) |