diff options
author | Jeff Forcier <jeff@bitprophet.org> | 2017-06-06 13:48:17 -0700 |
---|---|---|
committer | Jeff Forcier <jeff@bitprophet.org> | 2017-06-06 13:48:17 -0700 |
commit | f570418d6a31916991fe963a3e01cbcf75f5cbae (patch) | |
tree | 9940cff41d0d8bff5555ae8c59759537eabce28d /tests | |
parent | d69ef77f7698de677bd977483ba8c1c558b19d02 (diff) | |
parent | 947bd10f451f8a0249fdd8e234429f8a657c60ae (diff) |
Merge branch 'master' into 921-int
Diffstat (limited to 'tests')
-rw-r--r-- | tests/stub_sftp.py | 6 | ||||
-rw-r--r-- | tests/test_auth.py | 7 | ||||
-rw-r--r-- | tests/test_client.py | 21 | ||||
-rw-r--r-- | tests/test_ed25519.key | 8 | ||||
-rw-r--r-- | tests/test_ed25519_password.key | 8 | ||||
-rw-r--r-- | tests/test_gssapi.py | 8 | ||||
-rw-r--r-- | tests/test_hostkeys.py | 12 | ||||
-rw-r--r-- | tests/test_kex.py | 57 | ||||
-rw-r--r-- | tests/test_pkey.py | 19 | ||||
-rw-r--r-- | tests/test_ssh_gss.py | 4 | ||||
-rw-r--r-- | tests/test_transport.py | 22 | ||||
-rw-r--r-- | tests/test_util.py | 7 |
12 files changed, 150 insertions, 29 deletions
diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py index 5fcca386..334af561 100644 --- a/tests/stub_sftp.py +++ b/tests/stub_sftp.py @@ -22,8 +22,10 @@ A stub SFTP server for loopback SFTP testing. import os import sys -from paramiko import ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes, \ - SFTPHandle, SFTP_OK, AUTH_SUCCESSFUL, OPEN_SUCCEEDED +from paramiko import ( + ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes, + SFTPHandle, SFTP_OK, AUTH_SUCCESSFUL, OPEN_SUCCEEDED, +) from paramiko.common import o666 diff --git a/tests/test_auth.py b/tests/test_auth.py index 23517790..96f7611c 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -24,9 +24,10 @@ import sys import threading import unittest -from paramiko import Transport, ServerInterface, RSAKey, DSSKey, \ - BadAuthenticationType, InteractiveQuery, \ - AuthenticationException +from paramiko import ( + Transport, ServerInterface, RSAKey, DSSKey, BadAuthenticationType, + InteractiveQuery, AuthenticationException, +) from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL from paramiko.py3compat import u from tests.loop import LoopSocket diff --git a/tests/test_client.py b/tests/test_client.py index 5f4f0dd5..3a9001e2 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -43,6 +43,7 @@ FINGERPRINTS = { 'ssh-dss': b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c', 'ssh-rsa': b'\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5', 'ecdsa-sha2-nistp256': b'\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60', + 'ssh-ed25519': b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80', } @@ -194,6 +195,9 @@ class SSHClientTest (unittest.TestCase): """ self._test_connection(key_filename=test_path('test_ecdsa_256.key')) + def test_client_ed25519(self): + self._test_connection(key_filename=test_path('test_ed25519.key')) + def test_3_multiple_key_files(self): """ verify that SSHClient accepts and tries multiple key files. @@ -414,3 +418,20 @@ class SSHClientTest (unittest.TestCase): 'Expected original SSHException in exception') else: self.assertFalse(False, 'SSHException was not thrown.') + + + def test_missing_key_policy_accepts_classes_or_instances(self): + """ + Client.missing_host_key_policy() can take classes or instances. + """ + # AN ACTUAL UNIT TEST?! GOOD LORD + # (But then we have to test a private API...meh.) + client = paramiko.SSHClient() + # Default + assert isinstance(client._policy, paramiko.RejectPolicy) + # Hand in an instance (classic behavior) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + assert isinstance(client._policy, paramiko.AutoAddPolicy) + # Hand in just the class (new behavior) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy) + assert isinstance(client._policy, paramiko.AutoAddPolicy) diff --git a/tests/test_ed25519.key b/tests/test_ed25519.key new file mode 100644 index 00000000..eb9f94c2 --- /dev/null +++ b/tests/test_ed25519.key @@ -0,0 +1,8 @@ +-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW +QyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXwAAAKhjwAdrY8AH +awAAAAtzc2gtZWQyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXw +AAAEA9tGQi2IrprbOSbDCF+RmAHd6meNSXBUQ2ekKXm4/8xnr1K9komH/1WBIvQbbtvnFV +hryd62EfcgRFuLRiokNfAAAAI2FsZXhfZ2F5bm9yQEFsZXhzLU1hY0Jvb2stQWlyLmxvY2 +FsAQI= +-----END OPENSSH PRIVATE KEY----- diff --git a/tests/test_ed25519_password.key b/tests/test_ed25519_password.key new file mode 100644 index 00000000..d178aaae --- /dev/null +++ b/tests/test_ed25519_password.key @@ -0,0 +1,8 @@ +-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAACmFlczI1Ni1jYmMAAAAGYmNyeXB0AAAAGAAAABDaKD4ac7 +kieb+UfXaLaw68AAAAEAAAAAEAAAAzAAAAC3NzaC1lZDI1NTE5AAAAIOQn7fjND5ozMSV3 +CvbEtIdT73hWCMRjzS/lRdUDw50xAAAAsE8kLGyYBnl9ihJNqv378y6mO3SkzrDbWXOnK6 +ij0vnuTAvcqvWHAnyu6qBbplu/W2m55ZFeAItgaEcV2/V76sh/sAKlERqrLFyXylN0xoOW +NU5+zU08aTlbSKGmeNUU2xE/xfJq12U9XClIRuVUkUpYANxNPbmTRpVrbD3fgXMhK97Jrb +DEn8ca1IqMPiYmd/hpe5+tq3OxyRljXjCUFWTnqkp9VvUdzSTdSGZHsW9i +-----END OPENSSH PRIVATE KEY----- diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py index 96c268d9..bc220108 100644 --- a/tests/test_gssapi.py +++ b/tests/test_gssapi.py @@ -104,9 +104,11 @@ class GSSAPITest(unittest.TestCase): status = gss_srv_ctxt.verify_mic(mic_msg, mic_token) self.assertEquals(0, status) else: - gss_flags = sspicon.ISC_REQ_INTEGRITY |\ - sspicon.ISC_REQ_MUTUAL_AUTH |\ - sspicon.ISC_REQ_DELEGATE + gss_flags = ( + sspicon.ISC_REQ_INTEGRITY | + sspicon.ISC_REQ_MUTUAL_AUTH | + sspicon.ISC_REQ_DELEGATE + ) # Initialize a GSS-API context. target_name = "host/" + socket.getfqdn(targ_name) gss_ctxt = sspi.ClientAuth("Kerberos", diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py index 2bdcad9c..2c7ceeb9 100644 --- a/tests/test_hostkeys.py +++ b/tests/test_hostkeys.py @@ -115,3 +115,15 @@ class HostKeysTest (unittest.TestCase): self.assertEqual(b'7EC91BB336CB6D810B124B1353C32396', fp) fp = hexlify(hostdict['secure.example.com']['ssh-dss'].get_fingerprint()).upper() self.assertEqual(b'4478F0B9A23CC5182009FF755BC1D26C', fp) + + def test_delitem(self): + hostdict = paramiko.HostKeys('hostfile.temp') + target = 'happy.example.com' + entry = hostdict[target] # will KeyError if not present + del hostdict[target] + try: + entry = hostdict[target] + except KeyError: + pass # Good + else: + assert False, "Entry was not deleted from HostKeys on delitem!" diff --git a/tests/test_kex.py b/tests/test_kex.py index 19804fbf..b7f588f7 100644 --- a/tests/test_kex.py +++ b/tests/test_kex.py @@ -20,7 +20,7 @@ Some unit tests for the key exchange protocols. """ -from binascii import hexlify +from binascii import hexlify, unhexlify import os import unittest @@ -29,11 +29,24 @@ from paramiko.kex_group1 import KexGroup1 from paramiko.kex_gex import KexGex, KexGexSHA256 from paramiko import Message from paramiko.common import byte_chr +from paramiko.kex_ecdh_nist import KexNistp256 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric import ec def dummy_urandom(n): return byte_chr(0xcc) * n +def dummy_generate_key_pair(obj): + private_key_value = 94761803665136558137557783047955027733968423115106677159790289642479432803037 + public_key_numbers = "042bdab212fa8ba1b7c843301682a4db424d307246c7e1e6083c41d9ca7b098bf30b3d63e2ec6278488c135360456cc054b3444ecc45998c08894cbc1370f5f989" + public_key_numbers_obj = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers)) + obj.P = ec.EllipticCurvePrivateNumbers(private_value=private_key_value, public_numbers=public_key_numbers_obj).private_key(default_backend()) + if obj.transport.server_mode: + obj.Q_S = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers)).public_key(default_backend()) + return + obj.Q_C = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers)).public_key(default_backend()) + class FakeKey (object): def __str__(self): @@ -93,9 +106,12 @@ class KexTest (unittest.TestCase): def setUp(self): self._original_urandom = os.urandom os.urandom = dummy_urandom + self._original_generate_key_pair = KexNistp256._generate_key_pair + KexNistp256._generate_key_pair = dummy_generate_key_pair def tearDown(self): os.urandom = self._original_urandom + KexNistp256._generate_key_pair = self._original_generate_key_pair def test_1_group1_client(self): transport = FakeTransport() @@ -369,4 +385,43 @@ class KexTest (unittest.TestCase): self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) + def test_11_kex_nistp256_client(self): + K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 + transport = FakeTransport() + transport.server_mode = False + kex = KexNistp256(transport) + kex.start_kex() + self.assertEqual((paramiko.kex_ecdh_nist._MSG_KEXECDH_REPLY,), transport._expect) + + #fake reply + msg = Message() + msg.add_string('fake-host-key') + Q_S = unhexlify("043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210") + msg.add_string(Q_S) + msg.add_string('fake-sig') + msg.rewind() + kex.parse_next(paramiko.kex_ecdh_nist._MSG_KEXECDH_REPLY, msg) + H = b'BAF7CE243A836037EB5D2221420F35C02B9AB6C957FE3BDE3369307B9612570A' + self.assertEqual(K, kex.transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual((b'fake-host-key', b'fake-sig'), transport._verify) + self.assertTrue(transport._activated) + + def test_12_kex_nistp256_server(self): + K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 + transport = FakeTransport() + transport.server_mode = True + kex = KexNistp256(transport) + kex.start_kex() + self.assertEqual((paramiko.kex_ecdh_nist._MSG_KEXECDH_INIT,), transport._expect) + #fake init + msg=Message() + Q_C = unhexlify("043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210") + H = b'2EF4957AFD530DD3F05DBEABF68D724FACC060974DA9704F2AEE4C3DE861E7CA' + msg.add_string(Q_C) + msg.rewind() + kex.parse_next(paramiko.kex_ecdh_nist._MSG_KEXECDH_INIT, msg) + self.assertEqual(K, transport._K) + self.assertTrue(transport._activated) + self.assertEqual(H, hexlify(transport._H).upper()) diff --git a/tests/test_pkey.py b/tests/test_pkey.py index 24d78c3e..a26ff170 100644 --- a/tests/test_pkey.py +++ b/tests/test_pkey.py @@ -27,7 +27,7 @@ from binascii import hexlify from hashlib import md5 import base64 -from paramiko import RSAKey, DSSKey, ECDSAKey, Message, util +from paramiko import RSAKey, DSSKey, ECDSAKey, Ed25519Key, Message, util from paramiko.py3compat import StringIO, byte_chr, b, bytes, PY2 from tests.util import test_path @@ -112,14 +112,7 @@ TEST_KEY_BYTESTR_2 = '\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x81\x TEST_KEY_BYTESTR_3 = '\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x00ӏV\x07k%<\x1fT$E#>ғfD\x18 \x0cae#̬S#VlE\x1epvo\x17M߉DUXL<\x06\x10דw\u2bd5ٿw˟0)#y{\x10l\tPru\t\x19Π\u070e/f0yFmm\x1f' -class KeyTest (unittest.TestCase): - - def setUp(self): - pass - - def tearDown(self): - pass - +class KeyTest(unittest.TestCase): def test_1_generate_key_bytes(self): key = util.generate_key_bytes(md5, x1234, 'happy birthday', 30) exp = b'\x61\xE1\xF2\x72\xF4\xC1\xC4\x56\x15\x86\xBD\x32\x24\x98\xC0\xE9\x24\x67\x27\x80\xF4\x7B\xB3\x7D\xDA\x7D\x54\x01\x9E\x64' @@ -436,3 +429,11 @@ class KeyTest (unittest.TestCase): key = RSAKey.from_private_key_file(test_path('test_rsa.key')) comparable = TEST_KEY_BYTESTR_2 if PY2 else TEST_KEY_BYTESTR_3 self.assertEqual(str(key), comparable) + + def test_ed25519(self): + key1 = Ed25519Key.from_private_key_file(test_path('test_ed25519.key')) + key2 = Ed25519Key.from_private_key_file( + test_path('test_ed25519_password.key'), b'abc123' + ) + + self.assertNotEqual(key1.asbytes(), key2.asbytes()) diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py index e20d348f..967b3b81 100644 --- a/tests/test_ssh_gss.py +++ b/tests/test_ssh_gss.py @@ -43,9 +43,7 @@ class NullServer (paramiko.ServerInterface): return paramiko.AUTH_FAILED def enable_auth_gssapi(self): - UseGSSAPI = True - GSSAPICleanupCredentials = True - return UseGSSAPI + return True def check_channel_request(self, kind, chanid): return paramiko.OPEN_SUCCEEDED diff --git a/tests/test_transport.py b/tests/test_transport.py index d81ad8f3..c426cef1 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -31,13 +31,16 @@ import random from hashlib import sha1 import unittest -from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \ - SSHException, ChannelException, Packetizer +from paramiko import ( + Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, SSHException, + ChannelException, Packetizer, +) from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED -from paramiko.common import MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST, \ - MIN_PACKET_SIZE, MIN_WINDOW_SIZE, MAX_WINDOW_SIZE, \ - DEFAULT_WINDOW_SIZE, DEFAULT_MAX_PACKET_SIZE +from paramiko.common import ( + MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST, MIN_PACKET_SIZE, MIN_WINDOW_SIZE, + MAX_WINDOW_SIZE, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_PACKET_SIZE, +) from paramiko.py3compat import bytes from paramiko.message import Message from tests.loop import LoopSocket @@ -162,6 +165,15 @@ class TransportTest(unittest.TestCase): except TypeError: pass + def test_1b_security_options_reset(self): + o = self.tc.get_security_options() + # should not throw any exceptions + o.ciphers = o.ciphers + o.digests = o.digests + o.key_types = o.key_types + o.kex = o.kex + o.compression = o.compression + def test_2_compute_key(self): self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929 self.tc.H = b'\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3' diff --git a/tests/test_util.py b/tests/test_util.py index a31e4507..7880e156 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -475,9 +475,10 @@ Host param3 parara safe_has_bytes = safe_string(has_bytes) expected_bytes = b("has %07%03 bytes") err = "{0!r} != {1!r}" - assert safe_vanilla == vanilla, err.format(safe_vanilla, vanilla) - assert safe_has_bytes == expected_bytes, \ - err.format(safe_has_bytes, expected_bytes) + msg = err.format(safe_vanilla, vanilla) + assert safe_vanilla == vanilla, msg + msg = err.format(safe_has_bytes, expected_bytes) + assert safe_has_bytes == expected_bytes, msg def test_proxycommand_none_issue_418(self): test_config_file = """ |