diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/stub_sftp.py | 12 | ||||
-rw-r--r-- | tests/test_auth.py | 19 | ||||
-rw-r--r-- | tests/test_client.py | 47 | ||||
-rw-r--r-- | tests/test_ed25519.key | 8 | ||||
-rw-r--r-- | tests/test_ed25519_password.key | 8 | ||||
-rw-r--r-- | tests/test_kex.py | 76 | ||||
-rw-r--r-- | tests/test_kex_gss.py | 20 | ||||
-rw-r--r-- | tests/test_pkey.py | 28 | ||||
-rw-r--r-- | tests/test_sftp.py | 34 |
9 files changed, 243 insertions, 9 deletions
diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py index 06ceb419..100076d6 100644 --- a/tests/stub_sftp.py +++ b/tests/stub_sftp.py @@ -30,6 +30,7 @@ from paramiko import ( SFTPAttributes, SFTPHandle, SFTP_OK, + SFTP_FAILURE, AUTH_SUCCESSFUL, OPEN_SUCCEEDED, ) @@ -150,6 +151,17 @@ class StubSFTPServer(SFTPServerInterface): def rename(self, oldpath, newpath): oldpath = self._realpath(oldpath) newpath = self._realpath(newpath) + if os.path.exists(newpath): + return SFTP_FAILURE + try: + os.rename(oldpath, newpath) + except OSError as e: + return SFTPServer.convert_errno(e.errno) + return SFTP_OK + + def posix_rename(self, oldpath, newpath): + oldpath = self._realpath(oldpath) + newpath = self._realpath(newpath) try: os.rename(oldpath, newpath) except OSError as e: diff --git a/tests/test_auth.py b/tests/test_auth.py index 45dcb3a4..6358a053 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -23,6 +23,7 @@ Some unit tests for authenticating over a Transport. import sys import threading import unittest +from time import sleep from paramiko import ( Transport, @@ -84,6 +85,9 @@ class NullServer(ServerInterface): return AUTH_SUCCESSFUL if username == "bad-server": raise Exception("Ack!") + if username == "unresponsive-server": + sleep(5) + return AUTH_SUCCESSFUL return AUTH_FAILED def check_auth_publickey(self, username, key): @@ -250,3 +254,18 @@ class AuthTest(unittest.TestCase): except: etype, evalue, etb = sys.exc_info() self.assertTrue(issubclass(etype, AuthenticationException)) + + def test_9_auth_non_responsive(self): + """ + verify that authentication times out if server takes to long to + respond (or never responds). + """ + self.tc.auth_timeout = 1 # 1 second, to speed up test + self.start_server() + self.tc.connect() + try: + remain = self.tc.auth_password("unresponsive-server", "hello") + except: + etype, evalue, etb = sys.exc_info() + self.assertTrue(issubclass(etype, AuthenticationException)) + self.assertTrue("Authentication timeout" in str(evalue)) diff --git a/tests/test_client.py b/tests/test_client.py index fed38791..87f7bcb2 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -34,8 +34,8 @@ import weakref from tempfile import mkstemp import paramiko -from paramiko.py3compat import PY2, b -from paramiko.ssh_exception import SSHException +from paramiko.common import PY2 +from paramiko.ssh_exception import SSHException, AuthenticationException from .util import _support, slow @@ -44,6 +44,7 @@ FINGERPRINTS = { "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c", "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5", "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60", + "ssh-ed25519": b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80', } @@ -61,6 +62,9 @@ class NullServer(paramiko.ServerInterface): def check_auth_password(self, username, password): if (username == "slowdive") and (password == "pygmalion"): return paramiko.AUTH_SUCCESSFUL + if (username == "slowdive") and (password == "unresponsive-server"): + time.sleep(5) + return paramiko.AUTH_SUCCESSFUL return paramiko.AUTH_FAILED def check_auth_publickey(self, username, key): @@ -203,6 +207,9 @@ class SSHClientTest(unittest.TestCase): """ self._test_connection(key_filename=_support("test_ecdsa_256.key")) + def test_client_ed25519(self): + self._test_connection(key_filename=_support("test_ed25519.key")) + def test_3_multiple_key_files(self): """ verify that SSHClient accepts and tries multiple key files. @@ -393,7 +400,19 @@ class SSHClientTest(unittest.TestCase): ) self._test_connection(**kwargs) - def test_9_auth_trickledown_gsskex(self): + def test_9_auth_timeout(self): + """ + verify that the SSHClient has a configurable auth timeout + """ + # Connect with a half second auth timeout + self.assertRaises( + AuthenticationException, + self._test_connection, + password="unresponsive-server", + auth_timeout=0.5, + ) + + def test_10_auth_trickledown_gsskex(self): """ Failed gssapi-keyex auth doesn't prevent subsequent key auth from succeeding """ @@ -402,7 +421,7 @@ class SSHClientTest(unittest.TestCase): kwargs = dict(gss_kex=True, key_filename=[_support("test_rsa.key")]) self._test_connection(**kwargs) - def test_10_auth_trickledown_gssauth(self): + def test_11_auth_trickledown_gssauth(self): """ Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding """ @@ -411,7 +430,7 @@ class SSHClientTest(unittest.TestCase): kwargs = dict(gss_auth=True, key_filename=[_support("test_rsa.key")]) self._test_connection(**kwargs) - def test_11_reject_policy(self): + def test_12_reject_policy(self): """ verify that SSHClient's RejectPolicy works. """ @@ -427,7 +446,7 @@ class SSHClientTest(unittest.TestCase): **self.connect_kwargs ) - def test_12_reject_policy_gsskex(self): + def test_13_reject_policy_gsskex(self): """ verify that SSHClient's RejectPolicy works, even if gssapi-keyex was enabled but not used. @@ -534,3 +553,19 @@ class SSHClientTest(unittest.TestCase): ) else: self.assertFalse(False, "SSHException was not thrown.") + + def test_missing_key_policy_accepts_classes_or_instances(self): + """ + Client.missing_host_key_policy() can take classes or instances. + """ + # AN ACTUAL UNIT TEST?! GOOD LORD + # (But then we have to test a private API...meh.) + client = paramiko.SSHClient() + # Default + assert isinstance(client._policy, paramiko.RejectPolicy) + # Hand in an instance (classic behavior) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + assert isinstance(client._policy, paramiko.AutoAddPolicy) + # Hand in just the class (new behavior) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy) + assert isinstance(client._policy, paramiko.AutoAddPolicy) diff --git a/tests/test_ed25519.key b/tests/test_ed25519.key new file mode 100644 index 00000000..eb9f94c2 --- /dev/null +++ b/tests/test_ed25519.key @@ -0,0 +1,8 @@ +-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW +QyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXwAAAKhjwAdrY8AH +awAAAAtzc2gtZWQyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXw +AAAEA9tGQi2IrprbOSbDCF+RmAHd6meNSXBUQ2ekKXm4/8xnr1K9komH/1WBIvQbbtvnFV +hryd62EfcgRFuLRiokNfAAAAI2FsZXhfZ2F5bm9yQEFsZXhzLU1hY0Jvb2stQWlyLmxvY2 +FsAQI= +-----END OPENSSH PRIVATE KEY----- diff --git a/tests/test_ed25519_password.key b/tests/test_ed25519_password.key new file mode 100644 index 00000000..d178aaae --- /dev/null +++ b/tests/test_ed25519_password.key @@ -0,0 +1,8 @@ +-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAACmFlczI1Ni1jYmMAAAAGYmNyeXB0AAAAGAAAABDaKD4ac7 +kieb+UfXaLaw68AAAAEAAAAAEAAAAzAAAAC3NzaC1lZDI1NTE5AAAAIOQn7fjND5ozMSV3 +CvbEtIdT73hWCMRjzS/lRdUDw50xAAAAsE8kLGyYBnl9ihJNqv378y6mO3SkzrDbWXOnK6 +ij0vnuTAvcqvWHAnyu6qBbplu/W2m55ZFeAItgaEcV2/V76sh/sAKlERqrLFyXylN0xoOW +NU5+zU08aTlbSKGmeNUU2xE/xfJq12U9XClIRuVUkUpYANxNPbmTRpVrbD3fgXMhK97Jrb +DEn8ca1IqMPiYmd/hpe5+tq3OxyRljXjCUFWTnqkp9VvUdzSTdSGZHsW9i +-----END OPENSSH PRIVATE KEY----- diff --git a/tests/test_kex.py b/tests/test_kex.py index 5b749b4d..65eb9a17 100644 --- a/tests/test_kex.py +++ b/tests/test_kex.py @@ -20,7 +20,7 @@ Some unit tests for the key exchange protocols. """ -from binascii import hexlify +from binascii import hexlify, unhexlify import os import unittest @@ -32,12 +32,34 @@ from paramiko.kex_group1 import KexGroup1 from paramiko.kex_gex import KexGex, KexGexSHA256 from paramiko import Message from paramiko.common import byte_chr +from paramiko.kex_ecdh_nist import KexNistp256 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric import ec def dummy_urandom(n): return byte_chr(0xcc) * n +def dummy_generate_key_pair(obj): + private_key_value = 94761803665136558137557783047955027733968423115106677159790289642479432803037 + public_key_numbers = "042bdab212fa8ba1b7c843301682a4db424d307246c7e1e6083c41d9ca7b098bf30b3d63e2ec6278488c135360456cc054b3444ecc45998c08894cbc1370f5f989" + public_key_numbers_obj = ec.EllipticCurvePublicNumbers.from_encoded_point( + ec.SECP256R1(), unhexlify(public_key_numbers) + ) + obj.P = ec.EllipticCurvePrivateNumbers( + private_value=private_key_value, public_numbers=public_key_numbers_obj + ).private_key(default_backend()) + if obj.transport.server_mode: + obj.Q_S = ec.EllipticCurvePublicNumbers.from_encoded_point( + ec.SECP256R1(), unhexlify(public_key_numbers) + ).public_key(default_backend()) + return + obj.Q_C = ec.EllipticCurvePublicNumbers.from_encoded_point( + ec.SECP256R1(), unhexlify(public_key_numbers) + ).public_key(default_backend()) + + class FakeKey(object): def __str__(self): return "fake-key" @@ -96,9 +118,12 @@ class KexTest(unittest.TestCase): def setUp(self): self._original_urandom = os.urandom os.urandom = dummy_urandom + self._original_generate_key_pair = KexNistp256._generate_key_pair + KexNistp256._generate_key_pair = dummy_generate_key_pair def tearDown(self): os.urandom = self._original_urandom + KexNistp256._generate_key_pair = self._original_generate_key_pair def test_1_group1_client(self): transport = FakeTransport() @@ -423,3 +448,52 @@ class KexTest(unittest.TestCase): self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) + + def test_11_kex_nistp256_client(self): + K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 + transport = FakeTransport() + transport.server_mode = False + kex = KexNistp256(transport) + kex.start_kex() + self.assertEqual( + (paramiko.kex_ecdh_nist._MSG_KEXECDH_REPLY,), transport._expect + ) + + # fake reply + msg = Message() + msg.add_string("fake-host-key") + Q_S = unhexlify( + "043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210" + ) + msg.add_string(Q_S) + msg.add_string("fake-sig") + msg.rewind() + kex.parse_next(paramiko.kex_ecdh_nist._MSG_KEXECDH_REPLY, msg) + H = b"BAF7CE243A836037EB5D2221420F35C02B9AB6C957FE3BDE3369307B9612570A" + self.assertEqual(K, kex.transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) + self.assertTrue(transport._activated) + + def test_12_kex_nistp256_server(self): + K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 + transport = FakeTransport() + transport.server_mode = True + kex = KexNistp256(transport) + kex.start_kex() + self.assertEqual( + (paramiko.kex_ecdh_nist._MSG_KEXECDH_INIT,), transport._expect + ) + + # fake init + msg = Message() + Q_C = unhexlify( + "043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210" + ) + H = b"2EF4957AFD530DD3F05DBEABF68D724FACC060974DA9704F2AEE4C3DE861E7CA" + msg.add_string(Q_C) + msg.rewind() + kex.parse_next(paramiko.kex_ecdh_nist._MSG_KEXECDH_INIT, msg) + self.assertEqual(K, transport._K) + self.assertTrue(transport._activated) + self.assertEqual(H, hexlify(transport._H).upper()) diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py index d5f624ce..c71ff91c 100644 --- a/tests/test_kex_gss.py +++ b/tests/test_kex_gss.py @@ -95,7 +95,7 @@ class GSSKexTest(unittest.TestCase): server = NullServer() self.ts.start_server(self.event, server) - def test_1_gsskex_and_auth(self): + def _test_gsskex_and_auth(self, gss_host, rekey=False): """ Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated Diffie-Hellman Key Exchange and user authentication with the GSS-API @@ -114,6 +114,7 @@ class GSSKexTest(unittest.TestCase): username=self.username, gss_auth=True, gss_kex=True, + gss_host=gss_host, ) self.event.wait(1.0) @@ -121,9 +122,12 @@ class GSSKexTest(unittest.TestCase): self.assert_(self.ts.is_active()) self.assertEquals(self.username, self.ts.get_username()) self.assertEquals(True, self.ts.is_authenticated()) + self.assertEquals(True, self.tc.get_transport().gss_kex_used) stdin, stdout, stderr = self.tc.exec_command("yes") schan = self.ts.accept(1.0) + if rekey: + self.tc.get_transport().renegotiate_keys() schan.send("Hello there.\n") schan.send_stderr("This is on stderr.\n") @@ -137,3 +141,17 @@ class GSSKexTest(unittest.TestCase): stdin.close() stdout.close() stderr.close() + + def test_1_gsskex_and_auth(self): + """ + Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated + Diffie-Hellman Key Exchange and user authentication with the GSS-API + context created during key exchange. + """ + self._test_gsskex_and_auth(gss_host=None) + + def test_2_gsskex_and_auth_rekey(self): + """ + Verify that Paramiko can rekey. + """ + self._test_gsskex_and_auth(gss_host=None, rekey=True) diff --git a/tests/test_pkey.py b/tests/test_pkey.py index 4e9653a0..3a1279b6 100644 --- a/tests/test_pkey.py +++ b/tests/test_pkey.py @@ -27,7 +27,7 @@ from binascii import hexlify from hashlib import md5 import base64 -from paramiko import RSAKey, DSSKey, ECDSAKey, Message, util +from paramiko import RSAKey, DSSKey, ECDSAKey, Ed25519Key, Message, util from paramiko.py3compat import StringIO, byte_chr, b, bytes, PY2 from .util import _support @@ -461,6 +461,32 @@ class KeyTest(unittest.TestCase): comparable = TEST_KEY_BYTESTR_2 if PY2 else TEST_KEY_BYTESTR_3 self.assertEqual(str(key), comparable) + def test_ed25519(self): + key1 = Ed25519Key.from_private_key_file(_support("test_ed25519.key")) + key2 = Ed25519Key.from_private_key_file( + _support("test_ed25519_password.key"), b"abc123" + ) + self.assertNotEqual(key1.asbytes(), key2.asbytes()) + + def test_ed25519_compare(self): + # verify that the private & public keys compare equal + key = Ed25519Key.from_private_key_file(_support("test_ed25519.key")) + self.assertEqual(key, key) + pub = Ed25519Key(data=key.asbytes()) + self.assertTrue(key.can_sign()) + self.assertTrue(not pub.can_sign()) + self.assertEqual(key, pub) + + def test_ed25519_nonbytes_password(self): + # https://github.com/paramiko/paramiko/issues/1039 + key = Ed25519Key.from_private_key_file( + _support("test_ed25519_password.key"), + # NOTE: not a bytes. Amusingly, the test above for same key DOES + # explicitly cast to bytes...code smell! + "abc123", + ) + # No exception -> it's good. Meh. + def test_keyfile_is_actually_encrypted(self): # Read an existing encrypted private key file_ = _support("test_rsa_password.key") diff --git a/tests/test_sftp.py b/tests/test_sftp.py index ccfdf7b0..87c57340 100644 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -185,6 +185,40 @@ class TestSFTP(object): except: pass + def test_5a_posix_rename(self, sftp): + """Test posix-rename@openssh.com protocol extension.""" + try: + # first check that the normal rename works as specified + with sftp.open(sftp.FOLDER + "/a", "w") as f: + f.write("one") + sftp.rename(sftp.FOLDER + "/a", sftp.FOLDER + "/b") + with sftp.open(sftp.FOLDER + "/a", "w") as f: + f.write("two") + try: + sftp.rename(sftp.FOLDER + "/a", sftp.FOLDER + "/b") + self.assertTrue( + False, "no exception when rename-ing onto existing file" + ) + except (OSError, IOError): + pass + + # now check with the posix_rename + sftp.posix_rename(sftp.FOLDER + "/a", sftp.FOLDER + "/b") + with sftp.open(sftp.FOLDER + "/b", "r") as f: + data = u(f.read()) + err = "Contents of renamed file not the same as original file" + assert data == "two", err + + finally: + try: + sftp.remove(sftp.FOLDER + "/a") + except: + pass + try: + sftp.remove(sftp.FOLDER + "/b") + except: + pass + def test_6_folder(self, sftp): """ create a temporary folder, verify that we can create a file in it, then |