summaryrefslogtreecommitdiffhomepage
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/stub_sftp.py14
-rw-r--r--tests/test_auth.py19
-rw-r--r--tests/test_client.py48
-rw-r--r--tests/test_ed25519.key8
-rw-r--r--tests/test_ed25519_password.key8
-rw-r--r--tests/test_kex.py57
-rw-r--r--tests/test_kex_gss.py21
-rw-r--r--tests/test_pkey.py30
-rwxr-xr-xtests/test_sftp.py33
9 files changed, 226 insertions, 12 deletions
diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py
index 334af561..0d673091 100644
--- a/tests/stub_sftp.py
+++ b/tests/stub_sftp.py
@@ -24,7 +24,7 @@ import os
import sys
from paramiko import (
ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes,
- SFTPHandle, SFTP_OK, AUTH_SUCCESSFUL, OPEN_SUCCEEDED,
+ SFTPHandle, SFTP_OK, SFTP_FAILURE, AUTH_SUCCESSFUL, OPEN_SUCCEEDED,
)
from paramiko.common import o666
@@ -141,12 +141,24 @@ class StubSFTPServer (SFTPServerInterface):
def rename(self, oldpath, newpath):
oldpath = self._realpath(oldpath)
newpath = self._realpath(newpath)
+ if os.path.exists(newpath):
+ return SFTP_FAILURE
try:
os.rename(oldpath, newpath)
except OSError as e:
return SFTPServer.convert_errno(e.errno)
return SFTP_OK
+ def posix_rename(self, oldpath, newpath):
+ oldpath = self._realpath(oldpath)
+ newpath = self._realpath(newpath)
+ try:
+ os.rename(oldpath, newpath)
+ except OSError as e:
+ return SFTPServer.convert_errno(e.errno)
+ return SFTP_OK
+
+
def mkdir(self, path, attr):
path = self._realpath(path)
try:
diff --git a/tests/test_auth.py b/tests/test_auth.py
index 96f7611c..e78397c6 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -23,6 +23,7 @@ Some unit tests for authenticating over a Transport.
import sys
import threading
import unittest
+from time import sleep
from paramiko import (
Transport, ServerInterface, RSAKey, DSSKey, BadAuthenticationType,
@@ -74,6 +75,9 @@ class NullServer (ServerInterface):
return AUTH_SUCCESSFUL
if username == 'bad-server':
raise Exception("Ack!")
+ if username == 'unresponsive-server':
+ sleep(5)
+ return AUTH_SUCCESSFUL
return AUTH_FAILED
def check_auth_publickey(self, username, key):
@@ -233,3 +237,18 @@ class AuthTest (unittest.TestCase):
except:
etype, evalue, etb = sys.exc_info()
self.assertTrue(issubclass(etype, AuthenticationException))
+
+ def test_9_auth_non_responsive(self):
+ """
+ verify that authentication times out if server takes to long to
+ respond (or never responds).
+ """
+ self.tc.auth_timeout = 1 # 1 second, to speed up test
+ self.start_server()
+ self.tc.connect()
+ try:
+ remain = self.tc.auth_password('unresponsive-server', 'hello')
+ except:
+ etype, evalue, etb = sys.exc_info()
+ self.assertTrue(issubclass(etype, AuthenticationException))
+ self.assertTrue('Authentication timeout' in str(evalue))
diff --git a/tests/test_client.py b/tests/test_client.py
index 4a7117ac..7710055b 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -35,14 +35,15 @@ import time
from tests.util import test_path
import paramiko
-from paramiko.py3compat import PY2, b
-from paramiko.ssh_exception import SSHException
+from paramiko.common import PY2
+from paramiko.ssh_exception import SSHException, AuthenticationException
FINGERPRINTS = {
'ssh-dss': b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c',
'ssh-rsa': b'\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5',
'ecdsa-sha2-nistp256': b'\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60',
+ 'ssh-ed25519': b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80',
}
@@ -60,6 +61,9 @@ class NullServer (paramiko.ServerInterface):
def check_auth_password(self, username, password):
if (username == 'slowdive') and (password == 'pygmalion'):
return paramiko.AUTH_SUCCESSFUL
+ if (username == 'slowdive') and (password == 'unresponsive-server'):
+ time.sleep(5)
+ return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED
def check_auth_publickey(self, username, key):
@@ -199,6 +203,9 @@ class SSHClientTest (unittest.TestCase):
"""
self._test_connection(key_filename=test_path('test_ecdsa_256.key'))
+ def test_client_ed25519(self):
+ self._test_connection(key_filename=test_path('test_ed25519.key'))
+
def test_3_multiple_key_files(self):
"""
verify that SSHClient accepts and tries multiple key files.
@@ -384,7 +391,19 @@ class SSHClientTest (unittest.TestCase):
)
self._test_connection(**kwargs)
- def test_9_auth_trickledown_gsskex(self):
+ def test_9_auth_timeout(self):
+ """
+ verify that the SSHClient has a configurable auth timeout
+ """
+ # Connect with a half second auth timeout
+ self.assertRaises(
+ AuthenticationException,
+ self._test_connection,
+ password='unresponsive-server',
+ auth_timeout=0.5,
+ )
+
+ def test_10_auth_trickledown_gsskex(self):
"""
Failed gssapi-keyex auth doesn't prevent subsequent key auth from succeeding
"""
@@ -396,7 +415,7 @@ class SSHClientTest (unittest.TestCase):
)
self._test_connection(**kwargs)
- def test_10_auth_trickledown_gssauth(self):
+ def test_11_auth_trickledown_gssauth(self):
"""
Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding
"""
@@ -408,7 +427,7 @@ class SSHClientTest (unittest.TestCase):
)
self._test_connection(**kwargs)
- def test_11_reject_policy(self):
+ def test_12_reject_policy(self):
"""
verify that SSHClient's RejectPolicy works.
"""
@@ -423,7 +442,7 @@ class SSHClientTest (unittest.TestCase):
password='pygmalion', **self.connect_kwargs
)
- def test_12_reject_policy_gsskex(self):
+ def test_13_reject_policy_gsskex(self):
"""
verify that SSHClient's RejectPolicy works,
even if gssapi-keyex was enabled but not used.
@@ -524,3 +543,20 @@ class SSHClientTest (unittest.TestCase):
'Expected original SSHException in exception')
else:
self.assertFalse(False, 'SSHException was not thrown.')
+
+
+ def test_missing_key_policy_accepts_classes_or_instances(self):
+ """
+ Client.missing_host_key_policy() can take classes or instances.
+ """
+ # AN ACTUAL UNIT TEST?! GOOD LORD
+ # (But then we have to test a private API...meh.)
+ client = paramiko.SSHClient()
+ # Default
+ assert isinstance(client._policy, paramiko.RejectPolicy)
+ # Hand in an instance (classic behavior)
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ assert isinstance(client._policy, paramiko.AutoAddPolicy)
+ # Hand in just the class (new behavior)
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
+ assert isinstance(client._policy, paramiko.AutoAddPolicy)
diff --git a/tests/test_ed25519.key b/tests/test_ed25519.key
new file mode 100644
index 00000000..eb9f94c2
--- /dev/null
+++ b/tests/test_ed25519.key
@@ -0,0 +1,8 @@
+-----BEGIN OPENSSH PRIVATE KEY-----
+b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
+QyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXwAAAKhjwAdrY8AH
+awAAAAtzc2gtZWQyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXw
+AAAEA9tGQi2IrprbOSbDCF+RmAHd6meNSXBUQ2ekKXm4/8xnr1K9komH/1WBIvQbbtvnFV
+hryd62EfcgRFuLRiokNfAAAAI2FsZXhfZ2F5bm9yQEFsZXhzLU1hY0Jvb2stQWlyLmxvY2
+FsAQI=
+-----END OPENSSH PRIVATE KEY-----
diff --git a/tests/test_ed25519_password.key b/tests/test_ed25519_password.key
new file mode 100644
index 00000000..d178aaae
--- /dev/null
+++ b/tests/test_ed25519_password.key
@@ -0,0 +1,8 @@
+-----BEGIN OPENSSH PRIVATE KEY-----
+b3BlbnNzaC1rZXktdjEAAAAACmFlczI1Ni1jYmMAAAAGYmNyeXB0AAAAGAAAABDaKD4ac7
+kieb+UfXaLaw68AAAAEAAAAAEAAAAzAAAAC3NzaC1lZDI1NTE5AAAAIOQn7fjND5ozMSV3
+CvbEtIdT73hWCMRjzS/lRdUDw50xAAAAsE8kLGyYBnl9ihJNqv378y6mO3SkzrDbWXOnK6
+ij0vnuTAvcqvWHAnyu6qBbplu/W2m55ZFeAItgaEcV2/V76sh/sAKlERqrLFyXylN0xoOW
+NU5+zU08aTlbSKGmeNUU2xE/xfJq12U9XClIRuVUkUpYANxNPbmTRpVrbD3fgXMhK97Jrb
+DEn8ca1IqMPiYmd/hpe5+tq3OxyRljXjCUFWTnqkp9VvUdzSTdSGZHsW9i
+-----END OPENSSH PRIVATE KEY-----
diff --git a/tests/test_kex.py b/tests/test_kex.py
index 19804fbf..b7f588f7 100644
--- a/tests/test_kex.py
+++ b/tests/test_kex.py
@@ -20,7 +20,7 @@
Some unit tests for the key exchange protocols.
"""
-from binascii import hexlify
+from binascii import hexlify, unhexlify
import os
import unittest
@@ -29,11 +29,24 @@ from paramiko.kex_group1 import KexGroup1
from paramiko.kex_gex import KexGex, KexGexSHA256
from paramiko import Message
from paramiko.common import byte_chr
+from paramiko.kex_ecdh_nist import KexNistp256
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives.asymmetric import ec
def dummy_urandom(n):
return byte_chr(0xcc) * n
+def dummy_generate_key_pair(obj):
+ private_key_value = 94761803665136558137557783047955027733968423115106677159790289642479432803037
+ public_key_numbers = "042bdab212fa8ba1b7c843301682a4db424d307246c7e1e6083c41d9ca7b098bf30b3d63e2ec6278488c135360456cc054b3444ecc45998c08894cbc1370f5f989"
+ public_key_numbers_obj = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers))
+ obj.P = ec.EllipticCurvePrivateNumbers(private_value=private_key_value, public_numbers=public_key_numbers_obj).private_key(default_backend())
+ if obj.transport.server_mode:
+ obj.Q_S = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers)).public_key(default_backend())
+ return
+ obj.Q_C = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers)).public_key(default_backend())
+
class FakeKey (object):
def __str__(self):
@@ -93,9 +106,12 @@ class KexTest (unittest.TestCase):
def setUp(self):
self._original_urandom = os.urandom
os.urandom = dummy_urandom
+ self._original_generate_key_pair = KexNistp256._generate_key_pair
+ KexNistp256._generate_key_pair = dummy_generate_key_pair
def tearDown(self):
os.urandom = self._original_urandom
+ KexNistp256._generate_key_pair = self._original_generate_key_pair
def test_1_group1_client(self):
transport = FakeTransport()
@@ -369,4 +385,43 @@ class KexTest (unittest.TestCase):
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertTrue(transport._activated)
+ def test_11_kex_nistp256_client(self):
+ K = 91610929826364598472338906427792435253694642563583721654249504912114314269754
+ transport = FakeTransport()
+ transport.server_mode = False
+ kex = KexNistp256(transport)
+ kex.start_kex()
+ self.assertEqual((paramiko.kex_ecdh_nist._MSG_KEXECDH_REPLY,), transport._expect)
+
+ #fake reply
+ msg = Message()
+ msg.add_string('fake-host-key')
+ Q_S = unhexlify("043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210")
+ msg.add_string(Q_S)
+ msg.add_string('fake-sig')
+ msg.rewind()
+ kex.parse_next(paramiko.kex_ecdh_nist._MSG_KEXECDH_REPLY, msg)
+ H = b'BAF7CE243A836037EB5D2221420F35C02B9AB6C957FE3BDE3369307B9612570A'
+ self.assertEqual(K, kex.transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual((b'fake-host-key', b'fake-sig'), transport._verify)
+ self.assertTrue(transport._activated)
+
+ def test_12_kex_nistp256_server(self):
+ K = 91610929826364598472338906427792435253694642563583721654249504912114314269754
+ transport = FakeTransport()
+ transport.server_mode = True
+ kex = KexNistp256(transport)
+ kex.start_kex()
+ self.assertEqual((paramiko.kex_ecdh_nist._MSG_KEXECDH_INIT,), transport._expect)
+ #fake init
+ msg=Message()
+ Q_C = unhexlify("043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210")
+ H = b'2EF4957AFD530DD3F05DBEABF68D724FACC060974DA9704F2AEE4C3DE861E7CA'
+ msg.add_string(Q_C)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_ecdh_nist._MSG_KEXECDH_INIT, msg)
+ self.assertEqual(K, transport._K)
+ self.assertTrue(transport._activated)
+ self.assertEqual(H, hexlify(transport._H).upper())
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
index 3bf788da..af342a7c 100644
--- a/tests/test_kex_gss.py
+++ b/tests/test_kex_gss.py
@@ -93,7 +93,7 @@ class GSSKexTest(unittest.TestCase):
server = NullServer()
self.ts.start_server(self.event, server)
- def test_1_gsskex_and_auth(self):
+ def _test_gsskex_and_auth(self, gss_host, rekey=False):
"""
Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated
Diffie-Hellman Key Exchange and user authentication with the GSS-API
@@ -106,16 +106,19 @@ class GSSKexTest(unittest.TestCase):
self.tc.get_host_keys().add('[%s]:%d' % (self.hostname, self.port),
'ssh-rsa', public_host_key)
self.tc.connect(self.hostname, self.port, username=self.username,
- gss_auth=True, gss_kex=True)
+ gss_auth=True, gss_kex=True, gss_host=gss_host)
self.event.wait(1.0)
self.assert_(self.event.is_set())
self.assert_(self.ts.is_active())
self.assertEquals(self.username, self.ts.get_username())
self.assertEquals(True, self.ts.is_authenticated())
+ self.assertEquals(True, self.tc.get_transport().gss_kex_used)
stdin, stdout, stderr = self.tc.exec_command('yes')
schan = self.ts.accept(1.0)
+ if rekey:
+ self.tc.get_transport().renegotiate_keys()
schan.send('Hello there.\n')
schan.send_stderr('This is on stderr.\n')
@@ -129,3 +132,17 @@ class GSSKexTest(unittest.TestCase):
stdin.close()
stdout.close()
stderr.close()
+
+ def test_1_gsskex_and_auth(self):
+ """
+ Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated
+ Diffie-Hellman Key Exchange and user authentication with the GSS-API
+ context created during key exchange.
+ """
+ self._test_gsskex_and_auth(gss_host=None)
+
+ def test_2_gsskex_and_auth_rekey(self):
+ """
+ Verify that Paramiko can rekey.
+ """
+ self._test_gsskex_and_auth(gss_host=None, rekey=True)
diff --git a/tests/test_pkey.py b/tests/test_pkey.py
index 394a2cf4..7ffc4b41 100644
--- a/tests/test_pkey.py
+++ b/tests/test_pkey.py
@@ -27,7 +27,7 @@ from binascii import hexlify
from hashlib import md5
import base64
-from paramiko import RSAKey, DSSKey, ECDSAKey, Message, util
+from paramiko import RSAKey, DSSKey, ECDSAKey, Ed25519Key, Message, util
from paramiko.py3compat import StringIO, byte_chr, b, bytes, PY2
from tests.util import test_path
@@ -112,7 +112,7 @@ TEST_KEY_BYTESTR_2 = '\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x81\x
TEST_KEY_BYTESTR_3 = '\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x00ӏV\x07k%<\x1fT$E#>ғfD\x18 \x0cae#̬S#VlE\x1epvo\x17M߉DUXL<\x06\x10דw\u2bd5ٿw˟0)#y{\x10l\tPru\t\x19Π\u070e/f0yFmm\x1f'
-class KeyTest (unittest.TestCase):
+class KeyTest(unittest.TestCase):
def setUp(self):
pass
@@ -450,6 +450,32 @@ class KeyTest (unittest.TestCase):
comparable = TEST_KEY_BYTESTR_2 if PY2 else TEST_KEY_BYTESTR_3
self.assertEqual(str(key), comparable)
+ def test_ed25519(self):
+ key1 = Ed25519Key.from_private_key_file(test_path('test_ed25519.key'))
+ key2 = Ed25519Key.from_private_key_file(
+ test_path('test_ed25519_password.key'), b'abc123'
+ )
+ self.assertNotEqual(key1.asbytes(), key2.asbytes())
+
+ def test_ed25519_compare(self):
+ # verify that the private & public keys compare equal
+ key = Ed25519Key.from_private_key_file(test_path('test_ed25519.key'))
+ self.assertEqual(key, key)
+ pub = Ed25519Key(data=key.asbytes())
+ self.assertTrue(key.can_sign())
+ self.assertTrue(not pub.can_sign())
+ self.assertEqual(key, pub)
+
+ def test_ed25519_nonbytes_password(self):
+ # https://github.com/paramiko/paramiko/issues/1039
+ key = Ed25519Key.from_private_key_file(
+ test_path('test_ed25519_password.key'),
+ # NOTE: not a bytes. Amusingly, the test above for same key DOES
+ # explicitly cast to bytes...code smell!
+ 'abc123',
+ )
+ # No exception -> it's good. Meh.
+
def test_keyfile_is_actually_encrypted(self):
# Read an existing encrypted private key
file_ = test_path('test_rsa_password.key')
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index 98a9cebb..b3c7bf98 100755
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -277,6 +277,39 @@ class SFTPTest (unittest.TestCase):
except:
pass
+
+ def test_5a_posix_rename(self):
+ """Test posix-rename@openssh.com protocol extension."""
+ try:
+ # first check that the normal rename works as specified
+ with sftp.open(FOLDER + '/a', 'w') as f:
+ f.write('one')
+ sftp.rename(FOLDER + '/a', FOLDER + '/b')
+ with sftp.open(FOLDER + '/a', 'w') as f:
+ f.write('two')
+ try:
+ sftp.rename(FOLDER + '/a', FOLDER + '/b')
+ self.assertTrue(False, 'no exception when rename-ing onto existing file')
+ except (OSError, IOError):
+ pass
+
+ # now check with the posix_rename
+ sftp.posix_rename(FOLDER + '/a', FOLDER + '/b')
+ with sftp.open(FOLDER + '/b', 'r') as f:
+ data = u(f.read())
+ self.assertEqual('two', data, "Contents of renamed file not the same as original file")
+
+ finally:
+ try:
+ sftp.remove(FOLDER + '/a')
+ except:
+ pass
+ try:
+ sftp.remove(FOLDER + '/b')
+ except:
+ pass
+
+
def test_6_folder(self):
"""
create a temporary folder, verify that we can create a file in it, then