summaryrefslogtreecommitdiffhomepage
path: root/tests
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2022-04-25 10:32:31 -0400
committerJeff Forcier <jeff@bitprophet.org>2022-04-25 10:32:31 -0400
commit9b22c28a02e5ae0291857a7fb9051cf942280731 (patch)
tree74a2aa4583f2917ee73cdbe4b4ca9b0cc280b96a /tests
parentab335cdab8d6dc218e5d8658c3b32f4e7d0d74e5 (diff)
parentdf1701c1834cae333d5e6d9f41b0a4bea3da72e4 (diff)
Merge branch 'main' into 1951-int
Diffstat (limited to 'tests')
-rw-r--r--tests/configs/match-exec2
-rw-r--r--tests/loop.py2
-rw-r--r--tests/stub_sftp.py2
-rw-r--r--tests/test_agent.py50
-rw-r--r--tests/test_auth.py2
-rw-r--r--tests/test_buffered_pipe.py2
-rw-r--r--tests/test_client.py22
-rw-r--r--tests/test_config.py30
-rw-r--r--tests/test_file.py2
-rw-r--r--tests/test_gssapi.py2
-rw-r--r--tests/test_hostkeys.py2
-rw-r--r--tests/test_kex.py5
-rw-r--r--tests/test_kex_gss.py2
-rw-r--r--tests/test_message.py2
-rw-r--r--tests/test_packetizer.py2
-rw-r--r--tests/test_pkey.py116
-rw-r--r--tests/test_sftp.py2
-rw-r--r--tests/test_sftp_big.py2
-rw-r--r--tests/test_ssh_gss.py2
-rw-r--r--tests/test_transport.py300
-rw-r--r--tests/test_util.py2
-rw-r--r--tests/util.py32
22 files changed, 544 insertions, 41 deletions
diff --git a/tests/configs/match-exec b/tests/configs/match-exec
index 763346ea..62a147aa 100644
--- a/tests/configs/match-exec
+++ b/tests/configs/match-exec
@@ -12,5 +12,5 @@ Host target
User intermediate
HostName configured
-Match exec "%d %h %L %l %n %p %r %u"
+Match exec "%C %d %h %L %l %n %p %r %u"
Port 1337
diff --git a/tests/loop.py b/tests/loop.py
index dd1f5a0c..87fb089a 100644
--- a/tests/loop.py
+++ b/tests/loop.py
@@ -14,7 +14,7 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import socket
import threading
diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py
index 1528a0b8..0c0372e9 100644
--- a/tests/stub_sftp.py
+++ b/tests/stub_sftp.py
@@ -14,7 +14,7 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
A stub SFTP server for loopback SFTP testing.
diff --git a/tests/test_agent.py b/tests/test_agent.py
new file mode 100644
index 00000000..c3973bbb
--- /dev/null
+++ b/tests/test_agent.py
@@ -0,0 +1,50 @@
+import unittest
+
+from paramiko.message import Message
+from paramiko.agent import (
+ SSH2_AGENT_SIGN_RESPONSE,
+ cSSH2_AGENTC_SIGN_REQUEST,
+ SSH_AGENT_RSA_SHA2_256,
+ SSH_AGENT_RSA_SHA2_512,
+ AgentKey,
+)
+from paramiko.py3compat import b
+
+
+class ChaosAgent:
+ def _send_message(self, msg):
+ self._sent_message = msg
+ sig = Message()
+ sig.add_string(b("lol"))
+ sig.rewind()
+ return SSH2_AGENT_SIGN_RESPONSE, sig
+
+
+class AgentTests(unittest.TestCase):
+ def _sign_with_agent(self, kwargs, expectation):
+ agent = ChaosAgent()
+ key = AgentKey(agent, b("secret!!!"))
+ result = key.sign_ssh_data(b("token"), **kwargs)
+ assert result == b("lol")
+ msg = agent._sent_message
+ msg.rewind()
+ assert msg.get_byte() == cSSH2_AGENTC_SIGN_REQUEST
+ assert msg.get_string() == b("secret!!!")
+ assert msg.get_string() == b("token")
+ assert msg.get_int() == expectation
+
+ def test_agent_signing_defaults_to_0_for_flags_field(self):
+ # No algorithm kwarg at all
+ self._sign_with_agent(kwargs=dict(), expectation=0)
+
+ def test_agent_signing_is_2_for_SHA256(self):
+ self._sign_with_agent(
+ kwargs=dict(algorithm="rsa-sha2-256"),
+ expectation=SSH_AGENT_RSA_SHA2_256,
+ )
+
+ def test_agent_signing_is_2_for_SHA512(self):
+ self._sign_with_agent(
+ kwargs=dict(algorithm="rsa-sha2-512"),
+ expectation=SSH_AGENT_RSA_SHA2_512,
+ )
diff --git a/tests/test_auth.py b/tests/test_auth.py
index 01fbac5b..0f0a6169 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -14,7 +14,7 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Some unit tests for authenticating over a Transport.
diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py
index 61c99cc0..35e2cded 100644
--- a/tests/test_buffered_pipe.py
+++ b/tests/test_buffered_pipe.py
@@ -14,7 +14,7 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Some unit tests for BufferedPipe.
diff --git a/tests/test_client.py b/tests/test_client.py
index f14aac23..1c28d824 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -14,7 +14,7 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Some unit tests for SSHClient.
@@ -41,7 +41,7 @@ from paramiko import SSHClient
from paramiko.pkey import PublicBlob
from paramiko.ssh_exception import SSHException, AuthenticationException
-from .util import _support, slow
+from .util import _support, requires_sha1_signing, slow
requires_gss_auth = unittest.skipUnless(
@@ -235,33 +235,39 @@ class ClientTest(unittest.TestCase):
class SSHClientTest(ClientTest):
+ @requires_sha1_signing
def test_client(self):
"""
verify that the SSHClient stuff works too.
"""
self._test_connection(password="pygmalion")
+ @requires_sha1_signing
def test_client_dsa(self):
"""
verify that SSHClient works with a DSA key.
"""
self._test_connection(key_filename=_support("test_dss.key"))
+ @requires_sha1_signing
def test_client_rsa(self):
"""
verify that SSHClient works with an RSA key.
"""
self._test_connection(key_filename=_support("test_rsa.key"))
+ @requires_sha1_signing
def test_client_ecdsa(self):
"""
verify that SSHClient works with an ECDSA key.
"""
self._test_connection(key_filename=_support("test_ecdsa_256.key"))
+ @requires_sha1_signing
def test_client_ed25519(self):
self._test_connection(key_filename=_support("test_ed25519.key"))
+ @requires_sha1_signing
def test_multiple_key_files(self):
"""
verify that SSHClient accepts and tries multiple key files.
@@ -293,6 +299,7 @@ class SSHClientTest(ClientTest):
self.tearDown()
self.setUp()
+ @requires_sha1_signing
def test_multiple_key_files_failure(self):
"""
Expect failure when multiple keys in play and none are accepted
@@ -306,6 +313,7 @@ class SSHClientTest(ClientTest):
allowed_keys=["ecdsa-sha2-nistp256"],
)
+ @requires_sha1_signing
def test_certs_allowed_as_key_filename_values(self):
# NOTE: giving cert path here, not key path. (Key path test is below.
# They're similar except for which path is given; the expected auth and
@@ -319,6 +327,7 @@ class SSHClientTest(ClientTest):
public_blob=PublicBlob.from_file(cert_path),
)
+ @requires_sha1_signing
def test_certs_implicitly_loaded_alongside_key_filename_keys(self):
# NOTE: a regular test_connection() w/ test_rsa.key would incidentally
# test this (because test_xxx.key-cert.pub exists) but incidental tests
@@ -469,6 +478,7 @@ class SSHClientTest(ClientTest):
kwargs = dict(self.connect_kwargs, banner_timeout=0.5)
self.assertRaises(paramiko.SSHException, self.tc.connect, **kwargs)
+ @requires_sha1_signing
def test_auth_trickledown(self):
"""
Failed key auth doesn't prevent subsequent pw auth from succeeding
@@ -489,6 +499,7 @@ class SSHClientTest(ClientTest):
)
self._test_connection(**kwargs)
+ @requires_sha1_signing
@slow
def test_auth_timeout(self):
"""
@@ -591,6 +602,7 @@ class SSHClientTest(ClientTest):
host_key = paramiko.ECDSAKey.generate()
self._client_host_key_bad(host_key)
+ @requires_sha1_signing
def test_host_key_negotiation_2(self):
host_key = paramiko.RSAKey.generate(2048)
self._client_host_key_bad(host_key)
@@ -598,6 +610,7 @@ class SSHClientTest(ClientTest):
def test_host_key_negotiation_3(self):
self._client_host_key_good(paramiko.ECDSAKey, "test_ecdsa_256.key")
+ @requires_sha1_signing
def test_host_key_negotiation_4(self):
self._client_host_key_good(paramiko.RSAKey, "test_rsa.key")
@@ -681,6 +694,7 @@ class PasswordPassphraseTests(ClientTest):
# instead of suffering a real connection cycle.
# TODO: in that case, move the below to be part of an integration suite?
+ @requires_sha1_signing
def test_password_kwarg_works_for_password_auth(self):
# Straightforward / duplicate of earlier basic password test.
self._test_connection(password="pygmalion")
@@ -688,10 +702,12 @@ class PasswordPassphraseTests(ClientTest):
# TODO: more granular exception pending #387; should be signaling "no auth
# methods available" because no key and no password
@raises(SSHException)
+ @requires_sha1_signing
def test_passphrase_kwarg_not_used_for_password_auth(self):
# Using the "right" password in the "wrong" field shouldn't work.
self._test_connection(passphrase="pygmalion")
+ @requires_sha1_signing
def test_passphrase_kwarg_used_for_key_passphrase(self):
# Straightforward again, with new passphrase kwarg.
self._test_connection(
@@ -699,6 +715,7 @@ class PasswordPassphraseTests(ClientTest):
passphrase="television",
)
+ @requires_sha1_signing
def test_password_kwarg_used_for_passphrase_when_no_passphrase_kwarg_given(
self
): # noqa
@@ -709,6 +726,7 @@ class PasswordPassphraseTests(ClientTest):
)
@raises(AuthenticationException) # TODO: more granular
+ @requires_sha1_signing
def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given( # noqa
self
):
diff --git a/tests/test_config.py b/tests/test_config.py
index 5e9aa059..b46dc7b4 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -42,6 +42,7 @@ def socket():
# Patch out getfqdn to return some real string for when it gets called;
# some code (eg tokenization) gets mad w/ MagicMocks
mocket.getfqdn.return_value = "some.fake.fqdn"
+ mocket.gethostname.return_value = "local.fake.fqdn"
yield mocket
@@ -206,7 +207,7 @@ Host test
assert got == expected
@patch("paramiko.config.getpass")
- def test_controlpath_token_expansion(self, getpass):
+ def test_controlpath_token_expansion(self, getpass, socket):
getpass.getuser.return_value = "gandalf"
config = SSHConfig.from_text(
"""
@@ -217,6 +218,9 @@ Host explicit_user
Host explicit_host
HostName ohai
ControlPath remoteuser %r host %h orighost %n
+
+Host hashbrowns
+ ControlPath %C
"""
)
result = config.lookup("explicit_user")["controlpath"]
@@ -225,6 +229,9 @@ Host explicit_host
result = config.lookup("explicit_host")["controlpath"]
# Remote user falls back to local user; host and orighost may differ
assert result == "remoteuser gandalf host ohai orighost explicit_host"
+ # Supports %C
+ result = config.lookup("hashbrowns")["controlpath"]
+ assert result == "a438e7dbf5308b923aba9db8fe2ca63447ac8688"
def test_negation(self):
config = SSHConfig.from_text(
@@ -276,10 +283,11 @@ ProxyCommand foo=bar:%h-%p
assert config.lookup(host) == values
- def test_identityfile(self):
+ @patch("paramiko.config.getpass")
+ def test_identityfile(self, getpass, socket):
+ getpass.getuser.return_value = "gandalf"
config = SSHConfig.from_text(
"""
-
IdentityFile id_dsa0
Host *
@@ -290,6 +298,9 @@ IdentityFile id_dsa2
Host dsa2*
IdentityFile id_dsa22
+
+Host hashbrowns
+IdentityFile %C
"""
)
for host, values in {
@@ -302,8 +313,15 @@ IdentityFile id_dsa22
"hostname": "dsa22",
"identityfile": ["id_dsa0", "id_dsa1", "id_dsa22"],
},
+ "hashbrowns": {
+ "hostname": "hashbrowns",
+ "identityfile": [
+ "id_dsa0",
+ "id_dsa1",
+ "a438e7dbf5308b923aba9db8fe2ca63447ac8688",
+ ],
+ },
}.items():
-
assert config.lookup(host) == values
def test_config_addressfamily_and_lazy_fqdn(self):
@@ -739,10 +757,10 @@ class TestMatchExec(object):
@patch("paramiko.config.getpass")
@patch("paramiko.config.invoke.run")
def test_tokenizes_argument(self, run, getpass, socket):
- socket.gethostname.return_value = "local.fqdn"
getpass.getuser.return_value = "gandalf"
- # Actual exec value is "%d %h %L %l %n %p %r %u"
+ # Actual exec value is "%C %d %h %L %l %n %p %r %u"
parts = (
+ "bf5ba06778434a9384ee4217e462f64888bd0cd2",
expanduser("~"),
"configured",
"local",
diff --git a/tests/test_file.py b/tests/test_file.py
index 2a3da74b..d4062c02 100644
--- a/tests/test_file.py
+++ b/tests/test_file.py
@@ -14,7 +14,7 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Some unit tests for the BufferedFile abstraction.
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
index acdc7c82..23c3ef42 100644
--- a/tests/test_gssapi.py
+++ b/tests/test_gssapi.py
@@ -16,7 +16,7 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Test the used APIs for GSS-API / SSPI authentication
diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py
index 41a9244f..723ea1a5 100644
--- a/tests/test_hostkeys.py
+++ b/tests/test_hostkeys.py
@@ -14,7 +14,7 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Some unit tests for HostKeys.
diff --git a/tests/test_kex.py b/tests/test_kex.py
index c251611a..b6463558 100644
--- a/tests/test_kex.py
+++ b/tests/test_kex.py
@@ -14,7 +14,7 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Some unit tests for the key exchange protocols.
@@ -76,7 +76,7 @@ class FakeKey(object):
def asbytes(self):
return b"fake-key"
- def sign_ssh_data(self, H):
+ def sign_ssh_data(self, H, algorithm):
return b"fake-sig"
@@ -93,6 +93,7 @@ class FakeTransport(object):
remote_version = "SSH-2.0-lame"
local_kex_init = "local-kex-init"
remote_kex_init = "remote-kex-init"
+ host_key_type = "fake-key"
def _send_message(self, m):
self._message = m
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
index 6f5625dc..26659ae3 100644
--- a/tests/test_kex_gss.py
+++ b/tests/test_kex_gss.py
@@ -17,7 +17,7 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Unit Tests for the GSS-API / SSPI SSHv2 Diffie-Hellman Key Exchange and user
diff --git a/tests/test_message.py b/tests/test_message.py
index 57766d90..23b06858 100644
--- a/tests/test_message.py
+++ b/tests/test_message.py
@@ -14,7 +14,7 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Some unit tests for ssh protocol message blocks.
diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py
index de80770e..27dee358 100644
--- a/tests/test_packetizer.py
+++ b/tests/test_packetizer.py
@@ -14,7 +14,7 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Some unit tests for the ssh2 protocol in Transport.
diff --git a/tests/test_pkey.py b/tests/test_pkey.py
index 94b2492b..738e8cf0 100644
--- a/tests/test_pkey.py
+++ b/tests/test_pkey.py
@@ -15,7 +15,7 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Some unit tests for public/private key objects.
@@ -23,6 +23,7 @@ Some unit tests for public/private key objects.
import unittest
import os
+import stat
from binascii import hexlify
from hashlib import md5
@@ -36,13 +37,14 @@ from paramiko import (
SSHException,
)
from paramiko.py3compat import StringIO, byte_chr, b, bytes, PY2
+from paramiko.common import o600
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateNumbers
-from mock import patch
+from mock import patch, Mock
import pytest
-from .util import _support, is_low_entropy
+from .util import _support, is_low_entropy, requires_sha1_signing
# from openssh's ssh-keygen
@@ -63,6 +65,8 @@ FINGER_ECDSA_256 = "256 25:19:eb:55:e6:a1:47:ff:4f:38:d2:75:6f:a5:d5:60"
FINGER_ECDSA_384 = "384 c1:8d:a0:59:09:47:41:8e:a8:a6:07:01:29:23:b4:65"
FINGER_ECDSA_521 = "521 44:58:22:52:12:33:16:0e:ce:0e:be:2c:7c:7e:cc:1e"
SIGNED_RSA = "20:d7:8a:31:21:cb:f7:92:12:f2:a4:89:37:f5:78:af:e6:16:b6:25:b9:97:3d:a2:cd:5f:ca:20:21:73:4c:ad:34:73:8f:20:77:28:e2:94:15:08:d8:91:40:7a:85:83:bf:18:37:95:dc:54:1a:9b:88:29:6c:73:ca:38:b4:04:f1:56:b9:f2:42:9d:52:1b:29:29:b4:4f:fd:c9:2d:af:47:d2:40:76:30:f3:63:45:0c:d9:1d:43:86:0f:1c:70:e2:93:12:34:f3:ac:c5:0a:2f:14:50:66:59:f1:88:ee:c1:4a:e9:d1:9c:4e:46:f0:0e:47:6f:38:74:f1:44:a8" # noqa
+SIGNED_RSA_256 = "cc:6:60:e0:0:2c:ac:9e:26:bc:d5:68:64:3f:9f:a7:e5:aa:41:eb:88:4a:25:5:9c:93:84:66:ef:ef:60:f4:34:fb:f4:c8:3d:55:33:6a:77:bd:b2:ee:83:f:71:27:41:7e:f5:7:5:0:a9:4c:7:80:6f:be:76:67:cb:58:35:b9:2b:f3:c2:d3:3c:ee:e1:3f:59:e0:fa:e4:5c:92:ed:ae:74:de:d:d6:27:16:8f:84:a3:86:68:c:94:90:7d:6e:cc:81:12:d8:b6:ad:aa:31:a8:13:3d:63:81:3e:bb:5:b6:38:4d:2:d:1b:5b:70:de:83:cc:3a:cb:31" # noqa
+SIGNED_RSA_512 = "87:46:8b:75:92:33:78:a0:22:35:32:39:23:c6:ab:e1:6:92:ad:bc:7f:6e:ab:19:32:e4:78:b2:2c:8f:1d:c:65:da:fc:a5:7:ca:b6:55:55:31:83:b1:a0:af:d1:95:c5:2e:af:56:ba:f5:41:64:f:39:9d:af:82:43:22:8f:90:52:9d:89:e7:45:97:df:f3:f2:bc:7b:3a:db:89:e:34:fd:18:62:25:1b:ef:77:aa:c6:6c:99:36:3a:84:d6:9c:2a:34:8c:7f:f4:bb:c9:a5:9a:6c:11:f2:cf:da:51:5e:1e:7f:90:27:34:de:b2:f3:15:4f:db:47:32:6b:a7" # noqa
FINGER_RSA_2K_OPENSSH = "2048 68:d1:72:01:bf:c0:0c:66:97:78:df:ce:75:74:46:d6"
FINGER_DSS_1K_OPENSSH = "1024 cf:1d:eb:d7:61:d3:12:94:c6:c0:c6:54:35:35:b0:82"
FINGER_EC_384_OPENSSH = "384 72:14:df:c1:9a:c3:e6:0e:11:29:d6:32:18:7b:ea:9b"
@@ -238,21 +242,30 @@ class KeyTest(unittest.TestCase):
self.assertTrue(not pub.can_sign())
self.assertEqual(key, pub)
- def test_sign_rsa(self):
- # verify that the rsa private key can sign and verify
+ def _sign_and_verify_rsa(self, algorithm, saved_sig):
key = RSAKey.from_private_key_file(_support("test_rsa.key"))
- msg = key.sign_ssh_data(b"ice weasels")
- self.assertTrue(type(msg) is Message)
+ msg = key.sign_ssh_data(b"ice weasels", algorithm)
+ assert isinstance(msg, Message)
msg.rewind()
- self.assertEqual("ssh-rsa", msg.get_text())
- sig = bytes().join(
- [byte_chr(int(x, 16)) for x in SIGNED_RSA.split(":")]
+ assert msg.get_text() == algorithm
+ expected = bytes().join(
+ [byte_chr(int(x, 16)) for x in saved_sig.split(":")]
)
- self.assertEqual(sig, msg.get_binary())
+ assert msg.get_binary() == expected
msg.rewind()
pub = RSAKey(data=key.asbytes())
self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg))
+ @requires_sha1_signing
+ def test_sign_and_verify_ssh_rsa(self):
+ self._sign_and_verify_rsa("ssh-rsa", SIGNED_RSA)
+
+ def test_sign_and_verify_rsa_sha2_512(self):
+ self._sign_and_verify_rsa("rsa-sha2-512", SIGNED_RSA_512)
+
+ def test_sign_and_verify_rsa_sha2_256(self):
+ self._sign_and_verify_rsa("rsa-sha2-256", SIGNED_RSA_256)
+
def test_sign_dss(self):
# verify that the dss private key can sign and verify
key = DSSKey.from_private_key_file(_support("test_dss.key"))
@@ -268,6 +281,7 @@ class KeyTest(unittest.TestCase):
pub = DSSKey(data=key.asbytes())
self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg))
+ @requires_sha1_signing
def test_generate_rsa(self):
key = RSAKey.generate(1024)
msg = key.sign_ssh_data(b"jerri blank")
@@ -612,6 +626,11 @@ class KeyTest(unittest.TestCase):
for key1, key2 in self.keys():
assert key1 == key2
+ def test_keys_are_not_equal_to_other(self):
+ for value in [None, True, ""]:
+ for key1, _ in self.keys():
+ assert key1 != value
+
def test_keys_are_hashable(self):
# NOTE: this isn't a great test due to hashseed randomization under
# Python 3 preventing use of static values, but it does still prove
@@ -686,3 +705,78 @@ class KeyTest(unittest.TestCase):
key1.load_certificate,
_support("test_rsa.key-cert.pub"),
)
+
+ @patch("paramiko.pkey.os")
+ def _test_keyfile_race(self, os_, exists):
+ # Re: CVE-2022-24302
+ password = "television"
+ newpassword = "radio"
+ source = _support("test_ecdsa_384.key")
+ new = source + ".new"
+ # Mock setup
+ os_.path.exists.return_value = exists
+ # Attach os flag values to mock
+ for attr, value in vars(os).items():
+ if attr.startswith("O_"):
+ setattr(os_, attr, value)
+ # Load fixture key
+ key = ECDSAKey(filename=source, password=password)
+ key._write_private_key = Mock()
+ # Write out in new location
+ key.write_private_key_file(new, password=newpassword)
+ # Expected open via os module
+ os_.open.assert_called_once_with(
+ new, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, o600
+ )
+ os_.fdopen.assert_called_once_with(os_.open.return_value, "w")
+ # Old chmod still around for backwards compat
+ os_.chmod.assert_called_once_with(new, o600)
+ assert (
+ key._write_private_key.call_args[0][0]
+ == os_.fdopen.return_value.__enter__.return_value
+ )
+
+ def test_new_keyfiles_avoid_file_descriptor_race_on_chmod(self):
+ self._test_keyfile_race(exists=False)
+
+ def test_existing_keyfiles_still_work_ok(self):
+ self._test_keyfile_race(exists=True)
+
+ def test_new_keyfiles_avoid_descriptor_race_integration(self):
+ # Integration-style version of above
+ password = "television"
+ newpassword = "radio"
+ source = _support("test_ecdsa_384.key")
+ new = source + ".new"
+ # Load fixture key
+ key = ECDSAKey(filename=source, password=password)
+ try:
+ # Write out in new location
+ key.write_private_key_file(new, password=newpassword)
+ # Test mode
+ assert stat.S_IMODE(os.stat(new).st_mode) == o600
+ # Prove can open with new password
+ reloaded = ECDSAKey(filename=new, password=newpassword)
+ assert reloaded == key
+ finally:
+ if os.path.exists(new):
+ os.unlink(new)
+
+ def test_sign_rsa_with_certificate(self):
+ data = b"ice weasels"
+ key_path = _support(os.path.join("cert_support", "test_rsa.key"))
+ key = RSAKey.from_private_key_file(key_path)
+ msg = key.sign_ssh_data(data, "rsa-sha2-256")
+ msg.rewind()
+ assert "rsa-sha2-256" == msg.get_text()
+ sign = msg.get_binary()
+ cert_path = _support(
+ os.path.join("cert_support", "test_rsa.key-cert.pub")
+ )
+ key.load_certificate(cert_path)
+ msg = key.sign_ssh_data(data, "rsa-sha2-256-cert-v01@openssh.com")
+ msg.rewind()
+ assert "rsa-sha2-256" == msg.get_text()
+ assert sign == msg.get_binary()
+ msg.rewind()
+ assert key.verify_ssh_sig(b"ice weasels", msg)
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index 6134d070..0650e8db 100644
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -14,7 +14,7 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
some unit tests to make sure sftp works.
diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py
index fc556faf..4643bcaa 100644
--- a/tests/test_sftp_big.py
+++ b/tests/test_sftp_big.py
@@ -14,7 +14,7 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
some unit tests to make sure sftp works well with large files.
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py
index 92801c20..4d171854 100644
--- a/tests/test_ssh_gss.py
+++ b/tests/test_ssh_gss.py
@@ -17,7 +17,7 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Unit Tests for the GSS-API / SSPI SSHv2 Authentication (gssapi-with-mic)
diff --git a/tests/test_transport.py b/tests/test_transport.py
index e2174896..8e5f8cd7 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -14,7 +14,7 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Some unit tests for the ssh2 protocol in Transport.
@@ -23,6 +23,7 @@ Some unit tests for the ssh2 protocol in Transport.
from __future__ import with_statement
from binascii import hexlify
+from contextlib import contextmanager
import select
import socket
import time
@@ -38,6 +39,8 @@ from paramiko import (
Packetizer,
RSAKey,
SSHException,
+ AuthenticationException,
+ IncompatiblePeer,
SecurityOptions,
ServerInterface,
Transport,
@@ -58,7 +61,7 @@ from paramiko.common import (
from paramiko.py3compat import bytes, byte_chr
from paramiko.message import Message
-from .util import needs_builtin, _support, slow
+from .util import needs_builtin, _support, requires_sha1_signing, slow
from .loop import LoopSocket
@@ -80,6 +83,9 @@ class NullServer(ServerInterface):
paranoid_did_public_key = False
paranoid_key = DSSKey.from_private_key_file(_support("test_dss.key"))
+ def __init__(self, allowed_keys=None):
+ self.allowed_keys = allowed_keys if allowed_keys is not None else []
+
def get_allowed_auths(self, username):
if username == "slowdive":
return "publickey,password"
@@ -90,6 +96,11 @@ class NullServer(ServerInterface):
return AUTH_SUCCESSFUL
return AUTH_FAILED
+ def check_auth_publickey(self, username, key):
+ if key in self.allowed_keys:
+ return AUTH_SUCCESSFUL
+ return AUTH_FAILED
+
def check_channel_request(self, kind, chanid):
if kind == "bogus":
return OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
@@ -154,6 +165,7 @@ class TransportTest(unittest.TestCase):
self.socks.close()
self.sockc.close()
+ # TODO: unify with newer contextmanager
def setup_test_server(
self, client_options=None, server_options=None, connect_kwargs=None
):
@@ -245,7 +257,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual(True, self.tc.is_authenticated())
self.assertEqual(True, self.ts.is_authenticated())
- def testa_long_banner(self):
+ def test_long_banner(self):
"""
verify that a long banner doesn't mess up the handshake.
"""
@@ -339,7 +351,7 @@ class TransportTest(unittest.TestCase):
self.assertEqual("This is on stderr.\n", f.readline())
self.assertEqual("", f.readline())
- def testa_channel_can_be_used_as_context_manager(self):
+ def test_channel_can_be_used_as_context_manager(self):
"""
verify that exec_command() does something reasonable.
"""
@@ -1109,7 +1121,12 @@ class AlgorithmDisablingTests(unittest.TestCase):
t = Transport(sock=Mock())
assert t.preferred_ciphers == t._preferred_ciphers
assert t.preferred_macs == t._preferred_macs
- assert t.preferred_keys == t._preferred_keys
+ assert t.preferred_keys == tuple(
+ t._preferred_keys
+ + tuple(
+ "{}-cert-v01@openssh.com".format(x) for x in t._preferred_keys
+ )
+ )
assert t.preferred_kex == t._preferred_kex
def test_preferred_lists_filter_disabled_algorithms(self):
@@ -1128,6 +1145,7 @@ class AlgorithmDisablingTests(unittest.TestCase):
assert "hmac-md5" not in t.preferred_macs
assert "ssh-dss" in t._preferred_keys
assert "ssh-dss" not in t.preferred_keys
+ assert "ssh-dss-cert-v01@openssh.com" not in t.preferred_keys
assert "diffie-hellman-group14-sha256" in t._preferred_kex
assert "diffie-hellman-group14-sha256" not in t.preferred_kex
@@ -1169,3 +1187,275 @@ class AlgorithmDisablingTests(unittest.TestCase):
assert "ssh-dss" not in server_keys
assert "diffie-hellman-group14-sha256" not in kexen
assert "zlib" not in compressions
+
+
+@contextmanager
+def server(
+ hostkey=None,
+ init=None,
+ server_init=None,
+ client_init=None,
+ connect=None,
+ pubkeys=None,
+ catch_error=False,
+):
+ """
+ SSH server contextmanager for testing.
+
+ :param hostkey:
+ Host key to use for the server; if None, loads
+ ``test_rsa.key``.
+ :param init:
+ Default `Transport` constructor kwargs to use for both sides.
+ :param server_init:
+ Extends and/or overrides ``init`` for server transport only.
+ :param client_init:
+ Extends and/or overrides ``init`` for client transport only.
+ :param connect:
+ Kwargs to use for ``connect()`` on the client.
+ :param pubkeys:
+ List of public keys for auth.
+ :param catch_error:
+ Whether to capture connection errors & yield from contextmanager.
+ Necessary for connection_time exception testing.
+ """
+ if init is None:
+ init = {}
+ if server_init is None:
+ server_init = {}
+ if client_init is None:
+ client_init = {}
+ if connect is None:
+ connect = dict(username="slowdive", password="pygmalion")
+ socks = LoopSocket()
+ sockc = LoopSocket()
+ sockc.link(socks)
+ tc = Transport(sockc, **dict(init, **client_init))
+ ts = Transport(socks, **dict(init, **server_init))
+
+ if hostkey is None:
+ hostkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ ts.add_server_key(hostkey)
+ event = threading.Event()
+ server = NullServer(allowed_keys=pubkeys)
+ assert not event.is_set()
+ assert not ts.is_active()
+ assert tc.get_username() is None
+ assert ts.get_username() is None
+ assert not tc.is_authenticated()
+ assert not ts.is_authenticated()
+
+ err = None
+ # Trap errors and yield instead of raising right away; otherwise callers
+ # cannot usefully deal with problems at connect time which stem from errors
+ # in the server side.
+ try:
+ ts.start_server(event, server)
+ tc.connect(**connect)
+
+ event.wait(1.0)
+ assert event.is_set()
+ assert ts.is_active()
+ assert tc.is_active()
+
+ except Exception as e:
+ if not catch_error:
+ raise
+ err = e
+
+ yield (tc, ts, err) if catch_error else (tc, ts)
+
+ tc.close()
+ ts.close()
+ socks.close()
+ sockc.close()
+
+
+_disable_sha2 = dict(
+ disabled_algorithms=dict(keys=["rsa-sha2-256", "rsa-sha2-512"])
+)
+_disable_sha1 = dict(disabled_algorithms=dict(keys=["ssh-rsa"]))
+_disable_sha2_pubkey = dict(
+ disabled_algorithms=dict(pubkeys=["rsa-sha2-256", "rsa-sha2-512"])
+)
+_disable_sha1_pubkey = dict(disabled_algorithms=dict(pubkeys=["ssh-rsa"]))
+
+
+class TestSHA2SignatureKeyExchange(unittest.TestCase):
+ # NOTE: these all rely on the default server() hostkey being RSA
+ # NOTE: these rely on both sides being properly implemented re: agreed-upon
+ # hostkey during kex being what's actually used. Truly proving that eg
+ # SHA512 was used, is quite difficult w/o super gross hacks. However, there
+ # are new tests in test_pkey.py which use known signature blobs to prove
+ # the SHA2 family was in fact used!
+
+ @requires_sha1_signing
+ def test_base_case_ssh_rsa_still_used_as_fallback(self):
+ # Prove that ssh-rsa is used if either, or both, participants have SHA2
+ # algorithms disabled
+ for which in ("init", "client_init", "server_init"):
+ with server(**{which: _disable_sha2}) as (tc, _):
+ assert tc.host_key_type == "ssh-rsa"
+
+ def test_kex_with_sha2_512(self):
+ # It's the default!
+ with server() as (tc, _):
+ assert tc.host_key_type == "rsa-sha2-512"
+
+ def test_kex_with_sha2_256(self):
+ # No 512 -> you get 256
+ with server(
+ init=dict(disabled_algorithms=dict(keys=["rsa-sha2-512"]))
+ ) as (tc, _):
+ assert tc.host_key_type == "rsa-sha2-256"
+
+ def _incompatible_peers(self, client_init, server_init):
+ with server(
+ client_init=client_init, server_init=server_init, catch_error=True
+ ) as (tc, ts, err):
+ # If neither side blew up then that's bad!
+ assert err is not None
+ # If client side blew up first, it'll be straightforward
+ if isinstance(err, IncompatiblePeer):
+ pass
+ # If server side blew up first, client sees EOF & we need to check
+ # the server transport for its saved error (otherwise it can only
+ # appear in log output)
+ elif isinstance(err, EOFError):
+ assert ts.saved_exception is not None
+ assert isinstance(ts.saved_exception, IncompatiblePeer)
+ # If it was something else, welp
+ else:
+ raise err
+
+ def test_client_sha2_disabled_server_sha1_disabled_no_match(self):
+ self._incompatible_peers(
+ client_init=_disable_sha2, server_init=_disable_sha1
+ )
+
+ def test_client_sha1_disabled_server_sha2_disabled_no_match(self):
+ self._incompatible_peers(
+ client_init=_disable_sha1, server_init=_disable_sha2
+ )
+
+ def test_explicit_client_hostkey_not_limited(self):
+ # Be very explicit about the hostkey on BOTH ends,
+ # and ensure it still ends up choosing sha2-512.
+ # (This is a regression test vs previous implementation which overwrote
+ # the entire preferred-hostkeys structure when given an explicit key as
+ # a client.)
+ hostkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ with server(hostkey=hostkey, connect=dict(hostkey=hostkey)) as (tc, _):
+ assert tc.host_key_type == "rsa-sha2-512"
+
+
+class TestExtInfo(unittest.TestCase):
+ def test_ext_info_handshake(self):
+ with server() as (tc, _):
+ kex = tc._get_latest_kex_init()
+ assert kex["kex_algo_list"][-1] == "ext-info-c"
+ assert tc.server_extensions == {
+ "server-sig-algs": b"ssh-ed25519,ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521,rsa-sha2-512,rsa-sha2-256,ssh-rsa,ssh-dss" # noqa
+ }
+
+ def test_client_uses_server_sig_algs_for_pubkey_auth(self):
+ privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ with server(
+ pubkeys=[privkey],
+ connect=dict(pkey=privkey),
+ server_init=dict(
+ disabled_algorithms=dict(pubkeys=["rsa-sha2-512"])
+ ),
+ ) as (tc, _):
+ assert tc.is_authenticated()
+ # Client settled on 256 despite itself not having 512 disabled
+ assert tc._agreed_pubkey_algorithm == "rsa-sha2-256"
+
+
+# TODO: these could move into test_auth.py but that badly needs refactoring
+# with this module anyways...
+class TestSHA2SignaturePubkeys(unittest.TestCase):
+ def test_pubkey_auth_honors_disabled_algorithms(self):
+ privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ with server(
+ pubkeys=[privkey],
+ connect=dict(pkey=privkey),
+ init=dict(
+ disabled_algorithms=dict(
+ pubkeys=["ssh-rsa", "rsa-sha2-256", "rsa-sha2-512"]
+ )
+ ),
+ catch_error=True,
+ ) as (_, _, err):
+ assert isinstance(err, SSHException)
+ assert "no RSA pubkey algorithms" in str(err)
+
+ def test_client_sha2_disabled_server_sha1_disabled_no_match(self):
+ privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ with server(
+ pubkeys=[privkey],
+ connect=dict(pkey=privkey),
+ client_init=_disable_sha2_pubkey,
+ server_init=_disable_sha1_pubkey,
+ catch_error=True,
+ ) as (tc, ts, err):
+ assert isinstance(err, AuthenticationException)
+
+ def test_client_sha1_disabled_server_sha2_disabled_no_match(self):
+ privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ with server(
+ pubkeys=[privkey],
+ connect=dict(pkey=privkey),
+ client_init=_disable_sha1_pubkey,
+ server_init=_disable_sha2_pubkey,
+ catch_error=True,
+ ) as (tc, ts, err):
+ assert isinstance(err, AuthenticationException)
+
+ @requires_sha1_signing
+ def test_ssh_rsa_still_used_when_sha2_disabled(self):
+ privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ # NOTE: this works because key obj comparison uses public bytes
+ # TODO: would be nice for PKey to grow a legit "give me another obj of
+ # same class but just the public bits" using asbytes()
+ with server(
+ pubkeys=[privkey], connect=dict(pkey=privkey), init=_disable_sha2
+ ) as (tc, _):
+ assert tc.is_authenticated()
+
+ def test_sha2_512(self):
+ privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ with server(
+ pubkeys=[privkey],
+ connect=dict(pkey=privkey),
+ init=dict(
+ disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-256"])
+ ),
+ ) as (tc, ts):
+ assert tc.is_authenticated()
+ assert tc._agreed_pubkey_algorithm == "rsa-sha2-512"
+
+ def test_sha2_256(self):
+ privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ with server(
+ pubkeys=[privkey],
+ connect=dict(pkey=privkey),
+ init=dict(
+ disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-512"])
+ ),
+ ) as (tc, ts):
+ assert tc.is_authenticated()
+ assert tc._agreed_pubkey_algorithm == "rsa-sha2-256"
+
+ def test_sha2_256_when_client_only_enables_256(self):
+ privkey = RSAKey.from_private_key_file(_support("test_rsa.key"))
+ with server(
+ pubkeys=[privkey],
+ connect=dict(pkey=privkey),
+ # Client-side only; server still accepts all 3.
+ client_init=dict(
+ disabled_algorithms=dict(pubkeys=["ssh-rsa", "rsa-sha2-512"])
+ ),
+ ) as (tc, ts):
+ assert tc.is_authenticated()
+ assert tc._agreed_pubkey_algorithm == "rsa-sha2-256"
diff --git a/tests/test_util.py b/tests/test_util.py
index 8ce260d1..0e485759 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -14,7 +14,7 @@
#
# You should have received a copy of the GNU Lesser General Public License
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Some unit tests for utility functions.
diff --git a/tests/util.py b/tests/util.py
index 1355ce8a..3ec5d092 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -9,6 +9,10 @@ import pytest
from paramiko.py3compat import builtins, PY2
from paramiko.ssh_gss import GSS_AUTH_AVAILABLE
+from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.asymmetric import padding, rsa
tests_dir = dirname(realpath(__file__))
@@ -144,3 +148,31 @@ def is_low_entropy():
# I don't see a way to tell internally if the hash seed was set this
# way, but env should be plenty sufficient, this is only for testing.
return is_32bit and os.environ.get("PYTHONHASHSEED", None) == "0"
+
+
+def sha1_signing_unsupported():
+ """
+ This is used to skip tests in environments where SHA-1 signing is
+ not supported by the backend.
+ """
+ private_key = rsa.generate_private_key(
+ public_exponent=65537, key_size=2048, backend=default_backend()
+ )
+ message = b"Some dummy text"
+ try:
+ private_key.sign(
+ message,
+ padding.PSS(
+ mgf=padding.MGF1(hashes.SHA1()),
+ salt_length=padding.PSS.MAX_LENGTH,
+ ),
+ hashes.SHA1(),
+ )
+ return False
+ except UnsupportedAlgorithm as e:
+ return e._reason is _Reasons.UNSUPPORTED_HASH
+
+
+requires_sha1_signing = unittest.skipIf(
+ sha1_signing_unsupported(), "SHA-1 signing not supported"
+)