summaryrefslogtreecommitdiffhomepage
path: root/tests
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2017-06-06 14:04:04 -0700
committerJeff Forcier <jeff@bitprophet.org>2017-06-06 14:04:04 -0700
commitf0372ad11cc59e2b750fc3a3252f5ee3458ce623 (patch)
tree25f6944e44f8c26ca736e6da39a9ca0ad678491b /tests
parentc5febfa0176be1d230aabf6bb4ed94731a60e325 (diff)
parent2804b79cdeb2c140862d48883b7fdc105f05b95a (diff)
Merge branch 'master' into 869-int
Diffstat (limited to 'tests')
-rw-r--r--tests/stub_sftp.py8
-rw-r--r--tests/test_auth.py7
-rw-r--r--tests/test_client.py23
-rw-r--r--tests/test_ed25519.key8
-rw-r--r--tests/test_ed25519_password.key8
-rw-r--r--tests/test_gssapi.py8
-rw-r--r--tests/test_hostkeys.py12
-rw-r--r--tests/test_kex.py57
-rw-r--r--tests/test_packetizer.py8
-rw-r--r--tests/test_pkey.py28
-rwxr-xr-xtests/test_sftp.py2
-rw-r--r--tests/test_ssh_gss.py4
-rw-r--r--tests/test_transport.py22
-rw-r--r--tests/test_util.py7
14 files changed, 168 insertions, 34 deletions
diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py
index 24380ba1..334af561 100644
--- a/tests/stub_sftp.py
+++ b/tests/stub_sftp.py
@@ -22,8 +22,10 @@ A stub SFTP server for loopback SFTP testing.
import os
import sys
-from paramiko import ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes, \
- SFTPHandle, SFTP_OK, AUTH_SUCCESSFUL, OPEN_SUCCEEDED
+from paramiko import (
+ ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes,
+ SFTPHandle, SFTP_OK, AUTH_SUCCESSFUL, OPEN_SUCCEEDED,
+)
from paramiko.common import o666
@@ -55,7 +57,7 @@ class StubSFTPHandle (SFTPHandle):
class StubSFTPServer (SFTPServerInterface):
# assume current folder is a fine root
- # (the tests always create and eventualy delete a subfolder, so there shouldn't be any mess)
+ # (the tests always create and eventually delete a subfolder, so there shouldn't be any mess)
ROOT = os.getcwd()
def _realpath(self, path):
diff --git a/tests/test_auth.py b/tests/test_auth.py
index 4fe6a325..58b2f44f 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -25,9 +25,10 @@ import threading
import unittest
from time import sleep
-from paramiko import Transport, ServerInterface, RSAKey, DSSKey, \
- BadAuthenticationType, InteractiveQuery, \
- AuthenticationException
+from paramiko import (
+ Transport, ServerInterface, RSAKey, DSSKey, BadAuthenticationType,
+ InteractiveQuery, AuthenticationException,
+)
from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL
from paramiko.py3compat import u
from tests.loop import LoopSocket
diff --git a/tests/test_client.py b/tests/test_client.py
index 2949d242..aa3ff59b 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -43,6 +43,7 @@ FINGERPRINTS = {
'ssh-dss': b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c',
'ssh-rsa': b'\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5',
'ecdsa-sha2-nistp256': b'\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60',
+ 'ssh-ed25519': b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80',
}
@@ -197,6 +198,9 @@ class SSHClientTest (unittest.TestCase):
"""
self._test_connection(key_filename=test_path('test_ecdsa_256.key'))
+ def test_client_ed25519(self):
+ self._test_connection(key_filename=test_path('test_ed25519.key'))
+
def test_3_multiple_key_files(self):
"""
verify that SSHClient accepts and tries multiple key files.
@@ -370,7 +374,7 @@ class SSHClientTest (unittest.TestCase):
# NOTE: re #387, re #394
# If pkey module used within Client._auth isn't correctly handling auth
# errors (e.g. if it allows things like ValueError to bubble up as per
- # midway thru #394) client.connect() will fail (at key load step)
+ # midway through #394) client.connect() will fail (at key load step)
# instead of succeeding (at password step)
kwargs = dict(
# Password-protected key whose passphrase is not 'pygmalion' (it's
@@ -435,3 +439,20 @@ class SSHClientTest (unittest.TestCase):
'Expected original SSHException in exception')
else:
self.assertFalse(False, 'SSHException was not thrown.')
+
+
+ def test_missing_key_policy_accepts_classes_or_instances(self):
+ """
+ Client.missing_host_key_policy() can take classes or instances.
+ """
+ # AN ACTUAL UNIT TEST?! GOOD LORD
+ # (But then we have to test a private API...meh.)
+ client = paramiko.SSHClient()
+ # Default
+ assert isinstance(client._policy, paramiko.RejectPolicy)
+ # Hand in an instance (classic behavior)
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ assert isinstance(client._policy, paramiko.AutoAddPolicy)
+ # Hand in just the class (new behavior)
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
+ assert isinstance(client._policy, paramiko.AutoAddPolicy)
diff --git a/tests/test_ed25519.key b/tests/test_ed25519.key
new file mode 100644
index 00000000..eb9f94c2
--- /dev/null
+++ b/tests/test_ed25519.key
@@ -0,0 +1,8 @@
+-----BEGIN OPENSSH PRIVATE KEY-----
+b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
+QyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXwAAAKhjwAdrY8AH
+awAAAAtzc2gtZWQyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXw
+AAAEA9tGQi2IrprbOSbDCF+RmAHd6meNSXBUQ2ekKXm4/8xnr1K9komH/1WBIvQbbtvnFV
+hryd62EfcgRFuLRiokNfAAAAI2FsZXhfZ2F5bm9yQEFsZXhzLU1hY0Jvb2stQWlyLmxvY2
+FsAQI=
+-----END OPENSSH PRIVATE KEY-----
diff --git a/tests/test_ed25519_password.key b/tests/test_ed25519_password.key
new file mode 100644
index 00000000..d178aaae
--- /dev/null
+++ b/tests/test_ed25519_password.key
@@ -0,0 +1,8 @@
+-----BEGIN OPENSSH PRIVATE KEY-----
+b3BlbnNzaC1rZXktdjEAAAAACmFlczI1Ni1jYmMAAAAGYmNyeXB0AAAAGAAAABDaKD4ac7
+kieb+UfXaLaw68AAAAEAAAAAEAAAAzAAAAC3NzaC1lZDI1NTE5AAAAIOQn7fjND5ozMSV3
+CvbEtIdT73hWCMRjzS/lRdUDw50xAAAAsE8kLGyYBnl9ihJNqv378y6mO3SkzrDbWXOnK6
+ij0vnuTAvcqvWHAnyu6qBbplu/W2m55ZFeAItgaEcV2/V76sh/sAKlERqrLFyXylN0xoOW
+NU5+zU08aTlbSKGmeNUU2xE/xfJq12U9XClIRuVUkUpYANxNPbmTRpVrbD3fgXMhK97Jrb
+DEn8ca1IqMPiYmd/hpe5+tq3OxyRljXjCUFWTnqkp9VvUdzSTdSGZHsW9i
+-----END OPENSSH PRIVATE KEY-----
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
index 96c268d9..bc220108 100644
--- a/tests/test_gssapi.py
+++ b/tests/test_gssapi.py
@@ -104,9 +104,11 @@ class GSSAPITest(unittest.TestCase):
status = gss_srv_ctxt.verify_mic(mic_msg, mic_token)
self.assertEquals(0, status)
else:
- gss_flags = sspicon.ISC_REQ_INTEGRITY |\
- sspicon.ISC_REQ_MUTUAL_AUTH |\
- sspicon.ISC_REQ_DELEGATE
+ gss_flags = (
+ sspicon.ISC_REQ_INTEGRITY |
+ sspicon.ISC_REQ_MUTUAL_AUTH |
+ sspicon.ISC_REQ_DELEGATE
+ )
# Initialize a GSS-API context.
target_name = "host/" + socket.getfqdn(targ_name)
gss_ctxt = sspi.ClientAuth("Kerberos",
diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py
index 2bdcad9c..2c7ceeb9 100644
--- a/tests/test_hostkeys.py
+++ b/tests/test_hostkeys.py
@@ -115,3 +115,15 @@ class HostKeysTest (unittest.TestCase):
self.assertEqual(b'7EC91BB336CB6D810B124B1353C32396', fp)
fp = hexlify(hostdict['secure.example.com']['ssh-dss'].get_fingerprint()).upper()
self.assertEqual(b'4478F0B9A23CC5182009FF755BC1D26C', fp)
+
+ def test_delitem(self):
+ hostdict = paramiko.HostKeys('hostfile.temp')
+ target = 'happy.example.com'
+ entry = hostdict[target] # will KeyError if not present
+ del hostdict[target]
+ try:
+ entry = hostdict[target]
+ except KeyError:
+ pass # Good
+ else:
+ assert False, "Entry was not deleted from HostKeys on delitem!"
diff --git a/tests/test_kex.py b/tests/test_kex.py
index 19804fbf..b7f588f7 100644
--- a/tests/test_kex.py
+++ b/tests/test_kex.py
@@ -20,7 +20,7 @@
Some unit tests for the key exchange protocols.
"""
-from binascii import hexlify
+from binascii import hexlify, unhexlify
import os
import unittest
@@ -29,11 +29,24 @@ from paramiko.kex_group1 import KexGroup1
from paramiko.kex_gex import KexGex, KexGexSHA256
from paramiko import Message
from paramiko.common import byte_chr
+from paramiko.kex_ecdh_nist import KexNistp256
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives.asymmetric import ec
def dummy_urandom(n):
return byte_chr(0xcc) * n
+def dummy_generate_key_pair(obj):
+ private_key_value = 94761803665136558137557783047955027733968423115106677159790289642479432803037
+ public_key_numbers = "042bdab212fa8ba1b7c843301682a4db424d307246c7e1e6083c41d9ca7b098bf30b3d63e2ec6278488c135360456cc054b3444ecc45998c08894cbc1370f5f989"
+ public_key_numbers_obj = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers))
+ obj.P = ec.EllipticCurvePrivateNumbers(private_value=private_key_value, public_numbers=public_key_numbers_obj).private_key(default_backend())
+ if obj.transport.server_mode:
+ obj.Q_S = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers)).public_key(default_backend())
+ return
+ obj.Q_C = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers)).public_key(default_backend())
+
class FakeKey (object):
def __str__(self):
@@ -93,9 +106,12 @@ class KexTest (unittest.TestCase):
def setUp(self):
self._original_urandom = os.urandom
os.urandom = dummy_urandom
+ self._original_generate_key_pair = KexNistp256._generate_key_pair
+ KexNistp256._generate_key_pair = dummy_generate_key_pair
def tearDown(self):
os.urandom = self._original_urandom
+ KexNistp256._generate_key_pair = self._original_generate_key_pair
def test_1_group1_client(self):
transport = FakeTransport()
@@ -369,4 +385,43 @@ class KexTest (unittest.TestCase):
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertTrue(transport._activated)
+ def test_11_kex_nistp256_client(self):
+ K = 91610929826364598472338906427792435253694642563583721654249504912114314269754
+ transport = FakeTransport()
+ transport.server_mode = False
+ kex = KexNistp256(transport)
+ kex.start_kex()
+ self.assertEqual((paramiko.kex_ecdh_nist._MSG_KEXECDH_REPLY,), transport._expect)
+
+ #fake reply
+ msg = Message()
+ msg.add_string('fake-host-key')
+ Q_S = unhexlify("043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210")
+ msg.add_string(Q_S)
+ msg.add_string('fake-sig')
+ msg.rewind()
+ kex.parse_next(paramiko.kex_ecdh_nist._MSG_KEXECDH_REPLY, msg)
+ H = b'BAF7CE243A836037EB5D2221420F35C02B9AB6C957FE3BDE3369307B9612570A'
+ self.assertEqual(K, kex.transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual((b'fake-host-key', b'fake-sig'), transport._verify)
+ self.assertTrue(transport._activated)
+
+ def test_12_kex_nistp256_server(self):
+ K = 91610929826364598472338906427792435253694642563583721654249504912114314269754
+ transport = FakeTransport()
+ transport.server_mode = True
+ kex = KexNistp256(transport)
+ kex.start_kex()
+ self.assertEqual((paramiko.kex_ecdh_nist._MSG_KEXECDH_INIT,), transport._expect)
+ #fake init
+ msg=Message()
+ Q_C = unhexlify("043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210")
+ H = b'2EF4957AFD530DD3F05DBEABF68D724FACC060974DA9704F2AEE4C3DE861E7CA'
+ msg.add_string(Q_C)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_ecdh_nist._MSG_KEXECDH_INIT, msg)
+ self.assertEqual(K, transport._K)
+ self.assertTrue(transport._activated)
+ self.assertEqual(H, hexlify(transport._H).upper())
diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py
index a1ea398c..02173292 100644
--- a/tests/test_packetizer.py
+++ b/tests/test_packetizer.py
@@ -114,9 +114,13 @@ class PacketizerTest (unittest.TestCase):
import signal
class TimeoutError(Exception):
- pass
+ def __init__(self, error_message):
+ if hasattr(errno, 'ETIME'):
+ self.message = os.sterror(errno.ETIME)
+ else:
+ self.messaage = error_message
- def timeout(seconds=1, error_message=os.strerror(errno.ETIME)):
+ def timeout(seconds=1, error_message='Timer expired'):
def decorator(func):
def _handle_timeout(signum, frame):
raise TimeoutError(error_message)
diff --git a/tests/test_pkey.py b/tests/test_pkey.py
index 2181dd91..a26ff170 100644
--- a/tests/test_pkey.py
+++ b/tests/test_pkey.py
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
# Copyright (C) 2003-2009 Robey Pointer <robeypointer@gmail.com>
#
# This file is part of paramiko.
@@ -26,8 +27,8 @@ from binascii import hexlify
from hashlib import md5
import base64
-from paramiko import RSAKey, DSSKey, ECDSAKey, Message, util
-from paramiko.py3compat import StringIO, byte_chr, b, bytes
+from paramiko import RSAKey, DSSKey, ECDSAKey, Ed25519Key, Message, util
+from paramiko.py3compat import StringIO, byte_chr, b, bytes, PY2
from tests.util import test_path
@@ -107,15 +108,11 @@ L4QLcT5aND0EHZLB2fAUDXiWIb2j4rg1mwPlBMiBXA==
x1234 = b'\x01\x02\x03\x04'
+TEST_KEY_BYTESTR_2 = '\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x81\x00\xd3\x8fV\xea\x07\x85\xa6k%\x8d<\x1f\xbc\x8dT\x98\xa5\x96$\xf3E#\xbe>\xbc\xd2\x93\x93\x87f\xceD\x18\xdb \x0c\xb3\xa1a\x96\xf8e#\xcc\xacS\x8a#\xefVlE\x83\x1epv\xc1o\x17M\xef\xdf\x89DUXL\xa6\x8b\xaa<\x06\x10\xd7\x93w\xec\xaf\xe2\xaf\x95\xd8\xfb\xd9\xbfw\xcb\x9f0)#y{\x10\x90\xaa\x85l\tPru\x8c\t\x19\xce\xa0\xf1\xd2\xdc\x8e/\x8b\xa8f\x9c0\xdey\x84\xd2F\xf7\xcbmm\x1f\x87'
+TEST_KEY_BYTESTR_3 = '\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x00ӏV\x07k%<\x1fT$E#>ғfD\x18 \x0cae#̬S#VlE\x1epvo\x17M߉DUXL<\x06\x10דw\u2bd5ٿw˟0)#y{\x10l\tPru\t\x19Π\u070e/f0yFmm\x1f'
-class KeyTest (unittest.TestCase):
-
- def setUp(self):
- pass
-
- def tearDown(self):
- pass
+class KeyTest(unittest.TestCase):
def test_1_generate_key_bytes(self):
key = util.generate_key_bytes(md5, x1234, 'happy birthday', 30)
exp = b'\x61\xE1\xF2\x72\xF4\xC1\xC4\x56\x15\x86\xBD\x32\x24\x98\xC0\xE9\x24\x67\x27\x80\xF4\x7B\xB3\x7D\xDA\x7D\x54\x01\x9E\x64'
@@ -427,3 +424,16 @@ class KeyTest (unittest.TestCase):
self.assertEqual(key, key2)
finally:
os.remove(newfile)
+
+ def test_stringification(self):
+ key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ comparable = TEST_KEY_BYTESTR_2 if PY2 else TEST_KEY_BYTESTR_3
+ self.assertEqual(str(key), comparable)
+
+ def test_ed25519(self):
+ key1 = Ed25519Key.from_private_key_file(test_path('test_ed25519.key'))
+ key2 = Ed25519Key.from_private_key_file(
+ test_path('test_ed25519_password.key'), b'abc123'
+ )
+
+ self.assertNotEqual(key1.asbytes(), key2.asbytes())
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index e4c2c3a3..d3064fff 100755
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -413,7 +413,7 @@ class SFTPTest (unittest.TestCase):
def test_A_readline_seek(self):
"""
create a text file and write a bunch of text into it. then count the lines
- in the file, and seek around to retreive particular lines. this should
+ in the file, and seek around to retrieve particular lines. this should
verify that read buffering and 'tell' work well together, and that read
buffering is reset on 'seek'.
"""
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py
index e20d348f..967b3b81 100644
--- a/tests/test_ssh_gss.py
+++ b/tests/test_ssh_gss.py
@@ -43,9 +43,7 @@ class NullServer (paramiko.ServerInterface):
return paramiko.AUTH_FAILED
def enable_auth_gssapi(self):
- UseGSSAPI = True
- GSSAPICleanupCredentials = True
- return UseGSSAPI
+ return True
def check_channel_request(self, kind, chanid):
return paramiko.OPEN_SUCCEEDED
diff --git a/tests/test_transport.py b/tests/test_transport.py
index d81ad8f3..c426cef1 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -31,13 +31,16 @@ import random
from hashlib import sha1
import unittest
-from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \
- SSHException, ChannelException, Packetizer
+from paramiko import (
+ Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, SSHException,
+ ChannelException, Packetizer,
+)
from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL
from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
-from paramiko.common import MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST, \
- MIN_PACKET_SIZE, MIN_WINDOW_SIZE, MAX_WINDOW_SIZE, \
- DEFAULT_WINDOW_SIZE, DEFAULT_MAX_PACKET_SIZE
+from paramiko.common import (
+ MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST, MIN_PACKET_SIZE, MIN_WINDOW_SIZE,
+ MAX_WINDOW_SIZE, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_PACKET_SIZE,
+)
from paramiko.py3compat import bytes
from paramiko.message import Message
from tests.loop import LoopSocket
@@ -162,6 +165,15 @@ class TransportTest(unittest.TestCase):
except TypeError:
pass
+ def test_1b_security_options_reset(self):
+ o = self.tc.get_security_options()
+ # should not throw any exceptions
+ o.ciphers = o.ciphers
+ o.digests = o.digests
+ o.key_types = o.key_types
+ o.kex = o.kex
+ o.compression = o.compression
+
def test_2_compute_key(self):
self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929
self.tc.H = b'\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3'
diff --git a/tests/test_util.py b/tests/test_util.py
index a31e4507..7880e156 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -475,9 +475,10 @@ Host param3 parara
safe_has_bytes = safe_string(has_bytes)
expected_bytes = b("has %07%03 bytes")
err = "{0!r} != {1!r}"
- assert safe_vanilla == vanilla, err.format(safe_vanilla, vanilla)
- assert safe_has_bytes == expected_bytes, \
- err.format(safe_has_bytes, expected_bytes)
+ msg = err.format(safe_vanilla, vanilla)
+ assert safe_vanilla == vanilla, msg
+ msg = err.format(safe_has_bytes, expected_bytes)
+ assert safe_has_bytes == expected_bytes, msg
def test_proxycommand_none_issue_418(self):
test_config_file = """