# Copyright (C) 2003-2009 Robey Pointer
#
# This file is part of paramiko.
#
# Paramiko is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.

"""
Some unit tests for utility functions.
"""

from binascii import hexlify
import errno
import os
from hashlib import sha1
import unittest

import paramiko.util
from paramiko.util import lookup_ssh_host_config as host_config, safe_string
from paramiko.py3compat import StringIO, byte_ord, b


# Note some lines in this configuration have trailing spaces on purpose
# NOTE(review): those trailing spaces are not recoverable from the flattened
# source this was restored from -- confirm against upstream. The explicit
# ``dont_strip_whitespace_please`` literal below still carries leading and
# trailing whitespace, which test_parse_config expects to be stripped.
test_config_file = """\
Host *
    User robey
    IdentityFile =~/.ssh/id_rsa

# comment
Host *.example.com
    \tUser bjork
Port=3333
Host *
"""

# Kept as a separate literal so the surrounding whitespace is explicit.
dont_strip_whitespace_please = "\t \t Crazy something dumb "

test_config_file += dont_strip_whitespace_please
test_config_file += """
Host spoo.example.com
Crazy something else
"""

# Two known_hosts entries; the backslashes join each key onto one line.
test_hosts_file = """\
secure.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA1PD6U2/TVxET6lkpKhOk5r\
9q/kAYG6sP9f5zuUYP8i7FOFp/6ncCEbbtg/lB+A3iidyxoSWl+9jtoyyDOOVX4UIDV9G11Ml8om3\
D+jrpI9cycZHqilK0HmxDeCuxbwyMuaCygU9gS2qoRvNLWZk70OpIKSSpBo0Wl3/XUmz9uhc=
happy.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA8bP1ZA7DCZDB9J0s50l31M\
BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\
5ymME3bQ4J/k1IKxCtz/bAlAqFgKoc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+Pc2M=
"""


# for test 1:
# (the wildcard import is deliberate: test_import inspects globals())
from paramiko import *


class UtilTest(unittest.TestCase):
    """Tests for paramiko.util helpers and SSH config parsing/lookup."""

    def test_import(self):
        """
        verify that all the classes can be imported from paramiko.
        """
        symbols = set(globals())
        expected_names = (
            'Transport',
            'SSHClient',
            'MissingHostKeyPolicy',
            'AutoAddPolicy',
            'RejectPolicy',
            'WarningPolicy',
            'SecurityOptions',
            'SubsystemHandler',
            'Channel',
            'RSAKey',
            'DSSKey',
            'Message',
            'SSHException',
            'AuthenticationException',
            'PasswordRequiredException',
            'BadAuthenticationType',
            'ChannelException',
            'SFTP',
            'SFTPFile',
            'SFTPHandle',
            'SFTPClient',
            'SFTPServer',
            'SFTPError',
            'SFTPAttributes',
            'SFTPServerInterface',
            'ServerInterface',
            'BufferedFile',
            'Agent',
            'AgentKey',
            'HostKeys',
            'SSHConfig',
            'util',
        )
        for name in expected_names:
            # assertIn reports which symbol is missing on failure.
            self.assertIn(name, symbols)

    def test_parse_config(self):
        """
        The module-level fixture parses into the expected host entries.
        """
        f = StringIO(test_config_file)
        config = paramiko.util.parse_ssh_config(f)
        self.assertEqual(
            config._config,
            [
                {'host': ['*'], 'config': {}},
                {
                    'host': ['*'],
                    'config': {
                        'identityfile': ['~/.ssh/id_rsa'],
                        'user': 'robey',
                    },
                },
                {
                    'host': ['*.example.com'],
                    'config': {'user': 'bjork', 'port': '3333'},
                },
                {'host': ['*'], 'config': {'crazy': 'something dumb'}},
                {
                    'host': ['spoo.example.com'],
                    'config': {'crazy': 'something else'},
                },
            ]
        )

    def test_host_config(self):
        """
        Looking up hosts in the module-level fixture merges matching entries.
        """
        f = StringIO(test_config_file)
        config = paramiko.util.parse_ssh_config(f)

        cases = {
            'irc.danger.com': {
                'crazy': 'something dumb',
                'hostname': 'irc.danger.com',
                'user': 'robey',
            },
            'irc.example.com': {
                'crazy': 'something dumb',
                'hostname': 'irc.example.com',
                'user': 'robey',
                'port': '3333',
            },
            'spoo.example.com': {
                'crazy': 'something dumb',
                'hostname': 'spoo.example.com',
                'user': 'robey',
                'port': '3333',
            },
        }
        for host, values in cases.items():
            # Every host also matches the first "Host *" entry of the fixture,
            # so the (tilde-expanded) identity file is expected for all.
            expected = dict(
                values,
                hostname=host,
                identityfile=[os.path.expanduser("~/.ssh/id_rsa")]
            )
            self.assertEqual(
                paramiko.util.lookup_ssh_host_config(host, config),
                expected
            )

    def test_generate_key_bytes(self):
        """
        generate_key_bytes yields the known 64-byte value for fixed inputs.
        """
        x = paramiko.util.generate_key_bytes(
            sha1, b'ABCDEFGH', 'This is my secret passphrase.', 64
        )
        hex_digest = ''.join(['%02x' % byte_ord(c) for c in x])
        self.assertEqual(hex_digest, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b')

    def test_host_keys(self):
        """
        load_host_keys reads both entries of the known_hosts fixture.
        """
        with open('hostfile.temp', 'w') as f:
            f.write(test_hosts_file)
        try:
            hostdict = paramiko.util.load_host_keys('hostfile.temp')
            self.assertEqual(2, len(hostdict))
            self.assertEqual(1, len(list(hostdict.values())[0]))
            self.assertEqual(1, len(list(hostdict.values())[1]))
            key = hostdict['secure.example.com']['ssh-rsa']
            fp = hexlify(key.get_fingerprint()).upper()
            self.assertEqual(b'E6684DB30E109B67B70FF1DC5C7F1363', fp)
        finally:
            # Always remove the scratch file, even when an assertion fails.
            os.unlink('hostfile.temp')

    def test_host_config_expose_issue_33(self):
        """
        The Port from the first matching Host entry is the one returned.
        """
        test_config_file = """
Host www13.*
    Port 22

Host *.example.com
    Port 2222

Host *
    Port 3333
    """
        f = StringIO(test_config_file)
        config = paramiko.util.parse_ssh_config(f)
        host = 'www13.example.com'
        self.assertEqual(
            paramiko.util.lookup_ssh_host_config(host, config),
            {'hostname': host, 'port': '22'}
        )

    def test_eintr_retry(self):
        """
        retry_on_signal retries on EINTR and propagates any other error.
        """
        self.assertEqual('foo', paramiko.util.retry_on_signal(lambda: 'foo'))

        # Variables that are set by raises_intr
        # (one-element lists so the nested function can mutate them)
        intr_errors_remaining = [3]
        call_count = [0]

        def raises_intr():
            call_count[0] += 1
            if intr_errors_remaining[0] > 0:
                intr_errors_remaining[0] -= 1
                raise IOError(errno.EINTR, 'file', 'interrupted system call')

        self.assertTrue(paramiko.util.retry_on_signal(raises_intr) is None)
        self.assertEqual(0, intr_errors_remaining[0])
        # Three interrupted calls plus the final successful one.
        self.assertEqual(4, call_count[0])

        def raises_ioerror_not_eintr():
            raise IOError(errno.ENOENT, 'file', 'file not found')

        self.assertRaises(
            IOError,
            lambda: paramiko.util.retry_on_signal(raises_ioerror_not_eintr)
        )

        def raises_other_exception():
            raise AssertionError('foo')

        self.assertRaises(
            AssertionError,
            lambda: paramiko.util.retry_on_signal(raises_other_exception)
        )

    def test_proxycommand_config_equals_parsing(self):
        """
        ProxyCommand should not split on equals signs within the value.
        """
        conf = """
Host space-delimited
    ProxyCommand foo bar=biz baz

Host equals-delimited
    ProxyCommand=foo bar=biz baz
"""
        f = StringIO(conf)
        config = paramiko.util.parse_ssh_config(f)
        for host in ('space-delimited', 'equals-delimited'):
            self.assertEqual(
                host_config(host, config)['proxycommand'],
                'foo bar=biz baz'
            )

    def test_proxycommand_interpolation(self):
        """
        ProxyCommand should perform interpolation on the value
        """
        config = paramiko.util.parse_ssh_config(StringIO("""
Host specific
    Port 37
    ProxyCommand host %h port %p lol

Host portonly
    Port 155

Host *
    Port 25
    ProxyCommand host %h port %p
"""))
        for host, val in (
            ('foo.com', "host foo.com port 25"),
            ('specific', "host specific port 37 lol"),
            ('portonly', "host portonly port 155"),
        ):
            self.assertEqual(
                host_config(host, config)['proxycommand'],
                val
            )

    def test_proxycommand_tilde_expansion(self):
        """
        Tilde (~) should be expanded inside ProxyCommand
        """
        config = paramiko.util.parse_ssh_config(StringIO("""
Host test
    ProxyCommand ssh -F ~/.ssh/test_config bastion nc %h %p
"""))
        self.assertEqual(
            'ssh -F %s/.ssh/test_config bastion nc test 22'
            % os.path.expanduser('~'),
            host_config('test', config)['proxycommand']
        )

    def test_host_config_test_negation(self):
        """
        Negated (!pattern) Host entries exclude hosts that match them.
        """
        test_config_file = """
Host www13.* !*.example.com
    Port 22

Host *.example.com !www13.*
    Port 2222

Host www13.*
    Port 8080

Host *
    Port 3333
    """
        f = StringIO(test_config_file)
        config = paramiko.util.parse_ssh_config(f)
        host = 'www13.example.com'
        self.assertEqual(
            paramiko.util.lookup_ssh_host_config(host, config),
            {'hostname': host, 'port': '8080'}
        )

    def test_host_config_test_proxycommand(self):
        """
        ProxyCommand parses with '=', ' = ' or whitespace as the divisor.
        """
        test_config_file = """
Host proxy-with-equal-divisor-and-space
ProxyCommand = foo=bar

Host proxy-with-equal-divisor-and-no-space
ProxyCommand=foo=bar

Host proxy-without-equal-divisor
ProxyCommand foo=bar:%h-%p
    """
        cases = {
            'proxy-with-equal-divisor-and-space': {
                'hostname': 'proxy-with-equal-divisor-and-space',
                'proxycommand': 'foo=bar',
            },
            'proxy-with-equal-divisor-and-no-space': {
                'hostname': 'proxy-with-equal-divisor-and-no-space',
                'proxycommand': 'foo=bar',
            },
            'proxy-without-equal-divisor': {
                'hostname': 'proxy-without-equal-divisor',
                'proxycommand': 'foo=bar:proxy-without-equal-divisor-22',
            },
        }
        for host, values in cases.items():
            # Parsed afresh for every host, as the original test did.
            f = StringIO(test_config_file)
            config = paramiko.util.parse_ssh_config(f)
            self.assertEqual(
                paramiko.util.lookup_ssh_host_config(host, config),
                values
            )

    def test_host_config_test_identityfile(self):
        """
        IdentityFile values accumulate across all matching Host entries.
        """
        test_config_file = """

IdentityFile id_dsa0

Host *
IdentityFile id_dsa1

Host dsa2
IdentityFile id_dsa2

Host dsa2*
IdentityFile id_dsa22
    """
        cases = {
            'foo': {
                'hostname': 'foo',
                'identityfile': ['id_dsa0', 'id_dsa1'],
            },
            'dsa2': {
                'hostname': 'dsa2',
                'identityfile': [
                    'id_dsa0', 'id_dsa1', 'id_dsa2', 'id_dsa22',
                ],
            },
            'dsa22': {
                'hostname': 'dsa22',
                'identityfile': ['id_dsa0', 'id_dsa1', 'id_dsa22'],
            },
        }
        for host, values in cases.items():
            # Parsed afresh for every host, as the original test did.
            f = StringIO(test_config_file)
            config = paramiko.util.parse_ssh_config(f)
            self.assertEqual(
                paramiko.util.lookup_ssh_host_config(host, config),
                values
            )

    def test_config_addressfamily_and_lazy_fqdn(self):
        """
        Ensure the code path honoring non-'all' AddressFamily doesn't asplode
        """
        test_config = """
AddressFamily inet
IdentityFile something_%l_using_fqdn
"""
        config = paramiko.util.parse_ssh_config(StringIO(test_config))
        assert config.lookup('meh')  # will die during lookup() if bug regresses

    def test_clamp_value(self):
        """
        clamp_value returns the expected result for three sample triples.
        """
        self.assertEqual(32768, paramiko.util.clamp_value(32767, 32768, 32769))
        self.assertEqual(32767, paramiko.util.clamp_value(32767, 32765, 32769))
        self.assertEqual(32769, paramiko.util.clamp_value(32767, 32770, 32769))

    def test_config_dos_crlf_succeeds(self):
        """
        A config using CRLF line endings parses and can be looked up.
        """
        config_file = StringIO("host abcqwerty\r\nHostName 127.0.0.1\r\n")
        config = paramiko.SSHConfig()
        config.parse(config_file)
        self.assertEqual(config.lookup("abcqwerty")["hostname"], "127.0.0.1")

    def test_get_hostnames(self):
        """
        get_hostnames returns the set of Host patterns in the fixture.
        """
        f = StringIO(test_config_file)
        config = paramiko.util.parse_ssh_config(f)
        self.assertEqual(
            config.get_hostnames(),
            {'*', '*.example.com', 'spoo.example.com'}
        )

    def test_quoted_host_names(self):
        """
        Quoted Host patterns, including ones with spaces, are matched.
        """
        test_config_file = """\
Host "param pam" param "pam"
    Port 1111

Host "param2"
    Port 2222

Host param3 parara
    Port 3333

Host param4 "p a r" "p" "par" para
    Port 4444
"""
        res = {
            'param pam': {'hostname': 'param pam', 'port': '1111'},
            'param': {'hostname': 'param', 'port': '1111'},
            'pam': {'hostname': 'pam', 'port': '1111'},

            'param2': {'hostname': 'param2', 'port': '2222'},

            'param3': {'hostname': 'param3', 'port': '3333'},
            'parara': {'hostname': 'parara', 'port': '3333'},

            'param4': {'hostname': 'param4', 'port': '4444'},
            'p a r': {'hostname': 'p a r', 'port': '4444'},
            'p': {'hostname': 'p', 'port': '4444'},
            'par': {'hostname': 'par', 'port': '4444'},
            'para': {'hostname': 'para', 'port': '4444'},
        }
        f = StringIO(test_config_file)
        config = paramiko.util.parse_ssh_config(f)
        for host, values in res.items():
            self.assertEqual(
                paramiko.util.lookup_ssh_host_config(host, config),
                values
            )

    def test_quoted_params_in_config(self):
        """
        Quoted option values have their surrounding quotes removed.
        """
        test_config_file = """\
Host "param pam" param "pam"
    IdentityFile id_rsa

Host "param2"
    IdentityFile "test rsa key"

Host param3 parara
    IdentityFile id_rsa
    IdentityFile "test rsa key"
"""
        res = {
            'param pam': {'hostname': 'param pam', 'identityfile': ['id_rsa']},
            'param': {'hostname': 'param', 'identityfile': ['id_rsa']},
            'pam': {'hostname': 'pam', 'identityfile': ['id_rsa']},

            'param2': {
                'hostname': 'param2',
                'identityfile': ['test rsa key'],
            },

            'param3': {
                'hostname': 'param3',
                'identityfile': ['id_rsa', 'test rsa key'],
            },
            'parara': {
                'hostname': 'parara',
                'identityfile': ['id_rsa', 'test rsa key'],
            },
        }
        f = StringIO(test_config_file)
        config = paramiko.util.parse_ssh_config(f)
        for host, values in res.items():
            self.assertEqual(
                paramiko.util.lookup_ssh_host_config(host, config),
                values
            )

    def test_quoted_host_in_config(self):
        """
        _get_hosts splits quoted/unquoted names and rejects unbalanced quotes.
        """
        conf = paramiko.SSHConfig()
        correct_data = {
            'param': ['param'],
            '"param"': ['param'],

            'param pam': ['param', 'pam'],
            '"param" "pam"': ['param', 'pam'],
            '"param" pam': ['param', 'pam'],
            'param "pam"': ['param', 'pam'],

            'param "pam" p': ['param', 'pam', 'p'],
            '"param" pam "p"': ['param', 'pam', 'p'],

            '"pa ram"': ['pa ram'],
            '"pa ram" pam': ['pa ram', 'pam'],
            'param "p a m"': ['param', 'p a m'],
        }
        incorrect_data = [
            'param"',
            '"param',
            'param "pam',
            'param "pam" "p a',
        ]
        for host, values in correct_data.items():
            self.assertEqual(
                conf._get_hosts(host),
                values
            )
        for host in incorrect_data:
            self.assertRaises(Exception, conf._get_hosts, host)

    def test_safe_string(self):
        """
        safe_string leaves plain bytes alone and %-escapes control bytes.
        """
        vanilla = b"vanilla"
        has_bytes = b"has \7\3 bytes"
        safe_vanilla = safe_string(vanilla)
        safe_has_bytes = safe_string(has_bytes)
        expected_bytes = b"has %07%03 bytes"
        err = "{!r} != {!r}"
        msg = err.format(safe_vanilla, vanilla)
        assert safe_vanilla == vanilla, msg
        msg = err.format(safe_has_bytes, expected_bytes)
        assert safe_has_bytes == expected_bytes, msg

    def test_proxycommand_none_issue_418(self):
        """
        "ProxyCommand None" yields a lookup result without 'proxycommand'.
        """
        test_config_file = """
Host proxycommand-standard-none
    ProxyCommand None

Host proxycommand-with-equals-none
    ProxyCommand=None
    """
        cases = {
            'proxycommand-standard-none': {
                'hostname': 'proxycommand-standard-none',
            },
            'proxycommand-with-equals-none': {
                'hostname': 'proxycommand-with-equals-none',
            },
        }
        for host, values in cases.items():
            # Parsed afresh for every host, as the original test did.
            f = StringIO(test_config_file)
            config = paramiko.util.parse_ssh_config(f)
            self.assertEqual(
                paramiko.util.lookup_ssh_host_config(host, config),
                values
            )

    def test_proxycommand_none_masking(self):
        """
        A host-specific "ProxyCommand none" masks the wildcard default.
        """
        # Re: https://github.com/paramiko/paramiko/issues/670
        source_config = """
Host specific-host
    ProxyCommand none

Host other-host
    ProxyCommand other-proxy

Host *
    ProxyCommand default-proxy
"""
        config = paramiko.SSHConfig()
        config.parse(StringIO(source_config))
        # When bug is present, the full stripping-out of specific-host's
        # ProxyCommand means it actually appears to pick up the default
        # ProxyCommand value instead, due to cascading. It should (for
        # backwards compatibility reasons in 1.x/2.x) appear completely blank,
        # as if the host had no ProxyCommand whatsoever.
        # Threw another unrelated host in there just for sanity reasons.
        self.assertFalse('proxycommand' in config.lookup('specific-host'))
        self.assertEqual(
            config.lookup('other-host')['proxycommand'],
            'other-proxy'
        )
        self.assertEqual(
            config.lookup('some-random-host')['proxycommand'],
            'default-proxy'
        )