diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/configs/match-exec | 2 | ||||
-rw-r--r-- | tests/test_config.py | 30 | ||||
-rw-r--r-- | tests/test_pkey.py | 60 | ||||
-rw-r--r-- | tests/test_transport.py | 8 |
4 files changed, 91 insertions, 9 deletions
diff --git a/tests/configs/match-exec b/tests/configs/match-exec index 763346ea..62a147aa 100644 --- a/tests/configs/match-exec +++ b/tests/configs/match-exec @@ -12,5 +12,5 @@ Host target User intermediate HostName configured -Match exec "%d %h %L %l %n %p %r %u" +Match exec "%C %d %h %L %l %n %p %r %u" Port 1337 diff --git a/tests/test_config.py b/tests/test_config.py index 5e9aa059..b46dc7b4 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -42,6 +42,7 @@ def socket(): # Patch out getfqdn to return some real string for when it gets called; # some code (eg tokenization) gets mad w/ MagicMocks mocket.getfqdn.return_value = "some.fake.fqdn" + mocket.gethostname.return_value = "local.fake.fqdn" yield mocket @@ -206,7 +207,7 @@ Host test assert got == expected @patch("paramiko.config.getpass") - def test_controlpath_token_expansion(self, getpass): + def test_controlpath_token_expansion(self, getpass, socket): getpass.getuser.return_value = "gandalf" config = SSHConfig.from_text( """ @@ -217,6 +218,9 @@ Host explicit_user Host explicit_host HostName ohai ControlPath remoteuser %r host %h orighost %n + +Host hashbrowns + ControlPath %C """ ) result = config.lookup("explicit_user")["controlpath"] @@ -225,6 +229,9 @@ Host explicit_host result = config.lookup("explicit_host")["controlpath"] # Remote user falls back to local user; host and orighost may differ assert result == "remoteuser gandalf host ohai orighost explicit_host" + # Supports %C + result = config.lookup("hashbrowns")["controlpath"] + assert result == "a438e7dbf5308b923aba9db8fe2ca63447ac8688" def test_negation(self): config = SSHConfig.from_text( @@ -276,10 +283,11 @@ ProxyCommand foo=bar:%h-%p assert config.lookup(host) == values - def test_identityfile(self): + @patch("paramiko.config.getpass") + def test_identityfile(self, getpass, socket): + getpass.getuser.return_value = "gandalf" config = SSHConfig.from_text( """ - IdentityFile id_dsa0 Host * @@ -290,6 +298,9 @@ IdentityFile id_dsa2 Host dsa2* IdentityFile id_dsa22 + +Host hashbrowns +IdentityFile %C """ ) for host, values in { @@ -302,8 +313,15 @@ IdentityFile id_dsa22 "hostname": "dsa22", "identityfile": ["id_dsa0", "id_dsa1", "id_dsa22"], }, + "hashbrowns": { + "hostname": "hashbrowns", + "identityfile": [ + "id_dsa0", + "id_dsa1", + "a438e7dbf5308b923aba9db8fe2ca63447ac8688", + ], + }, }.items(): - assert config.lookup(host) == values def test_config_addressfamily_and_lazy_fqdn(self): @@ -739,10 +757,10 @@ class TestMatchExec(object): @patch("paramiko.config.getpass") @patch("paramiko.config.invoke.run") def test_tokenizes_argument(self, run, getpass, socket): - socket.gethostname.return_value = "local.fqdn" getpass.getuser.return_value = "gandalf" - # Actual exec value is "%d %h %L %l %n %p %r %u" + # Actual exec value is "%C %d %h %L %l %n %p %r %u" parts = ( + "bf5ba06778434a9384ee4217e462f64888bd0cd2", expanduser("~"), "configured", "local", diff --git a/tests/test_pkey.py b/tests/test_pkey.py index 687e776b..3bc0459a 100644 --- a/tests/test_pkey.py +++ b/tests/test_pkey.py @@ -23,6 +23,7 @@ Some unit tests for public/private key objects. import unittest import os +import stat from binascii import hexlify from hashlib import md5 @@ -36,10 +37,11 @@ from paramiko import ( SSHException, ) from paramiko.py3compat import StringIO, byte_chr, b, bytes, PY2 +from paramiko.common import o600 from cryptography.exceptions import UnsupportedAlgorithm from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateNumbers -from mock import patch +from mock import patch, Mock import pytest from .util import _support, is_low_entropy @@ -702,6 +704,62 @@ class KeyTest(unittest.TestCase): _support("test_rsa.key-cert.pub"), ) + @patch("paramiko.pkey.os") + def _test_keyfile_race(self, os_, exists): + # Re: CVE-2022-24302 + password = "television" + newpassword = "radio" + source = _support("test_ecdsa_384.key") + new = source + ".new" + # Mock setup + os_.path.exists.return_value = exists + # Attach os flag values to mock + for attr, value in vars(os).items(): + if attr.startswith("O_"): + setattr(os_, attr, value) + # Load fixture key + key = ECDSAKey(filename=source, password=password) + key._write_private_key = Mock() + # Write out in new location + key.write_private_key_file(new, password=newpassword) + # Expected open via os module + os_.open.assert_called_once_with( + new, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, o600 + ) + os_.fdopen.assert_called_once_with(os_.open.return_value, "w") + # Old chmod still around for backwards compat + os_.chmod.assert_called_once_with(new, o600) + assert ( + key._write_private_key.call_args[0][0] + == os_.fdopen.return_value.__enter__.return_value + ) + + def test_new_keyfiles_avoid_file_descriptor_race_on_chmod(self): + self._test_keyfile_race(exists=False) + + def test_existing_keyfiles_still_work_ok(self): + self._test_keyfile_race(exists=True) + + def test_new_keyfiles_avoid_descriptor_race_integration(self): + # Integration-style version of above + password = "television" + newpassword = "radio" + source = _support("test_ecdsa_384.key") + new = source + ".new" + # Load fixture key + key = ECDSAKey(filename=source, password=password) + try: + # Write out in new location + key.write_private_key_file(new, password=newpassword) + # Test mode + assert stat.S_IMODE(os.stat(new).st_mode) == o600 + # Prove can open with new password + reloaded = ECDSAKey(filename=new, password=newpassword) + assert reloaded == key + finally: + if os.path.exists(new): + os.unlink(new) + def test_sign_rsa_with_certificate(self): data = b"ice weasels" key_path = _support(os.path.join("cert_support", "test_rsa.key")) diff --git a/tests/test_transport.py b/tests/test_transport.py index 737ef705..fa7a3c1a 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -1121,7 +1121,12 @@ class AlgorithmDisablingTests(unittest.TestCase): t = Transport(sock=Mock()) assert t.preferred_ciphers == t._preferred_ciphers assert t.preferred_macs == t._preferred_macs - assert t.preferred_keys == t._preferred_keys + assert t.preferred_keys == tuple( + t._preferred_keys + + tuple( + "{}-cert-v01@openssh.com".format(x) for x in t._preferred_keys + ) + ) assert t.preferred_kex == t._preferred_kex def test_preferred_lists_filter_disabled_algorithms(self): @@ -1140,6 +1145,7 @@ class AlgorithmDisablingTests(unittest.TestCase): assert "hmac-md5" not in t.preferred_macs assert "ssh-dss" in t._preferred_keys assert "ssh-dss" not in t.preferred_keys + assert "ssh-dss-cert-v01@openssh.com" not in t.preferred_keys assert "diffie-hellman-group14-sha256" in t._preferred_kex assert "diffie-hellman-group14-sha256" not in t.preferred_kex |