summaryrefslogtreecommitdiffhomepage
path: root/tests/test_util.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_util.py')
-rw-r--r--tests/test_util.py384
1 files changed, 214 insertions, 170 deletions
diff --git a/tests/test_util.py b/tests/test_util.py
index 7880e156..23b2e86a 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -30,6 +30,7 @@ import paramiko.util
from paramiko.util import lookup_ssh_host_config as host_config, safe_string
from paramiko.py3compat import StringIO, byte_ord, b
+
# Note some lines in this configuration have trailing spaces on purpose
test_config_file = """\
Host *
@@ -66,53 +67,71 @@ from paramiko import *
class UtilTest(unittest.TestCase):
+
def test_import(self):
"""
verify that all the classes can be imported from paramiko.
"""
symbols = list(globals().keys())
- self.assertTrue('Transport' in symbols)
- self.assertTrue('SSHClient' in symbols)
- self.assertTrue('MissingHostKeyPolicy' in symbols)
- self.assertTrue('AutoAddPolicy' in symbols)
- self.assertTrue('RejectPolicy' in symbols)
- self.assertTrue('WarningPolicy' in symbols)
- self.assertTrue('SecurityOptions' in symbols)
- self.assertTrue('SubsystemHandler' in symbols)
- self.assertTrue('Channel' in symbols)
- self.assertTrue('RSAKey' in symbols)
- self.assertTrue('DSSKey' in symbols)
- self.assertTrue('Message' in symbols)
- self.assertTrue('SSHException' in symbols)
- self.assertTrue('AuthenticationException' in symbols)
- self.assertTrue('PasswordRequiredException' in symbols)
- self.assertTrue('BadAuthenticationType' in symbols)
- self.assertTrue('ChannelException' in symbols)
- self.assertTrue('SFTP' in symbols)
- self.assertTrue('SFTPFile' in symbols)
- self.assertTrue('SFTPHandle' in symbols)
- self.assertTrue('SFTPClient' in symbols)
- self.assertTrue('SFTPServer' in symbols)
- self.assertTrue('SFTPError' in symbols)
- self.assertTrue('SFTPAttributes' in symbols)
- self.assertTrue('SFTPServerInterface' in symbols)
- self.assertTrue('ServerInterface' in symbols)
- self.assertTrue('BufferedFile' in symbols)
- self.assertTrue('Agent' in symbols)
- self.assertTrue('AgentKey' in symbols)
- self.assertTrue('HostKeys' in symbols)
- self.assertTrue('SSHConfig' in symbols)
- self.assertTrue('util' in symbols)
+ self.assertTrue("Transport" in symbols)
+ self.assertTrue("SSHClient" in symbols)
+ self.assertTrue("MissingHostKeyPolicy" in symbols)
+ self.assertTrue("AutoAddPolicy" in symbols)
+ self.assertTrue("RejectPolicy" in symbols)
+ self.assertTrue("WarningPolicy" in symbols)
+ self.assertTrue("SecurityOptions" in symbols)
+ self.assertTrue("SubsystemHandler" in symbols)
+ self.assertTrue("Channel" in symbols)
+ self.assertTrue("RSAKey" in symbols)
+ self.assertTrue("DSSKey" in symbols)
+ self.assertTrue("Message" in symbols)
+ self.assertTrue("SSHException" in symbols)
+ self.assertTrue("AuthenticationException" in symbols)
+ self.assertTrue("PasswordRequiredException" in symbols)
+ self.assertTrue("BadAuthenticationType" in symbols)
+ self.assertTrue("ChannelException" in symbols)
+ self.assertTrue("SFTP" in symbols)
+ self.assertTrue("SFTPFile" in symbols)
+ self.assertTrue("SFTPHandle" in symbols)
+ self.assertTrue("SFTPClient" in symbols)
+ self.assertTrue("SFTPServer" in symbols)
+ self.assertTrue("SFTPError" in symbols)
+ self.assertTrue("SFTPAttributes" in symbols)
+ self.assertTrue("SFTPServerInterface" in symbols)
+ self.assertTrue("ServerInterface" in symbols)
+ self.assertTrue("BufferedFile" in symbols)
+ self.assertTrue("Agent" in symbols)
+ self.assertTrue("AgentKey" in symbols)
+ self.assertTrue("HostKeys" in symbols)
+ self.assertTrue("SSHConfig" in symbols)
+ self.assertTrue("util" in symbols)
def test_parse_config(self):
global test_config_file
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
- self.assertEqual(config._config,
- [{'host': ['*'], 'config': {}}, {'host': ['*'], 'config': {'identityfile': ['~/.ssh/id_rsa'], 'user': 'robey'}},
- {'host': ['*.example.com'], 'config': {'user': 'bjork', 'port': '3333'}},
- {'host': ['*'], 'config': {'crazy': 'something dumb'}},
- {'host': ['spoo.example.com'], 'config': {'crazy': 'something else'}}])
+ self.assertEqual(
+ config._config,
+ [
+ {"host": ["*"], "config": {}},
+ {
+ "host": ["*"],
+ "config": {
+ "identityfile": ["~/.ssh/id_rsa"],
+ "user": "robey",
+ },
+ },
+ {
+ "host": ["*.example.com"],
+ "config": {"user": "bjork", "port": "3333"},
+ },
+ {"host": ["*"], "config": {"crazy": "something dumb"}},
+ {
+ "host": ["spoo.example.com"],
+ "config": {"crazy": "something else"},
+ },
+ ],
+ )
def test_host_config(self):
global test_config_file
@@ -120,44 +139,57 @@ class UtilTest(unittest.TestCase):
config = paramiko.util.parse_ssh_config(f)
for host, values in {
- 'irc.danger.com': {'crazy': 'something dumb',
- 'hostname': 'irc.danger.com',
- 'user': 'robey'},
- 'irc.example.com': {'crazy': 'something dumb',
- 'hostname': 'irc.example.com',
- 'user': 'robey',
- 'port': '3333'},
- 'spoo.example.com': {'crazy': 'something dumb',
- 'hostname': 'spoo.example.com',
- 'user': 'robey',
- 'port': '3333'}
+ "irc.danger.com": {
+ "crazy": "something dumb",
+ "hostname": "irc.danger.com",
+ "user": "robey",
+ },
+ "irc.example.com": {
+ "crazy": "something dumb",
+ "hostname": "irc.example.com",
+ "user": "robey",
+ "port": "3333",
+ },
+ "spoo.example.com": {
+ "crazy": "something dumb",
+ "hostname": "spoo.example.com",
+ "user": "robey",
+ "port": "3333",
+ },
}.items():
- values = dict(values,
+ values = dict(
+ values,
hostname=host,
- identityfile=[os.path.expanduser("~/.ssh/id_rsa")]
+ identityfile=[os.path.expanduser("~/.ssh/id_rsa")],
)
self.assertEqual(
- paramiko.util.lookup_ssh_host_config(host, config),
- values
+ paramiko.util.lookup_ssh_host_config(host, config), values
)
def test_generate_key_bytes(self):
- x = paramiko.util.generate_key_bytes(sha1, b'ABCDEFGH', 'This is my secret passphrase.', 64)
- hex = ''.join(['%02x' % byte_ord(c) for c in x])
- self.assertEqual(hex, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b')
+ x = paramiko.util.generate_key_bytes(
+ sha1, b"ABCDEFGH", "This is my secret passphrase.", 64
+ )
+ hex = "".join(["%02x" % byte_ord(c) for c in x])
+ self.assertEqual(
+ hex,
+ "9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b",
+ )
def test_host_keys(self):
- with open('hostfile.temp', 'w') as f:
+ with open("hostfile.temp", "w") as f:
f.write(test_hosts_file)
try:
- hostdict = paramiko.util.load_host_keys('hostfile.temp')
+ hostdict = paramiko.util.load_host_keys("hostfile.temp")
self.assertEqual(2, len(hostdict))
self.assertEqual(1, len(list(hostdict.values())[0]))
self.assertEqual(1, len(list(hostdict.values())[1]))
- fp = hexlify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint()).upper()
- self.assertEqual(b'E6684DB30E109B67B70FF1DC5C7F1363', fp)
+ fp = hexlify(
+ hostdict["secure.example.com"]["ssh-rsa"].get_fingerprint()
+ ).upper()
+ self.assertEqual(b"E6684DB30E109B67B70FF1DC5C7F1363", fp)
finally:
- os.unlink('hostfile.temp')
+ os.unlink("hostfile.temp")
def test_host_config_expose_issue_33(self):
test_config_file = """
@@ -172,36 +204,44 @@ Host *
"""
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
- host = 'www13.example.com'
+ host = "www13.example.com"
self.assertEqual(
paramiko.util.lookup_ssh_host_config(host, config),
- {'hostname': host, 'port': '22'}
+ {"hostname": host, "port": "22"},
)
def test_eintr_retry(self):
- self.assertEqual('foo', paramiko.util.retry_on_signal(lambda: 'foo'))
+ self.assertEqual("foo", paramiko.util.retry_on_signal(lambda: "foo"))
# Variables that are set by raises_intr
intr_errors_remaining = [3]
call_count = [0]
+
def raises_intr():
call_count[0] += 1
if intr_errors_remaining[0] > 0:
intr_errors_remaining[0] -= 1
- raise IOError(errno.EINTR, 'file', 'interrupted system call')
+ raise IOError(errno.EINTR, "file", "interrupted system call")
+
self.assertTrue(paramiko.util.retry_on_signal(raises_intr) is None)
self.assertEqual(0, intr_errors_remaining[0])
self.assertEqual(4, call_count[0])
def raises_ioerror_not_eintr():
- raise IOError(errno.ENOENT, 'file', 'file not found')
- self.assertRaises(IOError,
- lambda: paramiko.util.retry_on_signal(raises_ioerror_not_eintr))
+ raise IOError(errno.ENOENT, "file", "file not found")
+
+ self.assertRaises(
+ IOError,
+ lambda: paramiko.util.retry_on_signal(raises_ioerror_not_eintr),
+ )
def raises_other_exception():
- raise AssertionError('foo')
- self.assertRaises(AssertionError,
- lambda: paramiko.util.retry_on_signal(raises_other_exception))
+ raise AssertionError("foo")
+
+ self.assertRaises(
+ AssertionError,
+ lambda: paramiko.util.retry_on_signal(raises_other_exception),
+ )
def test_proxycommand_config_equals_parsing(self):
"""
@@ -216,17 +256,18 @@ Host equals-delimited
"""
f = StringIO(conf)
config = paramiko.util.parse_ssh_config(f)
- for host in ('space-delimited', 'equals-delimited'):
+ for host in ("space-delimited", "equals-delimited"):
self.assertEqual(
- host_config(host, config)['proxycommand'],
- 'foo bar=biz baz'
+ host_config(host, config)["proxycommand"], "foo bar=biz baz"
)
def test_proxycommand_interpolation(self):
"""
ProxyCommand should perform interpolation on the value
"""
- config = paramiko.util.parse_ssh_config(StringIO("""
+ config = paramiko.util.parse_ssh_config(
+ StringIO(
+ """
Host specific
Port 37
ProxyCommand host %h port %p lol
@@ -237,28 +278,32 @@ Host portonly
Host *
Port 25
ProxyCommand host %h port %p
-"""))
+"""
+ )
+ )
for host, val in (
- ('foo.com', "host foo.com port 25"),
- ('specific', "host specific port 37 lol"),
- ('portonly', "host portonly port 155"),
+ ("foo.com", "host foo.com port 25"),
+ ("specific", "host specific port 37 lol"),
+ ("portonly", "host portonly port 155"),
):
- self.assertEqual(
- host_config(host, config)['proxycommand'],
- val
- )
+ self.assertEqual(host_config(host, config)["proxycommand"], val)
def test_proxycommand_tilde_expansion(self):
"""
Tilde (~) should be expanded inside ProxyCommand
"""
- config = paramiko.util.parse_ssh_config(StringIO("""
+ config = paramiko.util.parse_ssh_config(
+ StringIO(
+ """
Host test
ProxyCommand ssh -F ~/.ssh/test_config bastion nc %h %p
-"""))
+"""
+ )
+ )
self.assertEqual(
- 'ssh -F %s/.ssh/test_config bastion nc test 22' % os.path.expanduser('~'),
- host_config('test', config)['proxycommand']
+ "ssh -F %s/.ssh/test_config bastion nc test 22"
+ % os.path.expanduser("~"),
+ host_config("test", config)["proxycommand"],
)
def test_host_config_test_negation(self):
@@ -277,10 +322,10 @@ Host *
"""
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
- host = 'www13.example.com'
+ host = "www13.example.com"
self.assertEqual(
paramiko.util.lookup_ssh_host_config(host, config),
- {'hostname': host, 'port': '8080'}
+ {"hostname": host, "port": "8080"},
)
def test_host_config_test_proxycommand(self):
@@ -295,20 +340,24 @@ Host proxy-without-equal-divisor
ProxyCommand foo=bar:%h-%p
"""
for host, values in {
- 'proxy-with-equal-divisor-and-space' :{'hostname': 'proxy-with-equal-divisor-and-space',
- 'proxycommand': 'foo=bar'},
- 'proxy-with-equal-divisor-and-no-space':{'hostname': 'proxy-with-equal-divisor-and-no-space',
- 'proxycommand': 'foo=bar'},
- 'proxy-without-equal-divisor' :{'hostname': 'proxy-without-equal-divisor',
- 'proxycommand':
- 'foo=bar:proxy-without-equal-divisor-22'}
+ "proxy-with-equal-divisor-and-space": {
+ "hostname": "proxy-with-equal-divisor-and-space",
+ "proxycommand": "foo=bar",
+ },
+ "proxy-with-equal-divisor-and-no-space": {
+ "hostname": "proxy-with-equal-divisor-and-no-space",
+ "proxycommand": "foo=bar",
+ },
+ "proxy-without-equal-divisor": {
+ "hostname": "proxy-without-equal-divisor",
+ "proxycommand": "foo=bar:proxy-without-equal-divisor-22",
+ },
}.items():
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
self.assertEqual(
- paramiko.util.lookup_ssh_host_config(host, config),
- values
+ paramiko.util.lookup_ssh_host_config(host, config), values
)
def test_host_config_test_identityfile(self):
@@ -326,19 +375,21 @@ Host dsa2*
IdentityFile id_dsa22
"""
for host, values in {
- 'foo' :{'hostname': 'foo',
- 'identityfile': ['id_dsa0', 'id_dsa1']},
- 'dsa2' :{'hostname': 'dsa2',
- 'identityfile': ['id_dsa0', 'id_dsa1', 'id_dsa2', 'id_dsa22']},
- 'dsa22' :{'hostname': 'dsa22',
- 'identityfile': ['id_dsa0', 'id_dsa1', 'id_dsa22']}
+ "foo": {"hostname": "foo", "identityfile": ["id_dsa0", "id_dsa1"]},
+ "dsa2": {
+ "hostname": "dsa2",
+ "identityfile": ["id_dsa0", "id_dsa1", "id_dsa2", "id_dsa22"],
+ },
+ "dsa22": {
+ "hostname": "dsa22",
+ "identityfile": ["id_dsa0", "id_dsa1", "id_dsa22"],
+ },
}.items():
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
self.assertEqual(
- paramiko.util.lookup_ssh_host_config(host, config),
- values
+ paramiko.util.lookup_ssh_host_config(host, config), values
)
def test_config_addressfamily_and_lazy_fqdn(self):
@@ -350,7 +401,9 @@ AddressFamily inet
IdentityFile something_%l_using_fqdn
"""
config = paramiko.util.parse_ssh_config(StringIO(test_config))
- assert config.lookup('meh') # will die during lookup() if bug regresses
+ assert config.lookup(
+ "meh"
+ ) # will die during lookup() if bug regresses
def test_clamp_value(self):
self.assertEqual(32768, paramiko.util.clamp_value(32767, 32768, 32769))
@@ -366,7 +419,9 @@ IdentityFile something_%l_using_fqdn
def test_get_hostnames(self):
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
- self.assertEqual(config.get_hostnames(), set(['*', '*.example.com', 'spoo.example.com']))
+ self.assertEqual(
+ config.get_hostnames(), {"*", "*.example.com", "spoo.example.com"}
+ )
def test_quoted_host_names(self):
test_config_file = """\
@@ -383,27 +438,23 @@ Host param4 "p a r" "p" "par" para
Port 4444
"""
res = {
- 'param pam': {'hostname': 'param pam', 'port': '1111'},
- 'param': {'hostname': 'param', 'port': '1111'},
- 'pam': {'hostname': 'pam', 'port': '1111'},
-
- 'param2': {'hostname': 'param2', 'port': '2222'},
-
- 'param3': {'hostname': 'param3', 'port': '3333'},
- 'parara': {'hostname': 'parara', 'port': '3333'},
-
- 'param4': {'hostname': 'param4', 'port': '4444'},
- 'p a r': {'hostname': 'p a r', 'port': '4444'},
- 'p': {'hostname': 'p', 'port': '4444'},
- 'par': {'hostname': 'par', 'port': '4444'},
- 'para': {'hostname': 'para', 'port': '4444'},
+ "param pam": {"hostname": "param pam", "port": "1111"},
+ "param": {"hostname": "param", "port": "1111"},
+ "pam": {"hostname": "pam", "port": "1111"},
+ "param2": {"hostname": "param2", "port": "2222"},
+ "param3": {"hostname": "param3", "port": "3333"},
+ "parara": {"hostname": "parara", "port": "3333"},
+ "param4": {"hostname": "param4", "port": "4444"},
+ "p a r": {"hostname": "p a r", "port": "4444"},
+ "p": {"hostname": "p", "port": "4444"},
+ "par": {"hostname": "par", "port": "4444"},
+ "para": {"hostname": "para", "port": "4444"},
}
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
for host, values in res.items():
self.assertEquals(
- paramiko.util.lookup_ssh_host_config(host, config),
- values
+ paramiko.util.lookup_ssh_host_config(host, config), values
)
def test_quoted_params_in_config(self):
@@ -419,62 +470,54 @@ Host param3 parara
IdentityFile "test rsa key"
"""
res = {
- 'param pam': {'hostname': 'param pam', 'identityfile': ['id_rsa']},
- 'param': {'hostname': 'param', 'identityfile': ['id_rsa']},
- 'pam': {'hostname': 'pam', 'identityfile': ['id_rsa']},
-
- 'param2': {'hostname': 'param2', 'identityfile': ['test rsa key']},
-
- 'param3': {'hostname': 'param3', 'identityfile': ['id_rsa', 'test rsa key']},
- 'parara': {'hostname': 'parara', 'identityfile': ['id_rsa', 'test rsa key']},
+ "param pam": {"hostname": "param pam", "identityfile": ["id_rsa"]},
+ "param": {"hostname": "param", "identityfile": ["id_rsa"]},
+ "pam": {"hostname": "pam", "identityfile": ["id_rsa"]},
+ "param2": {"hostname": "param2", "identityfile": ["test rsa key"]},
+ "param3": {
+ "hostname": "param3",
+ "identityfile": ["id_rsa", "test rsa key"],
+ },
+ "parara": {
+ "hostname": "parara",
+ "identityfile": ["id_rsa", "test rsa key"],
+ },
}
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
for host, values in res.items():
self.assertEquals(
- paramiko.util.lookup_ssh_host_config(host, config),
- values
+ paramiko.util.lookup_ssh_host_config(host, config), values
)
def test_quoted_host_in_config(self):
conf = SSHConfig()
correct_data = {
- 'param': ['param'],
- '"param"': ['param'],
-
- 'param pam': ['param', 'pam'],
- '"param" "pam"': ['param', 'pam'],
- '"param" pam': ['param', 'pam'],
- 'param "pam"': ['param', 'pam'],
-
- 'param "pam" p': ['param', 'pam', 'p'],
- '"param" pam "p"': ['param', 'pam', 'p'],
-
- '"pa ram"': ['pa ram'],
- '"pa ram" pam': ['pa ram', 'pam'],
- 'param "p a m"': ['param', 'p a m'],
+ "param": ["param"],
+ '"param"': ["param"],
+ "param pam": ["param", "pam"],
+ '"param" "pam"': ["param", "pam"],
+ '"param" pam': ["param", "pam"],
+ 'param "pam"': ["param", "pam"],
+ 'param "pam" p': ["param", "pam", "p"],
+ '"param" pam "p"': ["param", "pam", "p"],
+ '"pa ram"': ["pa ram"],
+ '"pa ram" pam': ["pa ram", "pam"],
+ 'param "p a m"': ["param", "p a m"],
}
- incorrect_data = [
- 'param"',
- '"param',
- 'param "pam',
- 'param "pam" "p a',
- ]
+ incorrect_data = ['param"', '"param', 'param "pam', 'param "pam" "p a']
for host, values in correct_data.items():
- self.assertEquals(
- conf._get_hosts(host),
- values
- )
+ self.assertEquals(conf._get_hosts(host), values)
for host in incorrect_data:
self.assertRaises(Exception, conf._get_hosts, host)
def test_safe_string(self):
- vanilla = b("vanilla")
- has_bytes = b("has \7\3 bytes")
+ vanilla = b"vanilla"
+ has_bytes = b"has \7\3 bytes"
safe_vanilla = safe_string(vanilla)
safe_has_bytes = safe_string(has_bytes)
- expected_bytes = b("has %07%03 bytes")
- err = "{0!r} != {1!r}"
+ expected_bytes = b"has %07%03 bytes"
+ err = "{!r} != {!r}"
msg = err.format(safe_vanilla, vanilla)
assert safe_vanilla == vanilla, msg
msg = err.format(safe_has_bytes, expected_bytes)
@@ -489,15 +532,18 @@ Host proxycommand-with-equals-none
ProxyCommand=None
"""
for host, values in {
- 'proxycommand-standard-none': {'hostname': 'proxycommand-standard-none'},
- 'proxycommand-with-equals-none': {'hostname': 'proxycommand-with-equals-none'}
+ "proxycommand-standard-none": {
+ "hostname": "proxycommand-standard-none"
+ },
+ "proxycommand-with-equals-none": {
+ "hostname": "proxycommand-with-equals-none"
+ },
}.items():
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
self.assertEqual(
- paramiko.util.lookup_ssh_host_config(host, config),
- values
+ paramiko.util.lookup_ssh_host_config(host, config), values
)
def test_proxycommand_none_masking(self):
@@ -520,12 +566,10 @@ Host *
# backwards compatibility reasons in 1.x/2.x) appear completely blank,
# as if the host had no ProxyCommand whatsoever.
# Threw another unrelated host in there just for sanity reasons.
- self.assertFalse('proxycommand' in config.lookup('specific-host'))
+ self.assertFalse("proxycommand" in config.lookup("specific-host"))
self.assertEqual(
- config.lookup('other-host')['proxycommand'],
- 'other-proxy'
+ config.lookup("other-host")["proxycommand"], "other-proxy"
)
self.assertEqual(
- config.lookup('some-random-host')['proxycommand'],
- 'default-proxy'
+ config.lookup("some-random-host")["proxycommand"], "default-proxy"
)