summaryrefslogtreecommitdiffhomepage
path: root/tests/test_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_client.py')
-rw-r--r--tests/test_client.py100
1 files changed, 53 insertions, 47 deletions
diff --git a/tests/test_client.py b/tests/test_client.py
index 7058f394..0483cf6f 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -23,22 +23,27 @@ Some unit tests for SSHClient.
from __future__ import with_statement
import gc
+import os
import platform
import socket
-from tempfile import mkstemp
import threading
+import time
import unittest
-import weakref
import warnings
-import os
-import time
-from tests.util import test_path
+import weakref
+from tempfile import mkstemp
import paramiko
from paramiko.pkey import PublicBlob
from paramiko.common import PY2
from paramiko.ssh_exception import SSHException, AuthenticationException
+from .util import _support, slow
+
+
+requires_gss_auth = unittest.skipUnless(
+ paramiko.GSS_AUTH_AVAILABLE, "GSS auth not available"
+)
FINGERPRINTS = {
'ssh-dss': b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c',
@@ -131,10 +136,10 @@ class SSHClientTest (unittest.TestCase):
allowed_keys = FINGERPRINTS.keys()
self.socks, addr = self.sockl.accept()
self.ts = paramiko.Transport(self.socks)
- keypath = test_path('test_rsa.key')
+ keypath = _support('test_rsa.key')
host_key = paramiko.RSAKey.from_private_key_file(keypath)
self.ts.add_server_key(host_key)
- keypath = test_path('test_ecdsa_256.key')
+ keypath = _support('test_ecdsa_256.key')
host_key = paramiko.ECDSAKey.from_private_key_file(keypath)
self.ts.add_server_key(host_key)
server = NullServer(allowed_keys=allowed_keys, public_blob=public_blob)
@@ -154,7 +159,7 @@ class SSHClientTest (unittest.TestCase):
run_kwargs[key] = kwargs.pop(key, None)
# Server setup
threading.Thread(target=self._run, kwargs=run_kwargs).start()
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
# Client setup
@@ -200,22 +205,22 @@ class SSHClientTest (unittest.TestCase):
"""
verify that SSHClient works with a DSA key.
"""
- self._test_connection(key_filename=test_path('test_dss.key'))
+ self._test_connection(key_filename=_support('test_dss.key'))
def test_client_rsa(self):
"""
verify that SSHClient works with an RSA key.
"""
- self._test_connection(key_filename=test_path('test_rsa.key'))
+ self._test_connection(key_filename=_support('test_rsa.key'))
def test_2_5_client_ecdsa(self):
"""
verify that SSHClient works with an ECDSA key.
"""
- self._test_connection(key_filename=test_path('test_ecdsa_256.key'))
+ self._test_connection(key_filename=_support('test_ecdsa_256.key'))
def test_client_ed25519(self):
- self._test_connection(key_filename=test_path('test_ed25519.key'))
+ self._test_connection(key_filename=_support('test_ed25519.key'))
def test_3_multiple_key_files(self):
"""
@@ -238,7 +243,7 @@ class SSHClientTest (unittest.TestCase):
try:
self._test_connection(
key_filename=[
- test_path('test_{0}.key'.format(x)) for x in attempt
+ _support('test_{}.key'.format(x)) for x in attempt
],
allowed_keys=[types_[x] for x in accept],
)
@@ -256,7 +261,7 @@ class SSHClientTest (unittest.TestCase):
# various platforms trigger different errors here >_<
self.assertRaises(SSHException,
self._test_connection,
- key_filename=[test_path('test_rsa.key')],
+ key_filename=[_support('test_rsa.key')],
allowed_keys=['ecdsa-sha2-nistp256'],
)
@@ -266,8 +271,8 @@ class SSHClientTest (unittest.TestCase):
# server-side behavior is 100% identical.)
# NOTE: only bothered whipping up one cert per overall class/family.
for type_ in ('rsa', 'dss', 'ecdsa_256', 'ed25519'):
- cert_name = 'test_{0}.key-cert.pub'.format(type_)
- cert_path = test_path(os.path.join('cert_support', cert_name))
+ cert_name = 'test_{}.key-cert.pub'.format(type_)
+ cert_path = _support(os.path.join('cert_support', cert_name))
self._test_connection(
key_filename=cert_path,
public_blob=PublicBlob.from_file(cert_path),
@@ -281,12 +286,12 @@ class SSHClientTest (unittest.TestCase):
# that a specific cert was found, along with regular authorization
# succeeding proving that the overall flow works.
for type_ in ('rsa', 'dss', 'ecdsa_256', 'ed25519'):
- key_name = 'test_{0}.key'.format(type_)
- key_path = test_path(os.path.join('cert_support', key_name))
+ key_name = 'test_{}.key'.format(type_)
+ key_path = _support(os.path.join('cert_support', key_name))
self._test_connection(
key_filename=key_path,
public_blob=PublicBlob.from_file(
- '{0}-cert.pub'.format(key_path)
+ '{}-cert.pub'.format(key_path)
),
)
@@ -302,7 +307,7 @@ class SSHClientTest (unittest.TestCase):
"""
threading.Thread(target=self._run).start()
hostname = '[%s]:%d' % (self.addr, self.port)
- key_file = test_path('test_ecdsa_256.key')
+ key_file = _support('test_ecdsa_256.key')
public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file)
self.tc = paramiko.SSHClient()
@@ -325,7 +330,7 @@ class SSHClientTest (unittest.TestCase):
"""
warnings.filterwarnings('ignore', 'tempnam.*')
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
fd, localname = mkstemp()
os.close(fd)
@@ -405,7 +410,7 @@ class SSHClientTest (unittest.TestCase):
"""
# Start the thread with a 1 second wait.
threading.Thread(target=self._run, kwargs={'delay': 1}).start()
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
self.tc = paramiko.SSHClient()
@@ -432,12 +437,13 @@ class SSHClientTest (unittest.TestCase):
# 'television' as per tests/test_pkey.py). NOTE: must use
# key_filename, loading the actual key here with PKey will except
# immediately; we're testing the try/except crap within Client.
- key_filename=[test_path('test_rsa_password.key')],
+ key_filename=[_support('test_rsa_password.key')],
# Actual password for default 'slowdive' user
password='pygmalion',
)
self._test_connection(**kwargs)
+ @slow
def test_9_auth_timeout(self):
"""
verify that the SSHClient has a configurable auth timeout
@@ -450,27 +456,25 @@ class SSHClientTest (unittest.TestCase):
auth_timeout=0.5,
)
+ @requires_gss_auth
def test_10_auth_trickledown_gsskex(self):
"""
Failed gssapi-keyex auth doesn't prevent subsequent key auth from succeeding
"""
- if not paramiko.GSS_AUTH_AVAILABLE:
- return # for python 2.6 lacks skipTest
kwargs = dict(
gss_kex=True,
- key_filename=[test_path('test_rsa.key')],
+ key_filename=[_support('test_rsa.key')],
)
self._test_connection(**kwargs)
+ @requires_gss_auth
def test_11_auth_trickledown_gssauth(self):
"""
Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding
"""
- if not paramiko.GSS_AUTH_AVAILABLE:
- return # for python 2.6 lacks skipTest
kwargs = dict(
gss_auth=True,
- key_filename=[test_path('test_rsa.key')],
+ key_filename=[_support('test_rsa.key')],
)
self._test_connection(**kwargs)
@@ -489,14 +493,13 @@ class SSHClientTest (unittest.TestCase):
password='pygmalion', **self.connect_kwargs
)
+ @requires_gss_auth
def test_13_reject_policy_gsskex(self):
"""
verify that SSHClient's RejectPolicy works,
even if gssapi-keyex was enabled but not used.
"""
# Test for a bug present in paramiko versions released before 2017-08-01
- if not paramiko.GSS_AUTH_AVAILABLE:
- return # for python 2.6 lacks skipTest
threading.Thread(target=self._run).start()
self.tc = paramiko.SSHClient()
@@ -532,7 +535,7 @@ class SSHClientTest (unittest.TestCase):
self.tc = paramiko.SSHClient()
self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
- host_key = ktype.from_private_key_file(test_path(kfile))
+ host_key = ktype.from_private_key_file(_support(kfile))
known_hosts = self.tc.get_host_keys()
known_hosts.add(hostname, host_key.get_name(), host_key)
@@ -556,10 +559,7 @@ class SSHClientTest (unittest.TestCase):
def test_host_key_negotiation_4(self):
self._client_host_key_good(paramiko.RSAKey, 'test_rsa.key')
- def test_update_environment(self):
- """
- Verify that environment variables can be set by the client.
- """
+ def _setup_for_env(self):
threading.Thread(target=self._run).start()
self.tc = paramiko.SSHClient()
@@ -571,6 +571,11 @@ class SSHClientTest (unittest.TestCase):
self.assertTrue(self.event.isSet())
self.assertTrue(self.ts.is_active())
+ def test_update_environment(self):
+ """
+ Verify that environment variables can be set by the client.
+ """
+ self._setup_for_env()
target_env = {b'A': b'B', b'C': b'd'}
self.tc.exec_command('yes', environment=target_env)
@@ -578,19 +583,20 @@ class SSHClientTest (unittest.TestCase):
self.assertEqual(target_env, getattr(schan, 'env', {}))
schan.close()
- # Cannot use assertRaises in context manager mode as it is not supported
- # in Python 2.6.
- try:
+ @unittest.skip("Clients normally fail silently, thus so do we, for now")
+ def test_env_update_failures(self):
+ self._setup_for_env()
+ with self.assertRaises(SSHException) as manager:
# Verify that a rejection by the server can be detected
self.tc.exec_command('yes', environment={b'INVALID_ENV': b''})
- except SSHException as e:
- self.assertTrue('INVALID_ENV' in str(e),
- 'Expected variable name in error message')
- self.assertTrue(isinstance(e.args[1], SSHException),
- 'Expected original SSHException in exception')
- else:
- self.assertFalse(False, 'SSHException was not thrown.')
-
+ self.assertTrue(
+ 'INVALID_ENV' in str(manager.exception),
+ 'Expected variable name in error message'
+ )
+ self.assertTrue(
+ isinstance(manager.exception.args[1], SSHException),
+ 'Expected original SSHException in exception'
+ )
def test_missing_key_policy_accepts_classes_or_instances(self):
"""