diff options
author | Jeff Forcier <jeff@bitprophet.org> | 2022-04-25 10:32:31 -0400 |
---|---|---|
committer | Jeff Forcier <jeff@bitprophet.org> | 2022-04-25 10:32:31 -0400 |
commit | 9b22c28a02e5ae0291857a7fb9051cf942280731 (patch) | |
tree | 74a2aa4583f2917ee73cdbe4b4ca9b0cc280b96a /tests | |
parent | ab335cdab8d6dc218e5d8658c3b32f4e7d0d74e5 (diff) | |
parent | df1701c1834cae333d5e6d9f41b0a4bea3da72e4 (diff) |
Merge branch 'main' into 1951-int
Diffstat (limited to 'tests')
-rw-r--r-- | tests/configs/match-exec | 2 | ||||
-rw-r--r-- | tests/loop.py | 2 | ||||
-rw-r--r-- | tests/stub_sftp.py | 2 | ||||
-rw-r--r-- | tests/test_agent.py | 50 | ||||
-rw-r--r-- | tests/test_auth.py | 2 | ||||
-rw-r--r-- | tests/test_buffered_pipe.py | 2 | ||||
-rw-r--r-- | tests/test_client.py | 22 | ||||
-rw-r--r-- | tests/test_config.py | 30 | ||||
-rw-r--r-- | tests/test_file.py | 2 | ||||
-rw-r--r-- | tests/test_gssapi.py | 2 | ||||
-rw-r--r-- | tests/test_hostkeys.py | 2 | ||||
-rw-r--r-- | tests/test_kex.py | 5 | ||||
-rw-r--r-- | tests/test_kex_gss.py | 2 | ||||
-rw-r--r-- | tests/test_message.py | 2 | ||||
-rw-r--r-- | tests/test_packetizer.py | 2 | ||||
-rw-r--r-- | tests/test_pkey.py | 116 | ||||
-rw-r--r-- | tests/test_sftp.py | 2 | ||||
-rw-r--r-- | tests/test_sftp_big.py | 2 | ||||
-rw-r--r-- | tests/test_ssh_gss.py | 2 | ||||
-rw-r--r-- | tests/test_transport.py | 300 | ||||
-rw-r--r-- | tests/test_util.py | 2 | ||||
-rw-r--r-- | tests/util.py | 32 |
22 files changed, 544 insertions, 41 deletions
diff --git a/tests/configs/match-exec b/tests/configs/match-exec index 763346ea..62a147aa 100644 --- a/tests/configs/match-exec +++ b/tests/configs/match-exec @@ -12,5 +12,5 @@ Host target User intermediate HostName configured -Match exec "%d %h %L %l %n %p %r %u" +Match exec "%C %d %h %L %l %n %p %r %u" Port 1337 diff --git a/tests/loop.py b/tests/loop.py index dd1f5a0c..87fb089a 100644 --- a/tests/loop.py +++ b/tests/loop.py @@ -14,7 +14,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. import socket import threading diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py index 1528a0b8..0c0372e9 100644 --- a/tests/stub_sftp.py +++ b/tests/stub_sftp.py @@ -14,7 +14,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ A stub SFTP server for loopback SFTP testing. diff --git a/tests/test_agent.py b/tests/test_agent.py new file mode 100644 index 00000000..c3973bbb --- /dev/null +++ b/tests/test_agent.py @@ -0,0 +1,50 @@ +import unittest + +from paramiko.message import Message +from paramiko.agent import ( + SSH2_AGENT_SIGN_RESPONSE, + cSSH2_AGENTC_SIGN_REQUEST, + SSH_AGENT_RSA_SHA2_256, + SSH_AGENT_RSA_SHA2_512, + AgentKey, +) +from paramiko.py3compat import b + + +class ChaosAgent: + def _send_message(self, msg): + self._sent_message = msg + sig = Message() + sig.add_string(b("lol")) + sig.rewind() + return SSH2_AGENT_SIGN_RESPONSE, sig + + +class AgentTests(unittest.TestCase): + def _sign_with_agent(self, kwargs, expectation): + agent = ChaosAgent() + key = AgentKey(agent, b("secret!!!")) + result = key.sign_ssh_data(b("token"), **kwargs) + assert result == b("lol") + msg = agent._sent_message + msg.rewind() + assert msg.get_byte() == cSSH2_AGENTC_SIGN_REQUEST + assert msg.get_string() == b("secret!!!") + assert msg.get_string() == b("token") + assert msg.get_int() == expectation + + def test_agent_signing_defaults_to_0_for_flags_field(self): + # No algorithm kwarg at all + self._sign_with_agent(kwargs=dict(), expectation=0) + + def test_agent_signing_is_2_for_SHA256(self): + self._sign_with_agent( + kwargs=dict(algorithm="rsa-sha2-256"), + expectation=SSH_AGENT_RSA_SHA2_256, + ) + + def test_agent_signing_is_2_for_SHA512(self): + self._sign_with_agent( + kwargs=dict(algorithm="rsa-sha2-512"), + expectation=SSH_AGENT_RSA_SHA2_512, + ) diff --git a/tests/test_auth.py b/tests/test_auth.py index 01fbac5b..0f0a6169 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -14,7 +14,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ Some unit tests for authenticating over a Transport. diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py index 61c99cc0..35e2cded 100644 --- a/tests/test_buffered_pipe.py +++ b/tests/test_buffered_pipe.py @@ -14,7 +14,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ Some unit tests for BufferedPipe. diff --git a/tests/test_client.py b/tests/test_client.py index f14aac23..1c28d824 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -14,7 +14,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ Some unit tests for SSHClient. @@ -41,7 +41,7 @@ from paramiko import SSHClient from paramiko.pkey import PublicBlob from paramiko.ssh_exception import SSHException, AuthenticationException -from .util import _support, slow +from .util import _support, requires_sha1_signing, slow requires_gss_auth = unittest.skipUnless( @@ -235,33 +235,39 @@ class ClientTest(unittest.TestCase): class SSHClientTest(ClientTest): + @requires_sha1_signing def test_client(self): """ verify that the SSHClient stuff works too. """ self._test_connection(password="pygmalion") + @requires_sha1_signing def test_client_dsa(self): """ verify that SSHClient works with a DSA key. """ self._test_connection(key_filename=_support("test_dss.key")) + @requires_sha1_signing def test_client_rsa(self): """ verify that SSHClient works with an RSA key. """ self._test_connection(key_filename=_support("test_rsa.key")) + @requires_sha1_signing def test_client_ecdsa(self): """ verify that SSHClient works with an ECDSA key. """ self._test_connection(key_filename=_support("test_ecdsa_256.key")) + @requires_sha1_signing def test_client_ed25519(self): self._test_connection(key_filename=_support("test_ed25519.key")) + @requires_sha1_signing def test_multiple_key_files(self): """ verify that SSHClient accepts and tries multiple key files. @@ -293,6 +299,7 @@ class SSHClientTest(ClientTest): self.tearDown() self.setUp() + @requires_sha1_signing def test_multiple_key_files_failure(self): """ Expect failure when multiple keys in play and none are accepted @@ -306,6 +313,7 @@ class SSHClientTest(ClientTest): allowed_keys=["ecdsa-sha2-nistp256"], ) + @requires_sha1_signing def test_certs_allowed_as_key_filename_values(self): # NOTE: giving cert path here, not key path. (Key path test is below. # They're similar except for which path is given; the expected auth and @@ -319,6 +327,7 @@ class SSHClientTest(ClientTest): public_blob=PublicBlob.from_file(cert_path), ) + @requires_sha1_signing def test_certs_implicitly_loaded_alongside_key_filename_keys(self): # NOTE: a regular test_connection() w/ test_rsa.key would incidentally # test this (because test_xxx.key-cert.pub exists) but incidental tests @@ -469,6 +478,7 @@ class SSHClientTest(ClientTest): kwargs = dict(self.connect_kwargs, banner_timeout=0.5) self.assertRaises(paramiko.SSHException, self.tc.connect, **kwargs) + @requires_sha1_signing def test_auth_trickledown(self): """ Failed key auth doesn't prevent subsequent pw auth from succeeding @@ -489,6 +499,7 @@ class SSHClientTest(ClientTest): ) self._test_connection(**kwargs) + @requires_sha1_signing @slow def test_auth_timeout(self): """ @@ -591,6 +602,7 @@ class SSHClientTest(ClientTest): host_key = paramiko.ECDSAKey.generate() self._client_host_key_bad(host_key) + @requires_sha1_signing def test_host_key_negotiation_2(self): host_key = paramiko.RSAKey.generate(2048) self._client_host_key_bad(host_key) @@ -598,6 +610,7 @@ class SSHClientTest(ClientTest): def test_host_key_negotiation_3(self): self._client_host_key_good(paramiko.ECDSAKey, "test_ecdsa_256.key") + @requires_sha1_signing def test_host_key_negotiation_4(self): self._client_host_key_good(paramiko.RSAKey, "test_rsa.key") @@ -681,6 +694,7 @@ class PasswordPassphraseTests(ClientTest): # instead of suffering a real connection cycle. # TODO: in that case, move the below to be part of an integration suite? + @requires_sha1_signing def test_password_kwarg_works_for_password_auth(self): # Straightforward / duplicate of earlier basic password test. self._test_connection(password="pygmalion") @@ -688,10 +702,12 @@ class PasswordPassphraseTests(ClientTest): # TODO: more granular exception pending #387; should be signaling "no auth # methods available" because no key and no password @raises(SSHException) + @requires_sha1_signing def test_passphrase_kwarg_not_used_for_password_auth(self): # Using the "right" password in the "wrong" field shouldn't work. self._test_connection(passphrase="pygmalion") + @requires_sha1_signing def test_passphrase_kwarg_used_for_key_passphrase(self): # Straightforward again, with new passphrase kwarg. self._test_connection( @@ -699,6 +715,7 @@ class PasswordPassphraseTests(ClientTest): passphrase="television", ) + @requires_sha1_signing def test_password_kwarg_used_for_passphrase_when_no_passphrase_kwarg_given( self ): # noqa @@ -709,6 +726,7 @@ class PasswordPassphraseTests(ClientTest): ) @raises(AuthenticationException) # TODO: more granular + @requires_sha1_signing def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given( # noqa self ): diff --git a/tests/test_config.py b/tests/test_config.py index 5e9aa059..b46dc7b4 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -42,6 +42,7 @@ def socket(): # Patch out getfqdn to return some real string for when it gets called; # some code (eg tokenization) gets mad w/ MagicMocks mocket.getfqdn.return_value = "some.fake.fqdn" + mocket.gethostname.return_value = "local.fake.fqdn" yield mocket @@ -206,7 +207,7 @@ Host test assert got == expected @patch("paramiko.config.getpass") - def test_controlpath_token_expansion(self, getpass): + def test_controlpath_token_expansion(self, getpass, socket): getpass.getuser.return_value = "gandalf" config = SSHConfig.from_text( """ @@ -217,6 +218,9 @@ Host explicit_user Host explicit_host HostName ohai ControlPath remoteuser %r host %h orighost %n + +Host hashbrowns + ControlPath %C """ ) result = config.lookup("explicit_user")["controlpath"] @@ -225,6 +229,9 @@ Host explicit_host result = config.lookup("explicit_host")["controlpath"] # Remote user falls back to local user; host and orighost may differ assert result == "remoteuser gandalf host ohai orighost explicit_host" + # Supports %C + result = config.lookup("hashbrowns")["controlpath"] + assert result == "a438e7dbf5308b923aba9db8fe2ca63447ac8688" def test_negation(self): config = SSHConfig.from_text( @@ -276,10 +283,11 @@ ProxyCommand foo=bar:%h-%p assert config.lookup(host) == values - def test_identityfile(self): + @patch("paramiko.config.getpass") + def test_identityfile(self, getpass, socket): + getpass.getuser.return_value = "gandalf" config = SSHConfig.from_text( """ - IdentityFile id_dsa0 Host * @@ -290,6 +298,9 @@ IdentityFile id_dsa2 Host dsa2* IdentityFile id_dsa22 + +Host hashbrowns +IdentityFile %C """ ) for host, values in { @@ -302,8 +313,15 @@ IdentityFile id_dsa22 "hostname": "dsa22", "identityfile": ["id_dsa0", "id_dsa1", "id_dsa22"], }, + "hashbrowns": { + "hostname": "hashbrowns", + "identityfile": [ + "id_dsa0", + "id_dsa1", + "a438e7dbf5308b923aba9db8fe2ca63447ac8688", + ], + }, }.items(): - assert config.lookup(host) == values def test_config_addressfamily_and_lazy_fqdn(self): @@ -739,10 +757,10 @@ class TestMatchExec(object): @patch("paramiko.config.getpass") @patch("paramiko.config.invoke.run") def test_tokenizes_argument(self, run, getpass, socket): - socket.gethostname.return_value = "local.fqdn" getpass.getuser.return_value = "gandalf" - # Actual exec value is "%d %h %L %l %n %p %r %u" + # Actual exec value is "%C %d %h %L %l %n %p %r %u" parts = ( + "bf5ba06778434a9384ee4217e462f64888bd0cd2", expanduser("~"), "configured", "local", diff --git a/tests/test_file.py b/tests/test_file.py index 2a3da74b..d4062c02 100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -14,7 +14,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ Some unit tests for the BufferedFile abstraction. diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py index acdc7c82..23c3ef42 100644 --- a/tests/test_gssapi.py +++ b/tests/test_gssapi.py @@ -16,7 +16,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ Test the used APIs for GSS-API / SSPI authentication diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py index 41a9244f..723ea1a5 100644 --- a/tests/test_hostkeys.py +++ b/tests/test_hostkeys.py @@ -14,7 +14,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ Some unit tests for HostKeys. diff --git a/tests/test_kex.py b/tests/test_kex.py index c251611a..b6463558 100644 --- a/tests/test_kex.py +++ b/tests/test_kex.py @@ -14,7 +14,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ Some unit tests for the key exchange protocols. @@ -76,7 +76,7 @@ class FakeKey(object): def asbytes(self): return b"fake-key" - def sign_ssh_data(self, H): + def sign_ssh_data(self, H, algorithm): return b"fake-sig" @@ -93,6 +93,7 @@ class FakeTransport(object): remote_version = "SSH-2.0-lame" local_kex_init = "local-kex-init" remote_kex_init = "remote-kex-init" + host_key_type = "fake-key" def _send_message(self, m): self._message = m diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py index 6f5625dc..26659ae3 100644 --- a/tests/test_kex_gss.py +++ b/tests/test_kex_gss.py @@ -17,7 +17,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ Unit Tests for the GSS-API / SSPI SSHv2 Diffie-Hellman Key Exchange and user diff --git a/tests/test_message.py b/tests/test_message.py index 57766d90..23b06858 100644 --- a/tests/test_message.py +++ b/tests/test_message.py @@ -14,7 +14,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ Some unit tests for ssh protocol message blocks. diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py index de80770e..27dee358 100644 --- a/tests/test_packetizer.py +++ b/tests/test_packetizer.py @@ -14,7 +14,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ Some unit tests for the ssh2 protocol in Transport. diff --git a/tests/test_pkey.py b/tests/test_pkey.py index 94b2492b..738e8cf0 100644 --- a/tests/test_pkey.py +++ b/tests/test_pkey.py @@ -15,7 +15,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ Some unit tests for public/private key objects. @@ -23,6 +23,7 @@ Some unit tests for public/private key objects. import unittest import os +import stat from binascii import hexlify from hashlib import md5 @@ -36,13 +37,14 @@ from paramiko import ( SSHException, ) from paramiko.py3compat import StringIO, byte_chr, b, bytes, PY2 +from paramiko.common import o600 from cryptography.exceptions import UnsupportedAlgorithm from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateNumbers -from mock import patch +from mock import patch, Mock import pytest -from .util import _support, is_low_entropy +from .util import _support, is_low_entropy, requires_sha1_signing # from openssh's ssh-keygen @@ -63,6 +65,8 @@ FINGER_ECDSA_256 = "256 25:19:eb:55:e6:a1:47:ff:4f:38:d2:75:6f:a5:d5:60" FINGER_ECDSA_384 = "384 c1:8d:a0:59:09:47:41:8e:a8:a6:07:01:29:23:b4:65" FINGER_ECDSA_521 = "521 44:58:22:52:12:33:16:0e:ce:0e:be:2c:7c:7e:cc:1e" SIGNED_RSA = "20:d7:8a:31:21:cb:f7:92:12:f2:a4:89:37:f5:78:af:e6:16:b6:25:b9:97:3d:a2:cd:5f:ca:20:21:73:4c:ad:34:73:8f:20:77:28:e2:94:15:08:d8:91:40:7a:85:83:bf:18:37:95:dc:54:1a:9b:88:29:6c:73:ca:38:b4:04:f1:56:b9:f2:42:9d:52:1b:29:29:b4:4f:fd:c9:2d:af:47:d2:40:76:30:f3:63:45:0c:d9:1d:43:86:0f:1c:70:e2:93:12:34:f3:ac:c5:0a:2f:14:50:66:59:f1:88:ee:c1:4a:e9:d1:9c:4e:46:f0:0e:47:6f:38:74:f1:44:a8" # noqa +SIGNED_RSA_256 = "cc:6:60:e0:0:2c:ac:9e:26:bc:d5:68:64:3f:9f:a7:e5:aa:41:eb:88:4a:25:5:9c:93:84:66:ef:ef:60:f4:34:fb:f4:c8:3d:55:33:6a:77:bd:b2:ee:83:f:71:27:41:7e:f5:7:5:0:a9:4c:7:80:6f:be:76:67:cb:58:35:b9:2b:f3:c2:d3:3c:ee:e1:3f:59:e0:fa:e4:5c:92:ed:ae:74:de:d:d6:27:16:8f:84:a3:86:68:c:94:90:7d:6e:cc:81:12:d8:b6:ad:aa:31:a8:13:3d:63:81:3e:bb:5:b6:38:4d:2:d:1b:5b:70:de:83:cc:3a:cb:31" # noqa +SIGNED_RSA_512 = "87:46:8b:75:92:33:78:a0:22:35:32:39:23:c6:ab:e1:6:92:ad:bc:7f:6e:ab:19:32:e4:78:b2:2c:8f:1d:c:65:da:fc:a5:7:ca:b6:55:55:31:83:b1:a0:af:d1:95:c5:2e:af:56:ba:f5:41:64:f:39:9d:af:82:43:22:8f:90:52:9d:89:e7:45:97:df:f3:f2:bc:7b:3a:db:89:e:34:fd:18:62:25:1b:ef:77:aa:c6:6c:99:36:3a:84:d6:9c:2a:34:8c:7f:f4:bb:c9:a5:9a:6c:11:f2:cf:da:51:5e:1e:7f:90:27:34:de:b2:f3:15:4f:db:47:32:6b:a7" # noqa FINGER_RSA_2K_OPENSSH = "2048 68:d1:72:01:bf:c0:0c:66:97:78:df:ce:75:74:46:d6" FINGER_DSS_1K_OPENSSH = "1024 cf:1d:eb:d7:61:d3:12:94:c6:c0:c6:54:35:35:b0:82" FINGER_EC_384_OPENSSH = "384 72:14:df:c1:9a:c3:e6:0e:11:29:d6:32:18:7b:ea:9b" @@ -238,21 +242,30 @@ class KeyTest(unittest.TestCase): self.assertTrue(not pub.can_sign()) self.assertEqual(key, pub) - def test_sign_rsa(self): - # verify that the rsa private key can sign and verify + def _sign_and_verify_rsa(self, algorithm, saved_sig): key = RSAKey.from_private_key_file(_support("test_rsa.key")) - msg = key.sign_ssh_data(b"ice weasels") - self.assertTrue(type(msg) is Message) + msg = key.sign_ssh_data(b"ice weasels", algorithm) + assert isinstance(msg, Message) msg.rewind() - self.assertEqual("ssh-rsa", msg.get_text()) - sig = bytes().join( - [byte_chr(int(x, 16)) for x in SIGNED_RSA.split(":")] + assert msg.get_text() == algorithm + expected = bytes().join( + [byte_chr(int(x, 16)) for x in saved_sig.split(":")] ) - self.assertEqual(sig, msg.get_binary()) + assert msg.get_binary() == expected msg.rewind() pub = RSAKey(data=key.asbytes()) self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg)) + @requires_sha1_signing + def test_sign_and_verify_ssh_rsa(self): + self._sign_and_verify_rsa("ssh-rsa", SIGNED_RSA) + + def test_sign_and_verify_rsa_sha2_512(self): + self._sign_and_verify_rsa("rsa-sha2-512", SIGNED_RSA_512) + + def test_sign_and_verify_rsa_sha2_256(self): + self._sign_and_verify_rsa("rsa-sha2-256", SIGNED_RSA_256) + def test_sign_dss(self): # verify that the dss private key can sign and verify key = DSSKey.from_private_key_file(_support("test_dss.key")) @@ -268,6 +281,7 @@ class KeyTest(unittest.TestCase): pub = DSSKey(data=key.asbytes()) self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg)) + @requires_sha1_signing def test_generate_rsa(self): key = RSAKey.generate(1024) msg = key.sign_ssh_data(b"jerri blank") @@ -612,6 +626,11 @@ class KeyTest(unittest.TestCase): for key1, key2 in self.keys(): assert key1 == key2 + def test_keys_are_not_equal_to_other(self): + for value in [None, True, ""]: + for key1, _ in self.keys(): + assert key1 != value + def test_keys_are_hashable(self): # NOTE: this isn't a great test due to hashseed randomization under # Python 3 preventing use of static values, but it does still prove @@ -686,3 +705,78 @@ class KeyTest(unittest.TestCase): key1.load_certificate, _support("test_rsa.key-cert.pub"), ) + + @patch("paramiko.pkey.os") + def _test_keyfile_race(self, os_, exists): + # Re: CVE-2022-24302 + password = "television" + newpassword = "radio" + source = _support("test_ecdsa_384.key") + new = source + ".new" + # Mock setup + os_.path.exists.return_value = exists + # Attach os flag values to mock + for attr, value in vars(os).items(): + if attr.startswith("O_"): + setattr(os_, attr, value) + # Load fixture key + key = ECDSAKey(filename=source, password=password) + key._write_private_key = Mock() + # Write out in new location + key.write_private_key_file(new, password=newpassword) + # Expected open via os module + os_.open.assert_called_once_with( + new, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, o600 + ) + os_.fdopen.assert_called_once_with(os_.open.return_value, "w") + # Old chmod still around for backwards compat + os_.chmod.assert_called_once_with(new, o600) + assert ( + key._write_private_key.call_args[0][0] + == os_.fdopen.return_value.__enter__.return_value + ) + + def test_new_keyfiles_avoid_file_descriptor_race_on_chmod(self): + self._test_keyfile_race(exists=False) + + def test_existing_keyfiles_still_work_ok(self): + self._test_keyfile_race(exists=True) + + def test_new_keyfiles_avoid_descriptor_race_integration(self): + # Integration-style version of above + password = "television" + newpassword = "radio" + source = _support("test_ecdsa_384.key") + new = source + ".new" + # Load fixture key + key = ECDSAKey(filename=source, password=password) + try: + # Write out in new location + key.write_private_key_file(new, password=newpassword) + # Test mode + assert stat.S_IMODE(os.stat(new).st_mode) == o600 + # Prove can open with new password + reloaded = ECDSAKey(filename=new, password=newpassword) + assert reloaded == key + finally: + if os.path.exists(new): + os.unlink(new) + + def test_sign_rsa_with_certificate(self): + data = b"ice weasels" + key_path = _support(os.path.join("cert_support", "test_rsa.key")) + key = RSAKey.from_private_key_file(key_path) + msg = key.sign_ssh_data(data, "rsa-sha2-256") + msg.rewind() + assert "rsa-sha2-256" == msg.get_text() + sign = msg.get_binary() + cert_path = _support( + os.path.join("cert_support", "test_rsa.key-cert.pub") + ) + key.load_certificate(cert_path) + msg = key.sign_ssh_data(data, "rsa-sha2-256-cert-v01@openssh.com") + msg.rewind() + assert "rsa-sha2-256" == msg.get_text() + assert sign == msg.get_binary() + msg.rewind() + assert key.verify_ssh_sig(b"ice weasels", msg) diff --git a/tests/test_sftp.py b/tests/test_sftp.py index 6134d070..0650e8db 100644 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -14,7 +14,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ some unit tests to make sure sftp works. diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py index fc556faf..4643bcaa 100644 --- a/tests/test_sftp_big.py +++ b/tests/test_sftp_big.py @@ -14,7 +14,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ some unit tests to make sure sftp works well with large files. diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py index 92801c20..4d171854 100644 --- a/tests/test_ssh_gss.py +++ b/tests/test_ssh_gss.py @@ -17,7 +17,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ Unit Tests for the GSS-API / SSPI SSHv2 Authentication (gssapi-with-mic) diff --git a/tests/test_transport.py b/tests/test_transport.py index e2174896..8e5f8cd7 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -14,7 +14,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ Some unit tests for the ssh2 protocol in Transport. @@ -23,6 +23,7 @@ Some unit tests for the ssh2 protocol in Transport. from __future__ import with_statement from binascii import hexlify +from contextlib import contextmanager import select import socket import time @@ -38,6 +39,8 @@ from paramiko import ( Packetizer, RSAKey, SSHException, + AuthenticationException, + IncompatiblePeer, SecurityOptions, ServerInterface, Transport, @@ -58,7 +61,7 @@ from paramiko.common import ( from paramiko.py3compat import bytes, byte_chr from paramiko.message import Message -from .util import needs_builtin, _support, slow +from .util import needs_builtin, _support, requires_sha1_signing, slow from .loop import LoopSocket @@ -80,6 +83,9 @@ class NullServer(ServerInterface): paranoid_did_public_key = False paranoid_key = DSSKey.from_private_key_file(_support("test_dss.key")) + def __init__(self, allowed_keys=None): + self.allowed_keys = allowed_keys if allowed_keys is not None else [] + def get_allowed_auths(self, username): if username == "slowdive": return "publickey,password" @@ -90,6 +96,11 @@ class NullServer(ServerInterface): return AUTH_SUCCESSFUL return AUTH_FAILED + def check_auth_publickey(self, username, key): + if key in self.allowed_keys: + return AUTH_SUCCESSFUL + return AUTH_FAILED + def check_channel_request(self, kind, chanid): if kind == "bogus": return OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED @@ -154,6 +165,7 @@ class TransportTest(unittest.TestCase): self.socks.close() self.sockc.close() + # TODO: unify with newer contextmanager def setup_test_server( self, client_options=None, server_options=None, connect_kwargs=None ): @@ -245,7 +257,7 @@ class TransportTest(unittest.TestCase): self.assertEqual(True, self.tc.is_authenticated()) self.assertEqual(True, self.ts.is_authenticated()) - def testa_long_banner(self): + def test_long_banner(self): """ verify that a long banner doesn't mess up the handshake. """ @@ -339,7 +351,7 @@ class TransportTest(unittest.TestCase): self.assertEqual("This is on stderr.\n", f.readline()) self.assertEqual("", f.readline()) - def testa_channel_can_be_used_as_context_manager(self): + def test_channel_can_be_used_as_context_manager(self): """ verify that exec_command() does something reasonable. """ @@ -1109,7 +1121,12 @@ class AlgorithmDisablingTests(unittest.TestCase): t = Transport(sock=Mock()) assert t.preferred_ciphers == t._preferred_ciphers assert t.preferred_macs == t._preferred_macs - assert t.preferred_keys == t._preferred_keys + assert t.preferred_keys == tuple( + t._preferred_keys + + tuple( + "{}-cert-v01@openssh.com".format(x) for x in t._preferred_keys + ) + ) assert t.preferred_kex == t._preferred_kex def test_preferred_lists_filter_disabled_algorithms(self): @@ -1128,6 +1145,7 @@ class AlgorithmDisablingTests(unittest.TestCase): assert "hmac-md5" not in t.preferred_macs assert "ssh-dss" in t._preferred_keys assert "ssh-dss" not in t.preferred_keys + assert "ssh-dss-cert-v01@openssh.com" not in t.preferred_keys assert "diffie-hellman-group14-sha256" in t._preferred_kex assert "diffie-hellman-group14-sha256" not in t.preferred_kex @@ -1169,3 +1187,275 @@ class AlgorithmDisablingTests(unittest.TestCase): assert "ssh-dss" not in server_keys assert "diffie-hellman-group14-sha256" not in kexen assert "zlib" not in compressions + + +@contextmanager +def server( + hostkey=None, + init=None, + server_init=None, + client_init=None, + connect=None, + pubkeys=None, + catch_error=False, +): + """ + SSH server contextmanager for testing. + + :param hostkey: + Host key to use for the server; if None, loads + ``test_rsa.key``. + :param init: + Default `Transport` constructor kwargs to use for both sides. + :param server_init: + Extends and/or overrides ``init`` for server transport only. + :param client_init: + Extends and/or overrides ``init`` for client transport only. + :param connect: + Kwargs to use for ``connect()`` on the client. + :param pubkeys: + List of public keys for auth. + :param catch_error: + Whether to capture connection errors & yield from contextmanager. + Necessary for connection_time exception testing. + """ + if init is None: + init = {} + if server_init is None: + server_init = {} + if client_init is None: + client_init = {} + if connect is None: + connect = dict(username="slowdive", password="pygmalion") + socks = LoopSocket() + sockc = LoopSocket() + sockc.link(socks) + tc = Transport(sockc, **dict(init, **client_init)) + ts = Transport(socks, **dict(init, **server_init)) + + if hostkey is None: + hostkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + ts.add_server_key(hostkey) + event = threading.Event() + server = NullServer(allowed_keys=pubkeys) + assert not event.is_set() + assert not ts.is_active() + assert tc.get_username() is None + assert ts.get_username() is None + assert not tc.is_authenticated() + assert not ts.is_authenticated() + + err = None + # Trap errors and yield instead of raising right away; otherwise callers + # cannot usefully deal with problems at connect time which stem from errors + # in the server side. + try: + ts.start_server(event, server) + tc.connect(**connect) + + event.wait(1.0) + assert event.is_set() + assert ts.is_active() + assert tc.is_active() + + except Exception as e: + if not catch_error: + raise + err = e + + yield (tc, ts, err) if catch_error else (tc, ts) + + tc.close() + ts.close() + socks.close() + sockc.close() + + +_disable_sha2 = dict( + disabled_algorithms=dict(keys=["rsa-sha2-256", "rsa-sha2-512"]) +) +_disable_sha1 = dict(disabled_algorithms=dict(keys=["ssh-rsa"])) +_disable_sha2_pubkey = dict( + disabled_algorithms=dict(pubkeys=["rsa-sha2-256", "rsa-sha2-512"]) +) +_disable_sha1_pubkey = dict(disabled_algorithms=dict(pubkeys=["ssh-rsa"])) + + +class TestSHA2SignatureKeyExchange(unittest.TestCase): + # NOTE: these all rely on the default server() hostkey being RSA + # NOTE: these rely on both sides being properly implemented re: agreed-upon + # hostkey during kex being what's actually used. Truly proving that eg + # SHA512 was used, is quite difficult w/o super gross hacks. However, there + # are new tests in test_pkey.py which use known signature blobs to prove + # the SHA2 family was in fact used! + + @requires_sha1_signing + def test_base_case_ssh_rsa_still_used_as_fallback(self): + # Prove that ssh-rsa is used if either, or both, participants have SHA2 + # algorithms disabled + for which in ("init", "client_init", "server_init"): + with server(**{which: _disable_sha2}) as (tc, _): + assert tc.host_key_type == "ssh-rsa" + + def test_kex_with_sha2_512(self): + # It's the default! + with server() as (tc, _): + assert tc.host_key_type == "rsa-sha2-512" + + def test_kex_with_sha2_256(self): + # No 512 -> you get 256 + with server( + init=dict(disabled_algorithms=dict(keys=["rsa-sha2-512"])) + ) as (tc, _): + assert tc.host_key_type == "rsa-sha2-256" + + def _incompatible_peers(self, client_init, server_init): + with server( + client_init=client_init, server_init=server_init, catch_error=True + ) as (tc, ts, err): + # If neither side blew up then that's bad! + assert err is not None + # If client side blew up first, it'll be straightforward + if isinstance(err, IncompatiblePeer): + pass + # If server side blew up first, client sees EOF & we need to check + # the server transport for its saved error (otherwise it can only + # appear in log output) + elif isinstance(err, EOFError): + assert ts.saved_exception is not None + assert isinstance(ts.saved_exception, IncompatiblePeer) + # If it was something else, welp + else: + raise err + + def test_client_sha2_disabled_server_sha1_disabled_no_match(self): + self._incompatible_peers( + client_init=_disable_sha2, server_init=_disable_sha1 + ) + + def test_client_sha1_disabled_server_sha2_disabled_no_match(self): + self._incompatible_peers( + client_init=_disable_sha1, server_init=_disable_sha2 + ) + + def test_explicit_client_hostkey_not_limited(self): + # Be very explicit about the hostkey on BOTH ends, + # and ensure it still ends up choosing sha2-512. + # (This is a regression test vs previous implementation which overwrote + # the entire preferred-hostkeys structure when given an explicit key as + # a client.) + hostkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + with server(hostkey=hostkey, connect=dict(hostkey=hostkey)) as (tc, _): + assert tc.host_key_type == "rsa-sha2-512" + + +class TestExtInfo(unittest.TestCase): + def test_ext_info_handshake(self): + with server() as (tc, _): + kex = tc._get_latest_kex_init() + assert kex["kex_algo_list"][-1] == "ext-info-c" + assert tc.server_extensions == { + "server-sig-algs": b"ssh-ed25519,ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521,rsa-sha2-512,rsa-sha2-256,ssh-rsa,ssh-dss" # noqa + } + + def test_client_uses_server_sig_algs_for_pubkey_auth(self): + privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + with server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + server_init=dict( + disabled_algorithms=dict(pubkeys=["rsa-sha2-512"]) + ), + ) as (tc, _): + assert tc.is_authenticated() + # Client settled on 256 despite itself not having 512 disabled + assert tc._agreed_pubkey_algorithm == "rsa-sha2-256" + + +# TODO: these could move into test_auth.py but that badly needs refactoring +# with this module anyways... +class TestSHA2SignaturePubkeys(unittest.TestCase): + def test_pubkey_auth_honors_disabled_algorithms(self): + privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + with server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + init=dict( + disabled_algorithms=dict( + pubkeys=["ssh-rsa", "rsa-sha2-256", "rsa-sha2-512"] + ) + ), + catch_error=True, + ) as (_, _, err): + assert isinstance(err, SSHException) + assert "no RSA pubkey algorithms" in str(err) + + def test_client_sha2_disabled_server_sha1_disabled_no_match(self): + privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + with server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + client_init=_disable_sha2_pubkey, + server_init=_disable_sha1_pubkey, + catch_error=True, + ) as (tc, ts, err): + assert isinstance(err, AuthenticationException) + + def test_client_sha1_disabled_server_sha2_disabled_no_match(self): + privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + with server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + client_init=_disable_sha1_pubkey, + server_init=_disable_sha2_pubkey, + catch_error=True, + ) as (tc, ts, err): + assert isinstance(err, AuthenticationException) + + @requires_sha1_signing + def test_ssh_rsa_still_used_when_sha2_disabled(self): + privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + # NOTE: this works because key obj comparison uses public bytes + # TODO: would be nice for PKey to grow a legit "give me another obj of + # same class but just the public bits" using asbytes() + with server( + pubkeys=[privkey], connect=dict(pkey=privkey), init=_disable_sha2 + ) as (tc, _): + assert tc.is_authenticated() + + def test_sha2_512(self): + privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + with server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + init=dict( + disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-256"]) + ), + ) as (tc, ts): + assert tc.is_authenticated() + assert tc._agreed_pubkey_algorithm == "rsa-sha2-512" + + def test_sha2_256(self): + privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + with server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + init=dict( + disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-512"]) + ), + ) as (tc, ts): + assert tc.is_authenticated() + assert tc._agreed_pubkey_algorithm == "rsa-sha2-256" + + def test_sha2_256_when_client_only_enables_256(self): + privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + with server( + pubkeys=[privkey], + connect=dict(pkey=privkey), + # Client-side only; server still accepts all 3. + client_init=dict( + disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-512"]) + ), + ) as (tc, ts): + assert tc.is_authenticated() + assert tc._agreed_pubkey_algorithm == "rsa-sha2-256" diff --git a/tests/test_util.py b/tests/test_util.py index 8ce260d1..0e485759 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -14,7 +14,7 @@ # # You should have received a copy of the GNU Lesser General Public License # along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """ Some unit tests for utility functions. diff --git a/tests/util.py b/tests/util.py index 1355ce8a..3ec5d092 100644 --- a/tests/util.py +++ b/tests/util.py @@ -9,6 +9,10 @@ import pytest from paramiko.py3compat import builtins, PY2 from paramiko.ssh_gss import GSS_AUTH_AVAILABLE +from cryptography.exceptions import UnsupportedAlgorithm, _Reasons +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import padding, rsa tests_dir = dirname(realpath(__file__)) @@ -144,3 +148,31 @@ def is_low_entropy(): # I don't see a way to tell internally if the hash seed was set this # way, but env should be plenty sufficient, this is only for testing. return is_32bit and os.environ.get("PYTHONHASHSEED", None) == "0" + + +def sha1_signing_unsupported(): + """ + This is used to skip tests in environments where SHA-1 signing is + not supported by the backend. + """ + private_key = rsa.generate_private_key( + public_exponent=65537, key_size=2048, backend=default_backend() + ) + message = b"Some dummy text" + try: + private_key.sign( + message, + padding.PSS( + mgf=padding.MGF1(hashes.SHA1()), + salt_length=padding.PSS.MAX_LENGTH, + ), + hashes.SHA1(), + ) + return False + except UnsupportedAlgorithm as e: + return e._reason is _Reasons.UNSUPPORTED_HASH + + +requires_sha1_signing = unittest.skipIf( + sha1_signing_unsupported(), "SHA-1 signing not supported" +) |