summaryrefslogtreecommitdiffhomepage
path: root/tests/test_util.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_util.py')
-rw-r--r--tests/test_util.py229
1 files changed, 195 insertions, 34 deletions
diff --git a/tests/test_util.py b/tests/test_util.py
index 458709b2..0e7d0b2b 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -7,7 +7,7 @@
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
-# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
@@ -21,14 +21,14 @@ Some unit tests for utility functions.
"""
from binascii import hexlify
-import cStringIO
import errno
import os
-import unittest
from Crypto.Hash import SHA
import paramiko.util
+from paramiko.util import lookup_ssh_host_config as host_config, safe_string
+from paramiko.py3compat import StringIO, byte_ord, b
-from util import ParamikoTest
+from tests.util import ParamikoTest
test_config_file = """\
Host *
@@ -64,7 +64,7 @@ class UtilTest(ParamikoTest):
"""
verify that all the classes can be imported from paramiko.
"""
- symbols = globals().keys()
+ symbols = list(globals().keys())
self.assertTrue('Transport' in symbols)
self.assertTrue('SSHClient' in symbols)
self.assertTrue('MissingHostKeyPolicy' in symbols)
@@ -100,48 +100,56 @@ class UtilTest(ParamikoTest):
def test_2_parse_config(self):
global test_config_file
- f = cStringIO.StringIO(test_config_file)
+ f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
- self.assertEquals(config._config,
- [ {'identityfile': '~/.ssh/id_rsa', 'host': '*', 'user': 'robey',
- 'crazy': 'something dumb '},
- {'host': '*.example.com', 'user': 'bjork', 'port': '3333'},
- {'host': 'spoo.example.com', 'crazy': 'something else'}])
+ self.assertEqual(config._config,
+ [{'host': ['*'], 'config': {}}, {'host': ['*'], 'config': {'identityfile': ['~/.ssh/id_rsa'], 'user': 'robey'}},
+ {'host': ['*.example.com'], 'config': {'user': 'bjork', 'port': '3333'}},
+ {'host': ['*'], 'config': {'crazy': 'something dumb '}},
+ {'host': ['spoo.example.com'], 'config': {'crazy': 'something else'}}])
def test_3_host_config(self):
global test_config_file
- f = cStringIO.StringIO(test_config_file)
+ f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
+
for host, values in {
- 'irc.danger.com': {'user': 'robey', 'crazy': 'something dumb '},
- 'irc.example.com': {'user': 'bjork', 'crazy': 'something dumb ', 'port': '3333'},
- 'spoo.example.com': {'user': 'bjork', 'crazy': 'something else', 'port': '3333'}
+ 'irc.danger.com': {'crazy': 'something dumb ',
+ 'hostname': 'irc.danger.com',
+ 'user': 'robey'},
+ 'irc.example.com': {'crazy': 'something dumb ',
+ 'hostname': 'irc.example.com',
+ 'user': 'robey',
+ 'port': '3333'},
+ 'spoo.example.com': {'crazy': 'something dumb ',
+ 'hostname': 'spoo.example.com',
+ 'user': 'robey',
+ 'port': '3333'}
}.items():
values = dict(values,
hostname=host,
- identityfile=os.path.expanduser("~/.ssh/id_rsa")
+ identityfile=[os.path.expanduser("~/.ssh/id_rsa")]
)
- self.assertEquals(
+ self.assertEqual(
paramiko.util.lookup_ssh_host_config(host, config),
values
)
def test_4_generate_key_bytes(self):
- x = paramiko.util.generate_key_bytes(SHA, 'ABCDEFGH', 'This is my secret passphrase.', 64)
- hex = ''.join(['%02x' % ord(c) for c in x])
- self.assertEquals(hex, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b')
+ x = paramiko.util.generate_key_bytes(SHA, b'ABCDEFGH', 'This is my secret passphrase.', 64)
+ hex = ''.join(['%02x' % byte_ord(c) for c in x])
+ self.assertEqual(hex, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b')
def test_5_host_keys(self):
- f = open('hostfile.temp', 'w')
- f.write(test_hosts_file)
- f.close()
+ with open('hostfile.temp', 'w') as f:
+ f.write(test_hosts_file)
try:
hostdict = paramiko.util.load_host_keys('hostfile.temp')
- self.assertEquals(2, len(hostdict))
- self.assertEquals(1, len(hostdict.values()[0]))
- self.assertEquals(1, len(hostdict.values()[1]))
+ self.assertEqual(2, len(hostdict))
+ self.assertEqual(1, len(list(hostdict.values())[0]))
+ self.assertEqual(1, len(list(hostdict.values())[1]))
fp = hexlify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint()).upper()
- self.assertEquals('E6684DB30E109B67B70FF1DC5C7F1363', fp)
+ self.assertEqual(b'E6684DB30E109B67B70FF1DC5C7F1363', fp)
finally:
os.unlink('hostfile.temp')
@@ -149,8 +157,8 @@ class UtilTest(ParamikoTest):
from paramiko.common import rng
# just verify that we can pull out 32 bytes and not get an exception.
x = rng.read(32)
- self.assertEquals(len(x), 32)
-
+ self.assertEqual(len(x), 32)
+
def test_7_host_config_expose_issue_33(self):
test_config_file = """
Host www13.*
@@ -162,16 +170,16 @@ Host *.example.com
Host *
Port 3333
"""
- f = cStringIO.StringIO(test_config_file)
+ f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
host = 'www13.example.com'
- self.assertEquals(
+ self.assertEqual(
paramiko.util.lookup_ssh_host_config(host, config),
{'hostname': host, 'port': '22'}
)
def test_8_eintr_retry(self):
- self.assertEquals('foo', paramiko.util.retry_on_signal(lambda: 'foo'))
+ self.assertEqual('foo', paramiko.util.retry_on_signal(lambda: 'foo'))
# Variables that are set by raises_intr
intr_errors_remaining = [3]
@@ -182,8 +190,8 @@ Host *
intr_errors_remaining[0] -= 1
raise IOError(errno.EINTR, 'file', 'interrupted system call')
self.assertTrue(paramiko.util.retry_on_signal(raises_intr) is None)
- self.assertEquals(0, intr_errors_remaining[0])
- self.assertEquals(4, call_count[0])
+ self.assertEqual(0, intr_errors_remaining[0])
+ self.assertEqual(4, call_count[0])
def raises_ioerror_not_eintr():
raise IOError(errno.ENOENT, 'file', 'file not found')
@@ -194,3 +202,156 @@ Host *
raise AssertionError('foo')
self.assertRaises(AssertionError,
lambda: paramiko.util.retry_on_signal(raises_other_exception))
+
+ def test_9_proxycommand_config_equals_parsing(self):
+ """
+ ProxyCommand should not split on equals signs within the value.
+ """
+ conf = """
+Host space-delimited
+ ProxyCommand foo bar=biz baz
+
+Host equals-delimited
+ ProxyCommand=foo bar=biz baz
+"""
+ f = StringIO(conf)
+ config = paramiko.util.parse_ssh_config(f)
+ for host in ('space-delimited', 'equals-delimited'):
+ self.assertEqual(
+ host_config(host, config)['proxycommand'],
+ 'foo bar=biz baz'
+ )
+
+ def test_10_proxycommand_interpolation(self):
+ """
+ ProxyCommand should perform interpolation on the value
+ """
+ config = paramiko.util.parse_ssh_config(StringIO("""
+Host specific
+ Port 37
+ ProxyCommand host %h port %p lol
+
+Host portonly
+ Port 155
+
+Host *
+ Port 25
+ ProxyCommand host %h port %p
+"""))
+ for host, val in (
+ ('foo.com', "host foo.com port 25"),
+ ('specific', "host specific port 37 lol"),
+ ('portonly', "host portonly port 155"),
+ ):
+ self.assertEqual(
+ host_config(host, config)['proxycommand'],
+ val
+ )
+
+ def test_11_host_config_test_negation(self):
+ test_config_file = """
+Host www13.* !*.example.com
+ Port 22
+
+Host *.example.com !www13.*
+ Port 2222
+
+Host www13.*
+ Port 8080
+
+Host *
+ Port 3333
+ """
+ f = StringIO(test_config_file)
+ config = paramiko.util.parse_ssh_config(f)
+ host = 'www13.example.com'
+ self.assertEqual(
+ paramiko.util.lookup_ssh_host_config(host, config),
+ {'hostname': host, 'port': '8080'}
+ )
+
+ def test_12_host_config_test_proxycommand(self):
+ test_config_file = """
+Host proxy-with-equal-divisor-and-space
+ProxyCommand = foo=bar
+
+Host proxy-with-equal-divisor-and-no-space
+ProxyCommand=foo=bar
+
+Host proxy-without-equal-divisor
+ProxyCommand foo=bar:%h-%p
+ """
+ for host, values in {
+ 'proxy-with-equal-divisor-and-space' :{'hostname': 'proxy-with-equal-divisor-and-space',
+ 'proxycommand': 'foo=bar'},
+ 'proxy-with-equal-divisor-and-no-space':{'hostname': 'proxy-with-equal-divisor-and-no-space',
+ 'proxycommand': 'foo=bar'},
+ 'proxy-without-equal-divisor' :{'hostname': 'proxy-without-equal-divisor',
+ 'proxycommand':
+ 'foo=bar:proxy-without-equal-divisor-22'}
+ }.items():
+
+ f = StringIO(test_config_file)
+ config = paramiko.util.parse_ssh_config(f)
+ self.assertEqual(
+ paramiko.util.lookup_ssh_host_config(host, config),
+ values
+ )
+
+ def test_11_host_config_test_identityfile(self):
+ test_config_file = """
+
+IdentityFile id_dsa0
+
+Host *
+IdentityFile id_dsa1
+
+Host dsa2
+IdentityFile id_dsa2
+
+Host dsa2*
+IdentityFile id_dsa22
+ """
+ for host, values in {
+ 'foo' :{'hostname': 'foo',
+ 'identityfile': ['id_dsa0', 'id_dsa1']},
+ 'dsa2' :{'hostname': 'dsa2',
+ 'identityfile': ['id_dsa0', 'id_dsa1', 'id_dsa2', 'id_dsa22']},
+ 'dsa22' :{'hostname': 'dsa22',
+ 'identityfile': ['id_dsa0', 'id_dsa1', 'id_dsa22']}
+ }.items():
+
+ f = StringIO(test_config_file)
+ config = paramiko.util.parse_ssh_config(f)
+ self.assertEqual(
+ paramiko.util.lookup_ssh_host_config(host, config),
+ values
+ )
+
+ def test_12_config_addressfamily_and_lazy_fqdn(self):
+ """
+ Ensure the code path honoring non-'all' AddressFamily doesn't asplode
+ """
+ test_config = """
+AddressFamily inet
+IdentityFile something_%l_using_fqdn
+"""
+ config = paramiko.util.parse_ssh_config(StringIO(test_config))
+ assert config.lookup('meh') # will die during lookup() if bug regresses
+
+ def test_13_config_dos_crlf_succeeds(self):
+ config_file = StringIO("host abcqwerty\r\nHostName 127.0.0.1\r\n")
+ config = paramiko.SSHConfig()
+ config.parse(config_file)
+ self.assertEqual(config.lookup("abcqwerty")["hostname"], "127.0.0.1")
+
+ def test_safe_string(self):
+ vanilla = b("vanilla")
+ has_bytes = b("has \7\3 bytes")
+ safe_vanilla = safe_string(vanilla)
+ safe_has_bytes = safe_string(has_bytes)
+ expected_bytes = b("has %07%03 bytes")
+ err = "{0!r} != {1!r}"
+ assert safe_vanilla == vanilla, err.format(safe_vanilla, vanilla)
+ assert safe_has_bytes == expected_bytes, \
+ err.format(safe_has_bytes, expected_bytes)