summaryrefslogtreecommitdiffhomepage
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/test_transport.py141
1 files changed, 136 insertions, 5 deletions
diff --git a/tests/test_transport.py b/tests/test_transport.py
index b55160a7..7147b43b 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -25,21 +25,61 @@ Some unit tests for the ssh2 protocol in Transport.
import sys, unittest, threading
from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \
SSHException, BadAuthenticationType
-from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL
+from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL
+from paramiko import OPEN_SUCCEEDED
from loop import LoopSocket
class NullServer (ServerInterface):
+ paranoid_did_password = False
+ paranoid_did_public_key = False
+ paranoid_key = DSSKey.from_private_key_file('tests/test_dss.key')
+
def get_allowed_auths(self, username):
if username == 'slowdive':
return 'publickey,password'
+ if username == 'paranoid':
+ if not self.paranoid_did_password and not self.paranoid_did_public_key:
+ return 'publickey,password'
+ elif self.paranoid_did_password:
+ return 'publickey'
+ else:
+ return 'password'
return 'publickey'
def check_auth_password(self, username, password):
if (username == 'slowdive') and (password == 'pygmalion'):
return AUTH_SUCCESSFUL
+ if (username == 'paranoid') and (password == 'paranoid'):
+ # 2-part auth (even openssh doesn't support this)
+ self.paranoid_did_password = True
+ if self.paranoid_did_public_key:
+ return AUTH_SUCCESSFUL
+ return AUTH_PARTIALLY_SUCCESSFUL
return AUTH_FAILED
+ def check_auth_publickey(self, username, key):
+ if (username == 'paranoid') and (key == self.paranoid_key):
+ # 2-part auth
+ self.paranoid_did_public_key = True
+ if self.paranoid_did_password:
+ return AUTH_SUCCESSFUL
+ return AUTH_PARTIALLY_SUCCESSFUL
+ return AUTH_FAILED
+
+ def check_channel_request(self, kind, chanid):
+ return OPEN_SUCCEEDED
+
+ def check_channel_exec_request(self, channel, command):
+ if command != 'yes':
+ return False
+ self.exec_channel = channel
+ return True
+
+ def check_channel_shell_request(self, channel):
+ self.shell_channel = channel
+ return True
+
class TransportTest (unittest.TestCase):
@@ -86,7 +126,6 @@ class TransportTest (unittest.TestCase):
server = NullServer()
self.assert_(not event.isSet())
self.ts.start_server(event, server)
- self.tc.ultra_debug = True
self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion')
event.wait(1.0)
@@ -105,7 +144,6 @@ class TransportTest (unittest.TestCase):
server = NullServer()
self.assert_(not event.isSet())
self.ts.start_server(event, server)
- self.tc.ultra_debug = True
try:
self.tc.connect(hostkey=public_host_key,
username='unknown', password='error')
@@ -139,5 +177,98 @@ class TransportTest (unittest.TestCase):
event.wait(1.0)
self.assert_(event.isSet())
self.assert_(self.ts.is_active())
-
-
+
+ def test_5_multipart_auth(self):
+ """
+ verify that multipart auth works.
+ """
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.assert_(not event.isSet())
+ self.ts.start_server(event, server)
+ self.tc.ultra_debug = True
+ self.tc.connect(hostkey=public_host_key)
+ remain = self.tc.auth_password(username='paranoid', password='paranoid')
+ self.assertEquals(['publickey'], remain)
+ key = DSSKey.from_private_key_file('tests/test_dss.key')
+ remain = self.tc.auth_publickey(username='paranoid', key=key)
+ self.assertEquals([], remain)
+ event.wait(1.0)
+ self.assert_(event.isSet())
+ self.assert_(self.ts.is_active())
+
+ def test_6_exec_command(self):
+ """
+ verify that exec_command() does something reasonable.
+ """
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.assert_(not event.isSet())
+ self.ts.start_server(event, server)
+ self.tc.ultra_debug = True
+ self.tc.connect(hostkey=public_host_key)
+ self.tc.auth_password(username='slowdive', password='pygmalion')
+ event.wait(1.0)
+ self.assert_(event.isSet())
+ self.assert_(self.ts.is_active())
+
+ chan = self.tc.open_session()
+ self.assert_(not chan.exec_command('no'))
+
+ chan = self.tc.open_session()
+ self.assert_(chan.exec_command('yes'))
+ server.exec_channel.send('Hello there.\n')
+ server.exec_channel.send_stderr('This is on stderr.\n')
+ server.exec_channel.close()
+
+ f = chan.makefile()
+ self.assertEquals('Hello there.\n', f.readline())
+ self.assertEquals('', f.readline())
+ f = chan.makefile_stderr()
+ self.assertEquals('This is on stderr.\n', f.readline())
+ self.assertEquals('', f.readline())
+
+ # now try it with combined stdout/stderr
+ chan = self.tc.open_session()
+ chan.exec_command('yes')
+ server.exec_channel.send('Hello there.\n')
+ server.exec_channel.send_stderr('This is on stderr.\n')
+ server.exec_channel.close()
+
+ chan.set_combine_stderr(True)
+ f = chan.makefile()
+ self.assertEquals('Hello there.\n', f.readline())
+ self.assertEquals('This is on stderr.\n', f.readline())
+ self.assertEquals('', f.readline())
+
+ def test_7_invoke_shell(self):
+ """
+ verify that invoke_shell() does something reasonable.
+ """
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.assert_(not event.isSet())
+ self.ts.start_server(event, server)
+ self.tc.ultra_debug = True
+ self.tc.connect(hostkey=public_host_key)
+ self.tc.auth_password(username='slowdive', password='pygmalion')
+ event.wait(1.0)
+ self.assert_(event.isSet())
+ self.assert_(self.ts.is_active())
+
+ chan = self.tc.open_session()
+ self.assert_(chan.invoke_shell())
+ chan.send('communist j. cat\n')
+ f = server.shell_channel.makefile()
+ self.assertEquals('communist j. cat\n', f.readline())
+ chan.close()
+ self.assertEquals('', f.readline())