diff options
-rw-r--r-- | tests/test_client.py | 95 |
1 files changed, 21 insertions, 74 deletions
diff --git a/tests/test_client.py b/tests/test_client.py index 3576ef32..0697d289 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -80,24 +80,30 @@ class SSHClientTest (unittest.TestCase): server = NullServer() self.ts.start_server(self.event, server) - def test_1_client(self): + def _test_connection(self, **kwargs): """ - verify that the SSHClient stuff works too. + kwargs get passed directly into SSHClient.connect(). """ + # Server setup threading.Thread(target=self._run).start() host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) + # Client setup self.tc = paramiko.SSHClient() self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key) - self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion') + # Actual connection + self.tc.connect(self.addr, self.port, username='slowdive', **kwargs) + + # Authentication successful? self.event.wait(1.0) self.assertTrue(self.event.isSet()) self.assertTrue(self.ts.is_active()) self.assertEqual('slowdive', self.ts.get_username()) self.assertEqual(True, self.ts.is_authenticated()) + # Command execution functions? stdin, stdout, stderr = self.tc.exec_command('yes') schan = self.ts.accept(1.0) @@ -110,95 +116,36 @@ class SSHClientTest (unittest.TestCase): self.assertEqual('This is on stderr.\n', stderr.readline()) self.assertEqual('', stderr.readline()) + # Cleanup stdin.close() stdout.close() stderr.close() + def test_1_client(self): + """ + verify that the SSHClient stuff works too. + """ + self._test_connection(password='pygmalion') + def test_2_client_dsa(self): """ verify that SSHClient works with a DSA key. """ - threading.Thread(target=self._run).start() - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) - public_host_key = paramiko.RSAKey(data=host_key.asbytes()) - - self.tc = paramiko.SSHClient() - self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key) - self.tc.connect(self.addr, self.port, username='slowdive', key_filename=test_path('test_dss.key')) - - self.event.wait(1.0) - self.assertTrue(self.event.isSet()) - self.assertTrue(self.ts.is_active()) - self.assertEqual('slowdive', self.ts.get_username()) - self.assertEqual(True, self.ts.is_authenticated()) - - stdin, stdout, stderr = self.tc.exec_command('yes') - schan = self.ts.accept(1.0) - - schan.send('Hello there.\n') - schan.send_stderr('This is on stderr.\n') - schan.close() - - self.assertEqual('Hello there.\n', stdout.readline()) - self.assertEqual('', stdout.readline()) - self.assertEqual('This is on stderr.\n', stderr.readline()) - self.assertEqual('', stderr.readline()) - - stdin.close() - stdout.close() - stderr.close() + self._test_connection(key_filename=test_path('test_dss.key')) def test_2_5_client_ecdsa(self): """ verify that SSHClient works with an ECDSA key. """ - threading.Thread(target=self._run).start() - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) - public_host_key = paramiko.RSAKey(data=host_key.asbytes()) - - self.tc = paramiko.SSHClient() - self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key) - self.tc.connect(self.addr, self.port, username='slowdive', key_filename=test_path('test_ecdsa.key')) - - self.event.wait(1.0) - self.assertTrue(self.event.isSet()) - self.assertTrue(self.ts.is_active()) - self.assertEqual('slowdive', self.ts.get_username()) - self.assertEqual(True, self.ts.is_authenticated()) - - stdin, stdout, stderr = self.tc.exec_command('yes') - schan = self.ts.accept(1.0) - - schan.send('Hello there.\n') - schan.send_stderr('This is on stderr.\n') - schan.close() - - self.assertEqual('Hello there.\n', stdout.readline()) - self.assertEqual('', stdout.readline()) - self.assertEqual('This is on stderr.\n', stderr.readline()) - self.assertEqual('', stderr.readline()) - - stdin.close() - stdout.close() - stderr.close() + self._test_connection(key_filename=test_path('test_ecdsa.key')) def test_3_multiple_key_files(self): """ verify that SSHClient accepts and tries multiple key files. """ - threading.Thread(target=self._run).start() - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) - public_host_key = paramiko.RSAKey(data=host_key.asbytes()) - - self.tc = paramiko.SSHClient() - self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key) - self.tc.connect(self.addr, self.port, username='slowdive', key_filename=[test_path('test_rsa.key'), test_path('test_dss.key')]) - - self.event.wait(1.0) - self.assertTrue(self.event.isSet()) - self.assertTrue(self.ts.is_active()) - self.assertEqual('slowdive', self.ts.get_username()) - self.assertEqual(True, self.ts.is_authenticated()) + self._test_connection(key_filename=[ + test_path('test_rsa.key'), test_path('test_dss.key') + ]) def test_4_auto_add_policy(self): """ |