summaryrefslogtreecommitdiffhomepage
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/test_client.py137
-rwxr-xr-xtests/test_file.py10
-rwxr-xr-xtests/test_sftp.py24
-rw-r--r--tests/test_util.py106
4 files changed, 227 insertions, 50 deletions
diff --git a/tests/test_client.py b/tests/test_client.py
index 7e5c80b4..7c094628 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -29,10 +29,22 @@ import warnings
import os
from tests.util import test_path
import paramiko
-from paramiko.common import PY2
+from paramiko.common import PY2, b
+from paramiko.ssh_exception import SSHException
+
+
+FINGERPRINTS = {
+ 'ssh-dss': b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c',
+ 'ssh-rsa': b'\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5',
+ 'ecdsa-sha2-nistp256': b'\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60',
+}
class NullServer (paramiko.ServerInterface):
+ def __init__(self, *args, **kwargs):
+ # Allow tests to enable/disable specific key types
+ self.__allowed_keys = kwargs.pop('allowed_keys', [])
+ super(NullServer, self).__init__(*args, **kwargs)
def get_allowed_auths(self, username):
if username == 'slowdive':
@@ -45,7 +57,14 @@ class NullServer (paramiko.ServerInterface):
return paramiko.AUTH_FAILED
def check_auth_publickey(self, username, key):
- if (key.get_name() == 'ssh-dss') and key.get_fingerprint() == b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c':
+ try:
+ expected = FINGERPRINTS[key.get_name()]
+ except KeyError:
+ return paramiko.AUTH_FAILED
+ if (
+ key.get_name() in self.__allowed_keys and
+ key.get_fingerprint() == expected
+ ):
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED
@@ -72,32 +91,44 @@ class SSHClientTest (unittest.TestCase):
if hasattr(self, attr):
getattr(self, attr).close()
- def _run(self):
+ def _run(self, allowed_keys=None):
+ if allowed_keys is None:
+ allowed_keys = FINGERPRINTS.keys()
self.socks, addr = self.sockl.accept()
self.ts = paramiko.Transport(self.socks)
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
self.ts.add_server_key(host_key)
- server = NullServer()
+ server = NullServer(allowed_keys=allowed_keys)
self.ts.start_server(self.event, server)
- def test_1_client(self):
+ def _test_connection(self, **kwargs):
"""
- verify that the SSHClient stuff works too.
+ (Most) kwargs get passed directly into SSHClient.connect().
+
+ The exception is ``allowed_keys`` which is stripped and handed to the
+ ``NullServer`` used for testing.
"""
- threading.Thread(target=self._run).start()
+ run_kwargs = {'allowed_keys': kwargs.pop('allowed_keys', None)}
+ # Server setup
+ threading.Thread(target=self._run, kwargs=run_kwargs).start()
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
+ # Client setup
self.tc = paramiko.SSHClient()
self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
- self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
+ # Actual connection
+ self.tc.connect(self.addr, self.port, username='slowdive', **kwargs)
+
+ # Authentication successful?
self.event.wait(1.0)
self.assertTrue(self.event.isSet())
self.assertTrue(self.ts.is_active())
self.assertEqual('slowdive', self.ts.get_username())
self.assertEqual(True, self.ts.is_authenticated())
+ # Command execution functions?
stdin, stdout, stderr = self.tc.exec_command('yes')
schan = self.ts.accept(1.0)
@@ -110,61 +141,71 @@ class SSHClientTest (unittest.TestCase):
self.assertEqual('This is on stderr.\n', stderr.readline())
self.assertEqual('', stderr.readline())
+ # Cleanup
stdin.close()
stdout.close()
stderr.close()
+ def test_1_client(self):
+ """
+ verify that the SSHClient stuff works too.
+ """
+ self._test_connection(password='pygmalion')
+
def test_2_client_dsa(self):
"""
verify that SSHClient works with a DSA key.
"""
- threading.Thread(target=self._run).start()
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
- public_host_key = paramiko.RSAKey(data=host_key.asbytes())
-
- self.tc = paramiko.SSHClient()
- self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
- self.tc.connect(self.addr, self.port, username='slowdive', key_filename=test_path('test_dss.key'))
-
- self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
- self.assertTrue(self.ts.is_active())
- self.assertEqual('slowdive', self.ts.get_username())
- self.assertEqual(True, self.ts.is_authenticated())
+ self._test_connection(key_filename=test_path('test_dss.key'))
- stdin, stdout, stderr = self.tc.exec_command('yes')
- schan = self.ts.accept(1.0)
-
- schan.send('Hello there.\n')
- schan.send_stderr('This is on stderr.\n')
- schan.close()
-
- self.assertEqual('Hello there.\n', stdout.readline())
- self.assertEqual('', stdout.readline())
- self.assertEqual('This is on stderr.\n', stderr.readline())
- self.assertEqual('', stderr.readline())
+ def test_client_rsa(self):
+ """
+ verify that SSHClient works with an RSA key.
+ """
+ self._test_connection(key_filename=test_path('test_rsa.key'))
- stdin.close()
- stdout.close()
- stderr.close()
+ def test_2_5_client_ecdsa(self):
+ """
+ verify that SSHClient works with an ECDSA key.
+ """
+ self._test_connection(key_filename=test_path('test_ecdsa.key'))
def test_3_multiple_key_files(self):
"""
verify that SSHClient accepts and tries multiple key files.
"""
- threading.Thread(target=self._run).start()
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
- public_host_key = paramiko.RSAKey(data=host_key.asbytes())
-
- self.tc = paramiko.SSHClient()
- self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
- self.tc.connect(self.addr, self.port, username='slowdive', key_filename=[test_path('test_rsa.key'), test_path('test_dss.key')])
-
- self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
- self.assertTrue(self.ts.is_active())
- self.assertEqual('slowdive', self.ts.get_username())
- self.assertEqual(True, self.ts.is_authenticated())
+ # This is dumb :(
+ types_ = {
+ 'rsa': 'ssh-rsa',
+ 'dss': 'ssh-dss',
+ 'ecdsa': 'ecdsa-sha2-nistp256',
+ }
+ # Various combos of attempted & valid keys
+ # TODO: try every possible combo using itertools functions
+ for attempt, accept in (
+ (['rsa', 'dss'], ['dss']), # Original test #3
+ (['dss', 'rsa'], ['dss']), # Ordering matters sometimes, sadly
+ (['dss', 'rsa', 'ecdsa'], ['dss']), # Try ECDSA but fail
+ (['rsa', 'ecdsa'], ['ecdsa']), # ECDSA success
+ ):
+ self._test_connection(
+ key_filename=[
+ test_path('test_{0}.key'.format(x)) for x in attempt
+ ],
+ allowed_keys=[types_[x] for x in accept],
+ )
+
+ def test_multiple_key_files_failure(self):
+ """
+ Expect failure when multiple keys in play and none are accepted
+ """
+ # Until #387 is fixed we have to catch a high-up exception since
+ # various platforms trigger different errors here >_<
+ self.assertRaises(SSHException,
+ self._test_connection,
+ key_filename=[test_path('test_rsa.key')],
+ allowed_keys=['ecdsa-sha2-nistp256'],
+ )
def test_4_auto_add_policy(self):
"""
diff --git a/tests/test_file.py b/tests/test_file.py
index c6edd7af..22a34aca 100755
--- a/tests/test_file.py
+++ b/tests/test_file.py
@@ -23,6 +23,7 @@ Some unit tests for the BufferedFile abstraction.
import unittest
from paramiko.file import BufferedFile
from paramiko.common import linefeed_byte, crlf, cr_byte
+import sys
class LoopbackFile (BufferedFile):
@@ -151,6 +152,15 @@ class BufferedFileTest (unittest.TestCase):
b'need to close them again.\n')
f.close()
+ def test_8_buffering(self):
+ """
+ verify that buffered objects can be written
+ """
+ if sys.version_info[0] == 2:
+ f = LoopbackFile('r+', 16)
+ f.write(buffer(b'Too small.'))
+ f.close()
+
if __name__ == '__main__':
from unittest import main
main()
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index 2b6aa3b6..1ae9781d 100755
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -279,8 +279,8 @@ class SFTPTest (unittest.TestCase):
def test_7_listdir(self):
"""
- verify that a folder can be created, a bunch of files can be placed in it,
- and those files show up in sftp.listdir.
+ verify that a folder can be created, a bunch of files can be placed in
+ it, and those files show up in sftp.listdir.
"""
try:
sftp.open(FOLDER + '/duck.txt', 'w').close()
@@ -298,6 +298,26 @@ class SFTPTest (unittest.TestCase):
sftp.remove(FOLDER + '/fish.txt')
sftp.remove(FOLDER + '/tertiary.py')
+ def test_7_5_listdir_iter(self):
+ """
+ listdir_iter version of above test
+ """
+ try:
+ sftp.open(FOLDER + '/duck.txt', 'w').close()
+ sftp.open(FOLDER + '/fish.txt', 'w').close()
+ sftp.open(FOLDER + '/tertiary.py', 'w').close()
+
+ x = [x.filename for x in sftp.listdir_iter(FOLDER)]
+ self.assertEqual(len(x), 3)
+ self.assertTrue('duck.txt' in x)
+ self.assertTrue('fish.txt' in x)
+ self.assertTrue('tertiary.py' in x)
+ self.assertTrue('random' not in x)
+ finally:
+ sftp.remove(FOLDER + '/duck.txt')
+ sftp.remove(FOLDER + '/fish.txt')
+ sftp.remove(FOLDER + '/tertiary.py')
+
def test_8_setstat(self):
"""
verify that the setstat functions (chown, chmod, utime, truncate) work.
diff --git a/tests/test_util.py b/tests/test_util.py
index 28c162e5..ff2eb10c 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -338,3 +338,109 @@ IdentityFile something_%l_using_fqdn
"""
config = paramiko.util.parse_ssh_config(StringIO(test_config))
assert config.lookup('meh') # will die during lookup() if bug regresses
+
+ def test_13_config_dos_crlf_succeeds(self):
+ config_file = StringIO("host abcqwerty\r\nHostName 127.0.0.1\r\n")
+ config = paramiko.SSHConfig()
+ config.parse(config_file)
+ self.assertEqual(config.lookup("abcqwerty")["hostname"], "127.0.0.1")
+
+ def test_quoted_host_names(self):
+ test_config_file = """\
+Host "param pam" param "pam"
+ Port 1111
+
+Host "param2"
+ Port 2222
+
+Host param3 parara
+ Port 3333
+
+Host param4 "p a r" "p" "par" para
+ Port 4444
+"""
+ res = {
+ 'param pam': {'hostname': 'param pam', 'port': '1111'},
+ 'param': {'hostname': 'param', 'port': '1111'},
+ 'pam': {'hostname': 'pam', 'port': '1111'},
+
+ 'param2': {'hostname': 'param2', 'port': '2222'},
+
+ 'param3': {'hostname': 'param3', 'port': '3333'},
+ 'parara': {'hostname': 'parara', 'port': '3333'},
+
+ 'param4': {'hostname': 'param4', 'port': '4444'},
+ 'p a r': {'hostname': 'p a r', 'port': '4444'},
+ 'p': {'hostname': 'p', 'port': '4444'},
+ 'par': {'hostname': 'par', 'port': '4444'},
+ 'para': {'hostname': 'para', 'port': '4444'},
+ }
+ f = StringIO(test_config_file)
+ config = paramiko.util.parse_ssh_config(f)
+ for host, values in res.items():
+ self.assertEquals(
+ paramiko.util.lookup_ssh_host_config(host, config),
+ values
+ )
+
+ def test_quoted_params_in_config(self):
+ test_config_file = """\
+Host "param pam" param "pam"
+ IdentityFile id_rsa
+
+Host "param2"
+ IdentityFile "test rsa key"
+
+Host param3 parara
+ IdentityFile id_rsa
+ IdentityFile "test rsa key"
+"""
+ res = {
+ 'param pam': {'hostname': 'param pam', 'identityfile': ['id_rsa']},
+ 'param': {'hostname': 'param', 'identityfile': ['id_rsa']},
+ 'pam': {'hostname': 'pam', 'identityfile': ['id_rsa']},
+
+ 'param2': {'hostname': 'param2', 'identityfile': ['test rsa key']},
+
+ 'param3': {'hostname': 'param3', 'identityfile': ['id_rsa', 'test rsa key']},
+ 'parara': {'hostname': 'parara', 'identityfile': ['id_rsa', 'test rsa key']},
+ }
+ f = StringIO(test_config_file)
+ config = paramiko.util.parse_ssh_config(f)
+ for host, values in res.items():
+ self.assertEquals(
+ paramiko.util.lookup_ssh_host_config(host, config),
+ values
+ )
+
+ def test_quoted_host_in_config(self):
+ conf = SSHConfig()
+ correct_data = {
+ 'param': ['param'],
+ '"param"': ['param'],
+
+ 'param pam': ['param', 'pam'],
+ '"param" "pam"': ['param', 'pam'],
+ '"param" pam': ['param', 'pam'],
+ 'param "pam"': ['param', 'pam'],
+
+ 'param "pam" p': ['param', 'pam', 'p'],
+ '"param" pam "p"': ['param', 'pam', 'p'],
+
+ '"pa ram"': ['pa ram'],
+ '"pa ram" pam': ['pa ram', 'pam'],
+ 'param "p a m"': ['param', 'p a m'],
+ }
+ incorrect_data = [
+ 'param"',
+ '"param',
+ 'param "pam',
+ 'param "pam" "p a',
+ ]
+ for host, values in correct_data.items():
+ self.assertEquals(
+ conf._get_hosts(host),
+ values
+ )
+ for host in incorrect_data:
+ self.assertRaises(Exception, conf._get_hosts, host)