summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--tests/test_client.py95
1 files changed, 21 insertions, 74 deletions
diff --git a/tests/test_client.py b/tests/test_client.py
index 3576ef32..0697d289 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -80,24 +80,30 @@ class SSHClientTest (unittest.TestCase):
server = NullServer()
self.ts.start_server(self.event, server)
- def test_1_client(self):
+ def _test_connection(self, **kwargs):
"""
- verify that the SSHClient stuff works too.
+ kwargs get passed directly into SSHClient.connect().
"""
+ # Server setup
threading.Thread(target=self._run).start()
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
+ # Client setup
self.tc = paramiko.SSHClient()
self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
- self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
+ # Actual connection
+ self.tc.connect(self.addr, self.port, username='slowdive', **kwargs)
+
+ # Authentication successful?
self.event.wait(1.0)
self.assertTrue(self.event.isSet())
self.assertTrue(self.ts.is_active())
self.assertEqual('slowdive', self.ts.get_username())
self.assertEqual(True, self.ts.is_authenticated())
+ # Command execution functions?
stdin, stdout, stderr = self.tc.exec_command('yes')
schan = self.ts.accept(1.0)
@@ -110,95 +116,36 @@ class SSHClientTest (unittest.TestCase):
self.assertEqual('This is on stderr.\n', stderr.readline())
self.assertEqual('', stderr.readline())
+ # Cleanup
stdin.close()
stdout.close()
stderr.close()
+ def test_1_client(self):
+ """
+ verify that the SSHClient stuff works too.
+ """
+ self._test_connection(password='pygmalion')
+
def test_2_client_dsa(self):
"""
verify that SSHClient works with a DSA key.
"""
- threading.Thread(target=self._run).start()
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
- public_host_key = paramiko.RSAKey(data=host_key.asbytes())
-
- self.tc = paramiko.SSHClient()
- self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
- self.tc.connect(self.addr, self.port, username='slowdive', key_filename=test_path('test_dss.key'))
-
- self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
- self.assertTrue(self.ts.is_active())
- self.assertEqual('slowdive', self.ts.get_username())
- self.assertEqual(True, self.ts.is_authenticated())
-
- stdin, stdout, stderr = self.tc.exec_command('yes')
- schan = self.ts.accept(1.0)
-
- schan.send('Hello there.\n')
- schan.send_stderr('This is on stderr.\n')
- schan.close()
-
- self.assertEqual('Hello there.\n', stdout.readline())
- self.assertEqual('', stdout.readline())
- self.assertEqual('This is on stderr.\n', stderr.readline())
- self.assertEqual('', stderr.readline())
-
- stdin.close()
- stdout.close()
- stderr.close()
+ self._test_connection(key_filename=test_path('test_dss.key'))
def test_2_5_client_ecdsa(self):
"""
verify that SSHClient works with an ECDSA key.
"""
- threading.Thread(target=self._run).start()
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
- public_host_key = paramiko.RSAKey(data=host_key.asbytes())
-
- self.tc = paramiko.SSHClient()
- self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
- self.tc.connect(self.addr, self.port, username='slowdive', key_filename=test_path('test_ecdsa.key'))
-
- self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
- self.assertTrue(self.ts.is_active())
- self.assertEqual('slowdive', self.ts.get_username())
- self.assertEqual(True, self.ts.is_authenticated())
-
- stdin, stdout, stderr = self.tc.exec_command('yes')
- schan = self.ts.accept(1.0)
-
- schan.send('Hello there.\n')
- schan.send_stderr('This is on stderr.\n')
- schan.close()
-
- self.assertEqual('Hello there.\n', stdout.readline())
- self.assertEqual('', stdout.readline())
- self.assertEqual('This is on stderr.\n', stderr.readline())
- self.assertEqual('', stderr.readline())
-
- stdin.close()
- stdout.close()
- stderr.close()
+ self._test_connection(key_filename=test_path('test_ecdsa.key'))
def test_3_multiple_key_files(self):
"""
verify that SSHClient accepts and tries multiple key files.
"""
- threading.Thread(target=self._run).start()
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
- public_host_key = paramiko.RSAKey(data=host_key.asbytes())
-
- self.tc = paramiko.SSHClient()
- self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
- self.tc.connect(self.addr, self.port, username='slowdive', key_filename=[test_path('test_rsa.key'), test_path('test_dss.key')])
-
- self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
- self.assertTrue(self.ts.is_active())
- self.assertEqual('slowdive', self.ts.get_username())
- self.assertEqual(True, self.ts.is_authenticated())
+ self._test_connection(key_filename=[
+ test_path('test_rsa.key'), test_path('test_dss.key')
+ ])
def test_4_auto_add_policy(self):
"""