summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--.gitignore2
-rw-r--r--.travis.yml9
-rw-r--r--MANIFEST.in2
-rw-r--r--README.rst5
-rw-r--r--dev-requirements.txt11
-rw-r--r--paramiko/__init__.py13
-rw-r--r--paramiko/_version.py2
-rw-r--r--paramiko/auth_handler.py93
-rw-r--r--paramiko/ber.py8
-rw-r--r--paramiko/channel.py32
-rw-r--r--paramiko/client.py71
-rw-r--r--paramiko/config.py4
-rw-r--r--paramiko/ecdsakey.py8
-rw-r--r--paramiko/file.py2
-rw-r--r--paramiko/hostkeys.py15
-rw-r--r--paramiko/kex_ecdh_nist.py4
-rw-r--r--paramiko/kex_gex.py17
-rw-r--r--paramiko/kex_group1.py3
-rw-r--r--paramiko/kex_gss.py35
-rw-r--r--paramiko/message.py4
-rw-r--r--paramiko/packet.py27
-rw-r--r--paramiko/pkey.py20
-rw-r--r--paramiko/primes.py2
-rw-r--r--paramiko/py3compat.py8
-rw-r--r--paramiko/server.py7
-rw-r--r--paramiko/sftp_attr.py16
-rw-r--r--paramiko/sftp_client.py58
-rw-r--r--paramiko/sftp_file.py15
-rw-r--r--paramiko/sftp_server.py8
-rw-r--r--paramiko/ssh_exception.py6
-rw-r--r--paramiko/ssh_gss.py4
-rw-r--r--paramiko/transport.py112
-rw-r--r--paramiko/util.py13
-rw-r--r--paramiko/win_pageant.py2
-rw-r--r--setup.cfg5
-rw-r--r--setup.py3
-rw-r--r--sites/shared_conf.py4
-rw-r--r--sites/www/changelog.rst22
-rw-r--r--sites/www/index.rst8
-rw-r--r--sites/www/installing.rst4
-rw-r--r--tasks.py38
-rwxr-xr-xtest.py196
-rw-r--r--tests/__init__.py37
-rw-r--r--tests/conftest.py103
-rw-r--r--tests/loop.py6
-rw-r--r--tests/stub_sftp.py1
-rw-r--r--tests/test_auth.py13
-rw-r--r--tests/test_buffered_pipe.py1
-rw-r--r--tests/test_client.py189
-rw-r--r--[-rwxr-xr-x]tests/test_file.py8
-rw-r--r--tests/test_gssapi.py33
-rw-r--r--tests/test_hostkeys.py1
-rw-r--r--tests/test_kex.py5
-rw-r--r--tests/test_kex_gss.py3
-rw-r--r--tests/test_message.py1
-rw-r--r--tests/test_packetizer.py5
-rw-r--r--tests/test_pkey.py67
-rw-r--r--[-rwxr-xr-x]tests/test_sftp.py656
-rw-r--r--tests/test_sftp_big.py163
-rw-r--r--tests/test_ssh_gss.py32
-rw-r--r--tests/test_transport.py22
-rw-r--r--tests/test_util.py11
-rw-r--r--tests/util.py29
63 files changed, 1129 insertions, 1175 deletions
diff --git a/.gitignore b/.gitignore
index 44b45974..4345d86c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,9 +3,9 @@ build/
dist/
.tox/
paramiko.egg-info/
-test.log
docs/
demos/*.log
!sites/docs
_build
.coverage
+.cache
diff --git a/.travis.yml b/.travis.yml
index 2819eb20..73d73484 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -4,9 +4,7 @@ cache:
directories:
- $HOME/.cache/pip
python:
- - "2.6"
- "2.7"
- - "3.3"
- "3.4"
- "3.5"
- "3.6"
@@ -17,16 +15,17 @@ matrix:
- python: "3.7-dev"
- python: "pypy-5.6.0"
install:
- # Ensure modern pip/etc on Python 3.3 workers (not sure WTF, but, eh)
+ # Ensure modern pip/etc to avoid some issues w/ older worker environs
- pip install pip==9.0.1 setuptools==36.6.0
# Self-install for setup.py-driven deps
- pip install -e .
# Dev (doc/test running) requirements
+ # TODO: use pipenv + whatever contexty-type stuff it has
- pip install codecov # For codecov specifically
- pip install -r dev-requirements.txt
script:
- # Main tests, w/ coverage!
- - inv test --coverage
+ # All (including slow) tests, w/ coverage!
+ - inv coverage
# Ensure documentation builds, both sites, maxxed nitpicking
- inv sites
# flake8 is now possible!
diff --git a/MANIFEST.in b/MANIFEST.in
index e718ea24..1eec2054 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1,4 +1,4 @@
-include LICENSE test.py setup_helper.py
+include LICENSE setup_helper.py
recursive-include docs *
recursive-include tests *.py *.key
recursive-include demos *.py *.key user_rsa_key user_rsa_key.pub
diff --git a/README.rst b/README.rst
index 399dceb9..6e49bd68 100644
--- a/README.rst
+++ b/README.rst
@@ -22,7 +22,7 @@ What
----
"Paramiko" is a combination of the Esperanto words for "paranoid" and
-"friend". It's a module for Python 2.6+/3.3+ that implements the SSH2 protocol
+"friend". It's a module for Python 2.7/3.4+ that implements the SSH2 protocol
for secure (encrypted and authenticated) connections to remote machines. Unlike
SSL (aka TLS), SSH2 protocol does not require hierarchical certificates signed
by a powerful central authority. You may know SSH2 as the protocol that
@@ -132,6 +132,7 @@ doc/ folder.
There are also unit tests here::
- $ python ./test.py
+ $ pip install -r dev-requirements.txt
+ $ pytest
Which will verify that most of the core components are working correctly.
diff --git a/dev-requirements.txt b/dev-requirements.txt
index 2cb0d768..00ff1c6e 100644
--- a/dev-requirements.txt
+++ b/dev-requirements.txt
@@ -1,12 +1,11 @@
-# Older junk
-tox>=1.4,<1.5
-# For newer tasks like building Sphinx docs.
-invoke>=0.13,<=0.21.0
-invocations>=0.13,<=0.20.0
+invoke>=0.13,<2.0
+invocations>=0.13,<2.0
sphinx>=1.1.3,<1.5
alabaster>=0.7.5,<2.0
-releases>=1.1.0,<1.4.0
+releases>=1.1.0,<2.0
semantic_version<3.0
wheel==0.24
twine==1.9.1
flake8==2.6.2
+pytest==3.2.1
+pytest_relaxed==1.0.0
diff --git a/paramiko/__init__.py b/paramiko/__init__.py
index bdc91d51..c4c69a45 100644
--- a/paramiko/__init__.py
+++ b/paramiko/__init__.py
@@ -19,15 +19,6 @@
# flake8: noqa
import sys
from paramiko._version import __version__, __version_info__
-
-if sys.version_info < (2, 6):
- raise RuntimeError('You need Python 2.6+ for this module.')
-
-
-__author__ = "Jeff Forcier <jeff@bitprophet.org>"
-__license__ = "GNU Lesser General Public License (LGPL)"
-
-
from paramiko.transport import SecurityOptions, Transport
from paramiko.client import (
SSHClient, MissingHostKeyPolicy, AutoAddPolicy, RejectPolicy,
@@ -76,6 +67,10 @@ from paramiko.sftp import (
from paramiko.common import io_sleep
+
+__author__ = "Jeff Forcier <jeff@bitprophet.org>"
+__license__ = "GNU Lesser General Public License (LGPL)"
+
__all__ = [
'Transport',
'SSHClient',
diff --git a/paramiko/_version.py b/paramiko/_version.py
index aee80ab4..c1e20464 100644
--- a/paramiko/_version.py
+++ b/paramiko/_version.py
@@ -1,2 +1,2 @@
-__version_info__ = (2, 3, 2)
+__version_info__ = (2, 4, 0)
__version__ = '.'.join(map(str, __version_info__))
diff --git a/paramiko/auth_handler.py b/paramiko/auth_handler.py
index d2dab3d8..a1ce5e3b 100644
--- a/paramiko/auth_handler.py
+++ b/paramiko/auth_handler.py
@@ -71,6 +71,9 @@ class AuthHandler (object):
self.gss_host = None
self.gss_deleg_creds = True
+ def _log(self, *args):
+ return self.transport._log(*args)
+
def is_authenticated(self):
return self.authenticated
@@ -245,7 +248,7 @@ class AuthHandler (object):
def _parse_service_accept(self, m):
service = m.get_text()
if service == 'ssh-userauth':
- self.transport._log(DEBUG, 'userauth is OK')
+ self._log(DEBUG, 'userauth is OK')
m = Message()
m.add_byte(cMSG_USERAUTH_REQUEST)
m.add_string(self.username)
@@ -320,7 +323,8 @@ class AuthHandler (object):
self.transport.send_message(m)
else:
raise SSHException(
- "Received Package: %s" % MSG_NAMES[ptype])
+ "Received Package: {}".format(MSG_NAMES[ptype])
+ )
m = Message()
m.add_byte(cMSG_USERAUTH_GSSAPI_MIC)
# send the MIC to the server
@@ -336,17 +340,17 @@ class AuthHandler (object):
min_status = m.get_int()
err_msg = m.get_string()
m.get_string() # Lang tag - discarded
- raise SSHException("GSS-API Error:\nMajor Status: %s\n\
- Minor Status: %s\ \nError Message:\
- %s\n") % (str(maj_status),
- str(min_status),
- err_msg)
+ raise SSHException("""GSS-API Error:
+Major Status: {}
+Minor Status: {}
+Error Message: {}
+""".format(maj_status, min_status, err_msg))
elif ptype == MSG_USERAUTH_FAILURE:
self._parse_userauth_failure(m)
return
else:
raise SSHException(
- "Received Package: %s" % MSG_NAMES[ptype])
+ "Received Package: {}".format(MSG_NAMES[ptype]))
elif (
self.auth_method == 'gssapi-keyex' and
self.transport.gss_kex_used
@@ -359,22 +363,22 @@ class AuthHandler (object):
pass
else:
raise SSHException(
- 'Unknown auth method "%s"' % self.auth_method)
+ 'Unknown auth method "{}"'.format(self.auth_method))
self.transport._send_message(m)
else:
- self.transport._log(
+ self._log(
DEBUG,
- 'Service request "%s" accepted (?)' % service)
+ 'Service request "{}" accepted (?)'.format(service))
def _send_auth_result(self, username, method, result):
# okay, send result
m = Message()
if result == AUTH_SUCCESSFUL:
- self.transport._log(INFO, 'Auth granted (%s).' % method)
+ self._log(INFO, 'Auth granted ({}).'.format(method))
m.add_byte(cMSG_USERAUTH_SUCCESS)
self.authenticated = True
else:
- self.transport._log(INFO, 'Auth rejected (%s).' % method)
+ self._log(INFO, 'Auth rejected ({}).'.format(method))
m.add_byte(cMSG_USERAUTH_FAILURE)
m.add_string(
self.transport.server_object.get_allowed_auths(username))
@@ -417,16 +421,18 @@ class AuthHandler (object):
username = m.get_text()
service = m.get_text()
method = m.get_text()
- self.transport._log(
+ self._log(
DEBUG,
- 'Auth request (type=%s) service=%s, username=%s' % (
- method, service, username))
+ 'Auth request (type={}) service={}, username={}'.format(
+ method, service, username
+ )
+ )
if service != 'ssh-connection':
self._disconnect_service_not_available()
return
if ((self.auth_username is not None) and
(self.auth_username != username)):
- self.transport._log(
+ self._log(
WARNING,
'Auth rejected because the client attempted to change username in mid-flight' # noqa
)
@@ -451,7 +457,7 @@ class AuthHandler (object):
# always treated as failure, since we don't support changing
# passwords, but collect the list of valid auth types from
# the callback anyway
- self.transport._log(
+ self._log(
DEBUG,
'Auth request to change passwords (rejected)')
newpassword = m.get_binary()
@@ -470,13 +476,13 @@ class AuthHandler (object):
try:
key = self.transport._key_info[keytype](Message(keyblob))
except SSHException as e:
- self.transport._log(
+ self._log(
INFO,
- 'Auth rejected: public key: %s' % str(e))
+ 'Auth rejected: public key: {}'.format(str(e)))
key = None
except Exception as e:
- msg = 'Auth rejected: unsupported or mangled public key ({0}: {1})' # noqa
- self.transport._log(INFO, msg.format(e.__class__.__name__, e))
+ msg = 'Auth rejected: unsupported or mangled public key ({}: {})' # noqa
+ self._log(INFO, msg.format(e.__class__.__name__, e))
key = None
if key is None:
self._disconnect_no_more_auth()
@@ -498,7 +504,7 @@ class AuthHandler (object):
sig = Message(m.get_binary())
blob = self._get_session_blob(key, service, username)
if not key.verify_ssh_sig(blob, sig):
- self.transport._log(
+ self._log(
INFO,
'Auth rejected: invalid signature')
result = AUTH_FAILED
@@ -519,7 +525,7 @@ class AuthHandler (object):
# We can't accept more than one OID, so if the SSH client sends
# more than one, disconnect.
if mechs > 1:
- self.transport._log(
+ self._log(
INFO,
'Disconnect: Received more than one GSS-API OID mechanism')
self._disconnect_no_more_auth()
@@ -527,7 +533,7 @@ class AuthHandler (object):
mech_ok = sshgss.ssh_check_mech(desired_mech)
# if we don't support the mechanism, disconnect.
if not mech_ok:
- self.transport._log(
+ self._log(
INFO,
'Disconnect: Received an invalid GSS-API OID mechanism')
self._disconnect_no_more_auth()
@@ -569,9 +575,9 @@ class AuthHandler (object):
self._send_auth_result(username, method, result)
def _parse_userauth_success(self, m):
- self.transport._log(
+ self._log(
INFO,
- 'Authentication (%s) successful!' % self.auth_method)
+ 'Authentication ({}) successful!'.format(self.auth_method))
self.authenticated = True
self.transport._auth_trigger()
if self.auth_event is not None:
@@ -581,22 +587,25 @@ class AuthHandler (object):
authlist = m.get_list()
partial = m.get_boolean()
if partial:
- self.transport._log(INFO, 'Authentication continues...')
- self.transport._log(DEBUG, 'Methods: ' + str(authlist))
+ self._log(INFO, 'Authentication continues...')
+ self._log(DEBUG, 'Methods: ' + str(authlist))
self.transport.saved_exception = PartialAuthentication(authlist)
elif self.auth_method not in authlist:
- self.transport._log(
- DEBUG,
- 'Authentication type (%s) not permitted.' % self.auth_method)
- self.transport._log(
- DEBUG,
- 'Allowed methods: ' + str(authlist))
+ for msg in (
+ 'Authentication type ({}) not permitted.'.format(
+ self.auth_method
+ ),
+ 'Allowed methods: {}'.format(authlist),
+ ):
+ self._log(DEBUG, msg)
self.transport.saved_exception = BadAuthenticationType(
- 'Bad authentication type', authlist)
+ 'Bad authentication type', authlist
+ )
else:
- self.transport._log(
+ self._log(
INFO,
- 'Authentication (%s) failed.' % self.auth_method)
+ 'Authentication ({}) failed.'.format(self.auth_method)
+ )
self.authenticated = False
self.username = None
if self.auth_event is not None:
@@ -605,7 +614,7 @@ class AuthHandler (object):
def _parse_userauth_banner(self, m):
banner = m.get_string()
self.banner = banner
- self.transport._log(INFO, 'Auth banner: %s' % banner)
+ self._log(INFO, 'Auth banner: {}'.format(banner))
# who cares.
def _parse_userauth_info_request(self, m):
@@ -646,9 +655,9 @@ class AuthHandler (object):
def _handle_local_gss_failure(self, e):
self.transport.saved_exception = e
- self.transport._log(DEBUG, "GSSAPI failure: %s" % str(e))
- self.transport._log(INFO, 'Authentication (%s) failed.' %
- self.auth_method)
+ self._log(DEBUG, "GSSAPI failure: {}".format(e))
+ self._log(INFO, 'Authentication ({}) failed.'.format(
+ self.auth_method))
self.authenticated = False
self.username = None
if self.auth_event is not None:
diff --git a/paramiko/ber.py b/paramiko/ber.py
index 7725f944..876347e0 100644
--- a/paramiko/ber.py
+++ b/paramiko/ber.py
@@ -88,8 +88,8 @@ class BER(object):
return util.inflate_long(data)
else:
# 1: boolean (00 false, otherwise true)
- raise BERException(
- 'Unknown ber encoding type %d (robey is lazy)' % ident)
+ msg = 'Unknown ber encoding type {:d} (robey is lazy)'
+ raise BERException(msg.format(ident))
@staticmethod
def decode_sequence(data):
@@ -125,7 +125,9 @@ class BER(object):
elif (type(x) is list) or (type(x) is tuple):
self.encode_tlv(0x30, self.encode_sequence(x))
else:
- raise BERException('Unknown type for encoding: %s' % repr(type(x)))
+ raise BERException(
+ 'Unknown type for encoding: {!r}'.format(type(x))
+ )
@staticmethod
def encode_sequence(data):
diff --git a/paramiko/channel.py b/paramiko/channel.py
index c6016a0e..91a8f0df 100644
--- a/paramiko/channel.py
+++ b/paramiko/channel.py
@@ -135,7 +135,7 @@ class Channel (ClosingContextManager):
"""
Return a string representation of this object, for debugging.
"""
- out = '<paramiko.Channel %d' % self.chanid
+ out = '<paramiko.Channel {}'.format(self.chanid)
if self.closed:
out += ' (closed)'
elif self.active:
@@ -143,9 +143,9 @@ class Channel (ClosingContextManager):
out += ' (EOF received)'
if self.eof_sent:
out += ' (EOF sent)'
- out += ' (open) window=%d' % self.out_window_size
+ out += ' (open) window={}'.format(self.out_window_size)
if len(self.in_buffer) > 0:
- out += ' in-buffer=%d' % (len(self.in_buffer),)
+ out += ' in-buffer={}'.format(len(self.in_buffer))
out += ' -> ' + repr(self.transport)
out += '>'
return out
@@ -315,7 +315,7 @@ class Channel (ClosingContextManager):
try:
self.set_environment_variable(name, value)
except SSHException as e:
- err = "Failed to set environment variable \"{0}\"."
+ err = "Failed to set environment variable \"{}\"."
raise SSHException(err.format(name), e)
@open_only
@@ -976,7 +976,7 @@ class Channel (ClosingContextManager):
# a window update
self.in_window_threshold = window_size // 10
self.in_window_sofar = 0
- self._log(DEBUG, 'Max packet in: %d bytes' % max_packet_size)
+ self._log(DEBUG, 'Max packet in: {} bytes'.format(max_packet_size))
def _set_remote_channel(self, chanid, window_size, max_packet_size):
self.remote_chanid = chanid
@@ -985,10 +985,11 @@ class Channel (ClosingContextManager):
max_packet_size
)
self.active = 1
- self._log(DEBUG, 'Max packet out: %d bytes' % self.out_max_packet_size)
+ self._log(DEBUG,
+ 'Max packet out: {} bytes'.format(self.out_max_packet_size))
def _request_success(self, m):
- self._log(DEBUG, 'Sesch channel %d request ok' % self.chanid)
+ self._log(DEBUG, 'Sesch channel {} request ok'.format(self.chanid))
self.event_ready = True
self.event.set()
return
@@ -1017,7 +1018,7 @@ class Channel (ClosingContextManager):
if code != 1:
self._log(
ERROR,
- 'unknown extended_data type %d; discarding' % code
+ 'unknown extended_data type {}; discarding'.format(code)
)
return
if self.combine_stderr:
@@ -1030,7 +1031,7 @@ class Channel (ClosingContextManager):
self.lock.acquire()
try:
if self.ultra_debug:
- self._log(DEBUG, 'window up %d' % nbytes)
+ self._log(DEBUG, 'window up {}'.format(nbytes))
self.out_window_size += nbytes
self.out_buffer_cv.notifyAll()
finally:
@@ -1122,7 +1123,7 @@ class Channel (ClosingContextManager):
else:
ok = server.check_channel_forward_agent_request(self)
else:
- self._log(DEBUG, 'Unhandled channel request "%s"' % key)
+ self._log(DEBUG, 'Unhandled channel request "{}"'.format(key))
ok = False
if want_reply:
m = Message()
@@ -1144,7 +1145,7 @@ class Channel (ClosingContextManager):
self._pipe.set_forever()
finally:
self.lock.release()
- self._log(DEBUG, 'EOF received (%s)', self._name)
+ self._log(DEBUG, 'EOF received ({})'.format(self._name))
def _handle_close(self, m):
self.lock.acquire()
@@ -1216,7 +1217,7 @@ class Channel (ClosingContextManager):
m.add_byte(cMSG_CHANNEL_EOF)
m.add_int(self.remote_chanid)
self.eof_sent = True
- self._log(DEBUG, 'EOF sent (%s)', self._name)
+ self._log(DEBUG, 'EOF sent ({})'.format(self._name))
return m
def _close_internal(self):
@@ -1250,12 +1251,13 @@ class Channel (ClosingContextManager):
if self.closed or self.eof_received or not self.active:
return 0
if self.ultra_debug:
- self._log(DEBUG, 'addwindow %d' % n)
+ self._log(DEBUG, 'addwindow {}'.format(n))
self.in_window_sofar += n
if self.in_window_sofar <= self.in_window_threshold:
return 0
if self.ultra_debug:
- self._log(DEBUG, 'addwindow send %d' % self.in_window_sofar)
+ self._log(DEBUG,
+ 'addwindow send {}'.format(self.in_window_sofar))
out = self.in_window_sofar
self.in_window_sofar = 0
return out
@@ -1298,7 +1300,7 @@ class Channel (ClosingContextManager):
size = self.out_max_packet_size - 64
self.out_window_size -= size
if self.ultra_debug:
- self._log(DEBUG, 'window down to %d' % self.out_window_size)
+ self._log(DEBUG, 'window down to {}'.format(self.out_window_size))
return size
diff --git a/paramiko/client.py b/paramiko/client.py
index 75d295ea..6f0cb847 100644
--- a/paramiko/client.py
+++ b/paramiko/client.py
@@ -143,8 +143,9 @@ class SSHClient (ClosingContextManager):
with open(filename, 'w') as f:
for hostname, keys in self._host_keys.items():
for keytype, key in keys.items():
- f.write('%s %s %s\n' % (
- hostname, keytype, key.get_base64()))
+ f.write('{} {} {}\n'.format(
+ hostname, keytype, key.get_base64()
+ ))
def get_host_keys(self):
"""
@@ -229,6 +230,7 @@ class SSHClient (ClosingContextManager):
banner_timeout=None,
auth_timeout=None,
gss_trust_dns=True,
+ passphrase=None,
):
"""
Connect to an SSH server and authenticate to it. The server's host key
@@ -269,7 +271,10 @@ class SSHClient (ClosingContextManager):
the username to authenticate as (defaults to the current local
username)
:param str password:
- a password to use for authentication or for unlocking a private key
+ Used for password authentication; is also used for private key
+ decryption if ``passphrase`` is not given.
+ :param str passphrase:
+ Used for decrypting private keys.
:param .PKey pkey: an optional private key to use for authentication
:param str key_filename:
the filename, or list of filenames, of optional private key(s)
@@ -315,6 +320,8 @@ class SSHClient (ClosingContextManager):
``gss_deleg_creds`` and ``gss_host`` arguments.
.. versionchanged:: 2.3
Added the ``gss_trust_dns`` argument.
+ .. versionchanged:: 2.4
+ Added the ``passphrase`` argument.
"""
if not sock:
errors = {}
@@ -370,7 +377,7 @@ class SSHClient (ClosingContextManager):
if port == SSH_PORT:
server_hostkey_name = hostname
else:
- server_hostkey_name = "[%s]:%d" % (hostname, port)
+ server_hostkey_name = "[{}]:{}".format(hostname, port)
our_server_keys = None
our_server_keys = self._system_host_keys.get(server_hostkey_name)
@@ -414,6 +421,7 @@ class SSHClient (ClosingContextManager):
self._auth(
username, password, pkey, key_filenames, allow_agent,
look_for_keys, gss_auth, gss_kex, gss_deleg_creds, t.gss_host,
+ passphrase,
)
def close(self):
@@ -544,18 +552,21 @@ class SSHClient (ClosingContextManager):
# TODO: change this to 'Loading' instead of 'Trying' sometime; probably
# when #387 is released, since this is a critical log message users are
# likely testing/filtering for (bah.)
- msg = "Trying discovered key {0} in {1}".format(
+ msg = "Trying discovered key {} in {}".format(
hexlify(key.get_fingerprint()), key_path,
)
self._log(DEBUG, msg)
# Attempt to load cert if it exists.
if os.path.isfile(cert_path):
key.load_certificate(cert_path)
- self._log(DEBUG, "Adding public certificate {0}".format(cert_path))
+ self._log(DEBUG, "Adding public certificate {}".format(cert_path))
return key
- def _auth(self, username, password, pkey, key_filenames, allow_agent,
- look_for_keys, gss_auth, gss_kex, gss_deleg_creds, gss_host):
+ def _auth(
+ self, username, password, pkey, key_filenames, allow_agent,
+ look_for_keys, gss_auth, gss_kex, gss_deleg_creds, gss_host,
+ passphrase,
+ ):
"""
Try, in order:
@@ -565,13 +576,16 @@ class SSHClient (ClosingContextManager):
(if allowed).
- Plain username/password auth, if a password was given.
- (The password might be needed to unlock a private key, or for
- two-factor authentication [for which it is required].)
+ (The password might be needed to unlock a private key [if 'passphrase'
+ isn't also given], or for two-factor authentication [for which it is
+ required].)
"""
saved_exception = None
two_factor = False
allowed_types = set()
- two_factor_types = set(['keyboard-interactive', 'password'])
+ two_factor_types = {'keyboard-interactive', 'password'}
+ if passphrase is None and password is not None:
+ passphrase = password
# If GSS-API support and GSS-PI Key Exchange was performed, we attempt
# authentication with gssapi-keyex.
@@ -598,7 +612,8 @@ class SSHClient (ClosingContextManager):
try:
self._log(
DEBUG,
- 'Trying SSH key %s' % hexlify(pkey.get_fingerprint()))
+ 'Trying SSH key {}'.format(hexlify(pkey.get_fingerprint()))
+ )
allowed_types = set(
self._transport.auth_publickey(username, pkey))
two_factor = (allowed_types & two_factor_types)
@@ -612,7 +627,7 @@ class SSHClient (ClosingContextManager):
for pkey_class in (RSAKey, DSSKey, ECDSAKey, Ed25519Key):
try:
key = self._key_from_filepath(
- key_filename, pkey_class, password,
+ key_filename, pkey_class, passphrase,
)
allowed_types = set(
self._transport.auth_publickey(username, key))
@@ -629,10 +644,8 @@ class SSHClient (ClosingContextManager):
for key in self._agent.get_keys():
try:
- self._log(
- DEBUG,
- 'Trying SSH agent key %s' % hexlify(
- key.get_fingerprint()))
+ id_ = hexlify(key.get_fingerprint())
+ self._log(DEBUG, 'Trying SSH agent key {}'.format(id_))
# for 2-factor auth a successfully auth'd key password
# will return an allowed 2fac auth method
allowed_types = set(
@@ -656,7 +669,7 @@ class SSHClient (ClosingContextManager):
# ~/ssh/ is for windows
for directory in [".ssh", "ssh"]:
full_path = os.path.expanduser(
- "~/%s/id_%s" % (directory, name)
+ "~/{}/id_{}".format(directory, name)
)
if os.path.isfile(full_path):
# TODO: only do this append if below did not run
@@ -670,7 +683,7 @@ class SSHClient (ClosingContextManager):
for pkey_class, filename in keyfiles:
try:
key = self._key_from_filepath(
- filename, pkey_class, password,
+ filename, pkey_class, passphrase,
)
# for 2-factor auth a successfully auth'd key will result
# in ['password']
@@ -736,8 +749,9 @@ class AutoAddPolicy (MissingHostKeyPolicy):
client._host_keys.add(hostname, key.get_name(), key)
if client._host_keys_filename is not None:
client.save_host_keys(client._host_keys_filename)
- client._log(DEBUG, 'Adding %s host key for %s: %s' %
- (key.get_name(), hostname, hexlify(key.get_fingerprint())))
+ client._log(DEBUG, 'Adding {} host key for {}: {}'.format(
+ key.get_name(), hostname, hexlify(key.get_fingerprint()),
+ ))
class RejectPolicy (MissingHostKeyPolicy):
@@ -747,9 +761,12 @@ class RejectPolicy (MissingHostKeyPolicy):
"""
def missing_host_key(self, client, hostname, key):
- client._log(DEBUG, 'Rejecting %s host key for %s: %s' %
- (key.get_name(), hostname, hexlify(key.get_fingerprint())))
- raise SSHException('Server %r not found in known_hosts' % hostname)
+ client._log(DEBUG, 'Rejecting {} host key for {}: {}'.format(
+ key.get_name(), hostname, hexlify(key.get_fingerprint()),
+ ))
+ raise SSHException(
+ 'Server {!r} not found in known_hosts'.format(hostname)
+ )
class WarningPolicy (MissingHostKeyPolicy):
@@ -758,6 +775,6 @@ class WarningPolicy (MissingHostKeyPolicy):
accepting it. This is used by `.SSHClient`.
"""
def missing_host_key(self, client, hostname, key):
- warnings.warn('Unknown %s host key for %s: %s' %
- (key.get_name(), hostname, hexlify(
- key.get_fingerprint())))
+ warnings.warn('Unknown {} host key for {}: {}'.format(
+ key.get_name(), hostname, hexlify(key.get_fingerprint()),
+ ))
diff --git a/paramiko/config.py b/paramiko/config.py
index 073abb36..038d84ea 100644
--- a/paramiko/config.py
+++ b/paramiko/config.py
@@ -65,7 +65,7 @@ class SSHConfig (object):
match = re.match(self.SETTINGS_REGEX, line)
if not match:
- raise Exception("Unparsable line %s" % line)
+ raise Exception("Unparsable line {}".format(line))
key = match.group(1).lower()
value = match.group(2)
@@ -239,7 +239,7 @@ class SSHConfig (object):
try:
return shlex.split(host)
except ValueError:
- raise Exception("Unparsable host %s" % host)
+ raise Exception("Unparsable host {}".format(host))
class LazyFqdn(object):
diff --git a/paramiko/ecdsakey.py b/paramiko/ecdsakey.py
index 1bb5676f..92e01a75 100644
--- a/paramiko/ecdsakey.py
+++ b/paramiko/ecdsakey.py
@@ -134,7 +134,7 @@ class ECDSAKey(PKey):
)
key_types = self._ECDSA_CURVES.get_key_format_identifier_list()
cert_types = [
- '{0}-cert-v01@openssh.com'.format(x)
+ '{}-cert-v01@openssh.com'.format(x)
for x in key_types
]
self._check_type_and_load_cert(
@@ -144,7 +144,9 @@ class ECDSAKey(PKey):
)
curvename = msg.get_text()
if curvename != self.ecdsa_curve.nist_name:
- raise SSHException("Can't handle curve of type %s" % curvename)
+ raise SSHException(
+ "Can't handle curve of type {}".format(curvename)
+ )
pointinfo = msg.get_binary()
try:
@@ -249,7 +251,7 @@ class ECDSAKey(PKey):
if bits is not None:
curve = cls._ECDSA_CURVES.get_by_key_length(bits)
if curve is None:
- raise ValueError("Unsupported key length: %d" % bits)
+ raise ValueError("Unsupported key length: {:d}".format(bits))
curve = curve.curve_class()
private_key = ec.generate_private_key(curve, backend=default_backend())
diff --git a/paramiko/file.py b/paramiko/file.py
index a1bdafbe..df9cdac7 100644
--- a/paramiko/file.py
+++ b/paramiko/file.py
@@ -338,7 +338,7 @@ class BufferedFile (ClosingContextManager):
after rounding up to an internal buffer size) are read.
:param int sizehint: desired maximum number of bytes to read.
- :returns: `list` of lines read from the file.
+ :returns: list of lines read from the file.
"""
lines = []
byte_count = 0
diff --git a/paramiko/hostkeys.py b/paramiko/hostkeys.py
index d023b33d..ca185273 100644
--- a/paramiko/hostkeys.py
+++ b/paramiko/hostkeys.py
@@ -300,7 +300,7 @@ class HostKeys (MutableMapping):
salt = decodebytes(b(salt))
assert len(salt) == sha1().digest_size
hmac = HMAC(salt, b(hostname), sha1).digest()
- hostkey = '|1|%s|%s' % (u(encodebytes(salt)), u(encodebytes(hmac)))
+ hostkey = '|1|{}|{}'.format(u(encodebytes(salt)), u(encodebytes(hmac)))
return hostkey.replace('\n', '')
@@ -338,8 +338,8 @@ class HostKeyEntry:
fields = line.split(' ')
if len(fields) < 3:
# Bad number of fields
- log.info("Not enough fields found in known_hosts in line %s (%r)" %
- (lineno, line))
+ msg = "Not enough fields found in known_hosts in line {} ({!r})"
+ log.info(msg.format(lineno, line))
return None
fields = fields[:3]
@@ -359,7 +359,7 @@ class HostKeyEntry:
elif keytype == 'ssh-ed25519':
key = Ed25519Key(data=decodebytes(key))
else:
- log.info("Unable to handle key of type %s" % (keytype,))
+ log.info("Unable to handle key of type {}".format(keytype))
return None
except binascii.Error as e:
@@ -374,11 +374,12 @@ class HostKeyEntry:
included.
"""
if self.valid:
- return '%s %s %s\n' % (
+ return '{} {} {}\n'.format(
','.join(self.hostnames),
self.key.get_name(),
- self.key.get_base64())
+ self.key.get_base64(),
+ )
return None
def __repr__(self):
- return '<HostKeyEntry %r: %r>' % (self.hostnames, self.key)
+ return '<HostKeyEntry {!r}: {!r}>'.format(self.hostnames, self.key)
diff --git a/paramiko/kex_ecdh_nist.py b/paramiko/kex_ecdh_nist.py
index 702a872d..4e8ff35d 100644
--- a/paramiko/kex_ecdh_nist.py
+++ b/paramiko/kex_ecdh_nist.py
@@ -45,7 +45,9 @@ class KexNistp256():
return self._parse_kexecdh_init(m)
elif not self.transport.server_mode and (ptype == _MSG_KEXECDH_REPLY):
return self._parse_kexecdh_reply(m)
- raise SSHException('KexECDH asked to handle packet type %d' % ptype)
+ raise SSHException(
+ 'KexECDH asked to handle packet type {:d}'.format(ptype)
+ )
def _generate_key_pair(self):
self.P = ec.generate_private_key(self.curve, default_backend())
diff --git a/paramiko/kex_gex.py b/paramiko/kex_gex.py
index ba45da18..44030569 100644
--- a/paramiko/kex_gex.py
+++ b/paramiko/kex_gex.py
@@ -91,8 +91,8 @@ class KexGex (object):
return self._parse_kexdh_gex_reply(m)
elif ptype == _MSG_KEXDH_GEX_REQUEST_OLD:
return self._parse_kexdh_gex_request_old(m)
- raise SSHException(
- 'KexGex %s asked to handle packet type %d' % self.name, ptype)
+ msg = "KexGex {} asked to handle packet type {:d}"
+ raise SSHException(msg.format(self.name, ptype))
# ...internals...
@@ -141,8 +141,10 @@ class KexGex (object):
'Can\'t do server-side gex with no modulus pack')
self.transport._log(
DEBUG,
- 'Picking p (%d <= %d <= %d bits)' % (
- minbits, preferredbits, maxbits))
+ 'Picking p ({} <= {} <= {} bits)'.format(
+ minbits, preferredbits, maxbits,
+ )
+ )
self.g, self.p = pack.get_modulus(minbits, preferredbits, maxbits)
m = Message()
m.add_byte(c_MSG_KEXDH_GEX_GROUP)
@@ -166,7 +168,8 @@ class KexGex (object):
raise SSHException(
'Can\'t do server-side gex with no modulus pack')
self.transport._log(
- DEBUG, 'Picking p (~ %d bits)' % (self.preferred_bits,))
+ DEBUG, 'Picking p (~ {} bits)'.format(self.preferred_bits)
+ )
self.g, self.p = pack.get_modulus(
self.min_bits, self.preferred_bits, self.max_bits)
m = Message()
@@ -185,8 +188,8 @@ class KexGex (object):
if (bitlen < 1024) or (bitlen > 8192):
raise SSHException(
'Server-generated gex p (don\'t ask) is out of range '
- '(%d bits)' % bitlen)
- self.transport._log(DEBUG, 'Got server p (%d bits)' % bitlen)
+ '({} bits)'.format(bitlen))
+ self.transport._log(DEBUG, 'Got server p ({} bits)'.format(bitlen))
self._generate_x()
# now compute e = g^x mod p
self.e = pow(self.g, self.x, self.p)
diff --git a/paramiko/kex_group1.py b/paramiko/kex_group1.py
index e8f042b1..1bebd375 100644
--- a/paramiko/kex_group1.py
+++ b/paramiko/kex_group1.py
@@ -73,7 +73,8 @@ class KexGroup1(object):
return self._parse_kexdh_init(m)
elif not self.transport.server_mode and (ptype == _MSG_KEXDH_REPLY):
return self._parse_kexdh_reply(m)
- raise SSHException('KexGroup1 asked to handle packet type %d' % ptype)
+ msg = "KexGroup1 asked to handle packet type {:d}"
+ raise SSHException(msg.format(ptype))
# ...internals...
diff --git a/paramiko/kex_gss.py b/paramiko/kex_gss.py
index a2ea9fca..e21620fe 100644
--- a/paramiko/kex_gss.py
+++ b/paramiko/kex_gss.py
@@ -120,8 +120,8 @@ class KexGSSGroup1(object):
return self._parse_kexgss_complete(m)
elif ptype == MSG_KEXGSS_ERROR:
return self._parse_kexgss_error(m)
- raise SSHException('GSS KexGroup1 asked to handle packet type %d'
- % ptype)
+ msg = 'GSS KexGroup1 asked to handle packet type {:d}'
+ raise SSHException(msg.format(ptype))
# ## internals...
@@ -282,10 +282,11 @@ class KexGSSGroup1(object):
min_status = m.get_int()
err_msg = m.get_string()
m.get_string() # we don't care about the language!
- raise SSHException("GSS-API Error:\nMajor Status: %s\nMinor Status: %s\
- \nError Message: %s\n") % (str(maj_status),
- str(min_status),
- err_msg)
+ raise SSHException("""GSS-API Error:
+Major Status: {}
+Minor Status: {}
+Error Message: {}
+""".format(maj_status, min_status, err_msg))
class KexGSSGroup14(KexGSSGroup1):
@@ -361,7 +362,8 @@ class KexGSSGex(object):
return self._parse_kexgss_complete(m)
elif ptype == MSG_KEXGSS_ERROR:
return self._parse_kexgss_error(m)
- raise SSHException('KexGex asked to handle packet type %d' % ptype)
+ msg = 'KexGex asked to handle packet type {:d}'
+ raise SSHException(msg.format(ptype))
# ## internals...
@@ -416,8 +418,10 @@ class KexGSSGex(object):
'Can\'t do server-side gex with no modulus pack')
self.transport._log(
DEBUG, # noqa
- 'Picking p (%d <= %d <= %d bits)' % (
- minbits, preferredbits, maxbits))
+ 'Picking p ({} <= {} <= {} bits)'.format(
+ minbits, preferredbits, maxbits,
+ )
+ )
self.g, self.p = pack.get_modulus(minbits, preferredbits, maxbits)
m = Message()
m.add_byte(c_MSG_KEXGSS_GROUP)
@@ -439,8 +443,8 @@ class KexGSSGex(object):
if (bitlen < 1024) or (bitlen > 8192):
raise SSHException(
'Server-generated gex p (don\'t ask) is out of range '
- '(%d bits)' % bitlen)
- self.transport._log(DEBUG, 'Got server p (%d bits)' % bitlen) # noqa
+ '({} bits)'.format(bitlen))
+ self.transport._log(DEBUG, 'Got server p ({} bits)'.format(bitlen)) # noqa
self._generate_x()
# now compute e = g^x mod p
self.e = pow(self.g, self.x, self.p)
@@ -603,10 +607,11 @@ class KexGSSGex(object):
min_status = m.get_int()
err_msg = m.get_string()
m.get_string() # we don't care about the language (lang_tag)!
- raise SSHException("GSS-API Error:\nMajor Status: %s\nMinor Status: %s\
- \nError Message: %s\n") % (str(maj_status),
- str(min_status),
- err_msg)
+ raise SSHException("""GSS-API Error:
+Major Status: {}
+Minor Status: {}
+Error Message: {}
+""".format(maj_status, min_status, err_msg))
class NullHostKey(object):
diff --git a/paramiko/message.py b/paramiko/message.py
index f8ed6170..9af841da 100644
--- a/paramiko/message.py
+++ b/paramiko/message.py
@@ -187,7 +187,7 @@ class Message (object):
def get_list(self):
"""
- Fetch a `list` of `strings <str>` from the stream.
+ Fetch a list of `strings <str>` from the stream.
These are trivially encoded as comma-separated values in a string.
"""
@@ -281,7 +281,7 @@ class Message (object):
a single string of values separated by commas. (Yes, really, that's
how SSH2 does it.)
- :param list l: list of strings to add
+ :param l: list of strings to add
"""
self.add_string(','.join(l))
return self
diff --git a/paramiko/packet.py b/paramiko/packet.py
index 95a26c6e..2a1e91e2 100644
--- a/paramiko/packet.py
+++ b/paramiko/packet.py
@@ -368,7 +368,7 @@ class Packetizer (object):
if cmd in MSG_NAMES:
cmd_name = MSG_NAMES[cmd]
else:
- cmd_name = '$%x' % cmd
+ cmd_name = '${:x}'.format(cmd)
orig_len = len(data)
self.__write_lock.acquire()
try:
@@ -378,7 +378,8 @@ class Packetizer (object):
if self.__dump_packets:
self._log(
DEBUG,
- 'Write packet <%s>, length %d' % (cmd_name, orig_len))
+ 'Write packet <{}>, length {}'.format(cmd_name, orig_len)
+ )
self._log(DEBUG, util.format_binary(packet, 'OUT: '))
if self.__block_engine_out is not None:
out = self.__block_engine_out.update(packet)
@@ -404,8 +405,10 @@ class Packetizer (object):
)
if sent_too_much and not self.__need_rekey:
# only ask once for rekeying
- self._log(DEBUG, 'Rekeying (hit %d packets, %d bytes sent)' %
- (self.__sent_packets, self.__sent_bytes))
+ msg = "Rekeying (hit {} packets, {} bytes sent)"
+ self._log(DEBUG, msg.format(
+ self.__sent_packets, self.__sent_bytes,
+ ))
self.__received_bytes_overflow = 0
self.__received_packets_overflow = 0
self._trigger_rekey()
@@ -456,7 +459,10 @@ class Packetizer (object):
if self.__dump_packets:
self._log(
DEBUG,
- 'Got payload (%d bytes, %d padding)' % (packet_size, padding))
+ 'Got payload ({} bytes, {} padding)'.format(
+ packet_size, padding
+ )
+ )
if self.__compress_engine_in is not None:
payload = self.__compress_engine_in(payload)
@@ -483,8 +489,10 @@ class Packetizer (object):
elif (self.__received_packets >= self.REKEY_PACKETS) or \
(self.__received_bytes >= self.REKEY_BYTES):
# only ask once for rekeying
- self._log(DEBUG, 'Rekeying (hit %d packets, %d bytes received)' %
- (self.__received_packets, self.__received_bytes))
+ err = "Rekeying (hit {} packets, {} bytes received)"
+ self._log(DEBUG, err.format(
+ self.__received_packets, self.__received_bytes,
+ ))
self.__received_bytes_overflow = 0
self.__received_packets_overflow = 0
self._trigger_rekey()
@@ -493,11 +501,12 @@ class Packetizer (object):
if cmd in MSG_NAMES:
cmd_name = MSG_NAMES[cmd]
else:
- cmd_name = '$%x' % cmd
+ cmd_name = '${:x}'.format(cmd)
if self.__dump_packets:
self._log(
DEBUG,
- 'Read packet <%s>, length %d' % (cmd_name, len(payload)))
+ 'Read packet <{}>, length {}'.format(cmd_name, len(payload))
+ )
return cmd, msg
# ...protected...
diff --git a/paramiko/pkey.py b/paramiko/pkey.py
index 67723be2..808215f8 100644
--- a/paramiko/pkey.py
+++ b/paramiko/pkey.py
@@ -310,16 +310,18 @@ class PKey(object):
# unencryped: done
return data
# encrypted keyfile: will need a password
- if headers['proc-type'] != '4,ENCRYPTED':
+ proc_type = headers['proc-type']
+ if proc_type != '4,ENCRYPTED':
raise SSHException(
- 'Unknown private key structure "%s"' % headers['proc-type'])
+ 'Unknown private key structure "{}"'.format(proc_type)
+ )
try:
encryption_type, saltstr = headers['dek-info'].split(',')
except:
raise SSHException("Can't parse DEK-info in private key file")
if encryption_type not in self._CIPHER_TABLE:
raise SSHException(
- 'Unknown private key cipher "%s"' % encryption_type)
+ 'Unknown private key cipher "{}"'.format(encryption_type))
# if no password was passed in,
# raise an exception pointing out that we need one
if password is None:
@@ -409,7 +411,7 @@ class PKey(object):
# (requires going back into per-type subclasses.)
msg.get_string()
else:
- err = 'Invalid key (class: {0}, data type: {1}'
+ err = 'Invalid key (class: {}, data type: {}'
raise SSHException(err.format(self.__class__.__name__, type_))
def load_certificate(self, value):
@@ -439,7 +441,7 @@ class PKey(object):
constructor = 'from_string'
blob = getattr(PublicBlob, constructor)(value)
if not blob.key_type.startswith(self.get_name()):
- err = "PublicBlob type {0} incompatible with key type {1}"
+ err = "PublicBlob type {} incompatible with key type {}"
raise ValueError(err.format(blob.key_type, self.get_name()))
self.public_blob = blob
@@ -490,7 +492,7 @@ class PublicBlob(object):
"""
fields = string.split(None, 2)
if len(fields) < 2:
- msg = "Not enough fields for public blob: {0}"
+ msg = "Not enough fields for public blob: {}"
raise ValueError(msg.format(fields))
key_type = fields[0]
key_blob = decodebytes(b(fields[1]))
@@ -503,7 +505,7 @@ class PublicBlob(object):
m = Message(key_blob)
blob_type = m.get_text()
if blob_type != key_type:
- msg = "Invalid PublicBlob contents: key type={0!r}, but blob type={1!r}" # noqa
+ msg = "Invalid PublicBlob contents: key type={!r}, but blob type={!r}" # noqa
raise ValueError(msg.format(key_type, blob_type))
# All good? All good.
return cls(type_=key_type, blob=key_blob, comment=comment)
@@ -520,9 +522,9 @@ class PublicBlob(object):
return cls(type_=type_, blob=message.asbytes())
def __str__(self):
- ret = '{0} public key/certificate'.format(self.key_type)
+ ret = '{} public key/certificate'.format(self.key_type)
if self.comment:
- ret += "- {0}".format(self.comment)
+ ret += "- {}".format(self.comment)
return ret
def __eq__(self, other):
diff --git a/paramiko/primes.py b/paramiko/primes.py
index 65617914..ca8f9bec 100644
--- a/paramiko/primes.py
+++ b/paramiko/primes.py
@@ -91,7 +91,7 @@ class ModulusPack (object):
bl = util.bit_length(modulus)
if (bl != size) and (bl != size + 1):
self.discarded.append(
- (modulus, 'incorrectly reported bit length %d' % size))
+ (modulus, 'incorrectly reported bit length {}'.format(size)))
return
if bl not in self.pack:
self.pack[bl] = []
diff --git a/paramiko/py3compat.py b/paramiko/py3compat.py
index 6703ace8..cb9de412 100644
--- a/paramiko/py3compat.py
+++ b/paramiko/py3compat.py
@@ -46,7 +46,7 @@ if PY2:
elif isinstance(s, buffer): # NOQA
return s
else:
- raise TypeError("Expected unicode or bytes, got %r" % s)
+ raise TypeError("Expected unicode or bytes, got {!r}".format(s))
def u(s, encoding='utf8'): # NOQA
@@ -58,7 +58,7 @@ if PY2:
elif isinstance(s, buffer): # NOQA
return s.decode(encoding)
else:
- raise TypeError("Expected unicode or bytes, got %r" % s)
+ raise TypeError("Expected unicode or bytes, got {!r}".format(s))
def b2s(s):
@@ -135,7 +135,7 @@ else:
elif isinstance(s, str):
return s.encode(encoding)
else:
- raise TypeError("Expected unicode or bytes, got %r" % s)
+ raise TypeError("Expected unicode or bytes, got {!r}".format(s))
def u(s, encoding='utf8'):
"""cast bytes or unicode to unicode"""
@@ -144,7 +144,7 @@ else:
elif isinstance(s, str):
return s
else:
- raise TypeError("Expected unicode or bytes, got %r" % s)
+ raise TypeError("Expected unicode or bytes, got {!r}".format(s))
def b2s(s):
return s.decode() if isinstance(s, bytes) else s
diff --git a/paramiko/server.py b/paramiko/server.py
index c96126e9..a7117815 100644
--- a/paramiko/server.py
+++ b/paramiko/server.py
@@ -223,7 +223,7 @@ class ServerInterface (object):
The default implementation always returns ``AUTH_FAILED``.
- :param list responses: list of `str` responses from the client
+ :param responses: list of `str` responses from the client
:return:
``AUTH_FAILED`` if the authentication fails; ``AUTH_SUCCESSFUL`` if
it succeeds; ``AUTH_PARTIALLY_SUCCESSFUL`` if the interactive auth
@@ -667,12 +667,13 @@ class SubsystemHandler (threading.Thread):
def _run(self):
try:
self.__transport._log(
- DEBUG, 'Starting handler for subsystem %s' % self.__name)
+ DEBUG, 'Starting handler for subsystem {}'.format(self.__name)
+ )
self.start_subsystem(self.__name, self.__transport, self.__channel)
except Exception as e:
self.__transport._log(
ERROR,
- 'Exception in subsystem handler for "{0}": {1}'.format(
+ 'Exception in subsystem handler for "{}": {}'.format(
self.__name, e
)
)
diff --git a/paramiko/sftp_attr.py b/paramiko/sftp_attr.py
index 5597948a..ea12b2f6 100644
--- a/paramiko/sftp_attr.py
+++ b/paramiko/sftp_attr.py
@@ -82,7 +82,7 @@ class SFTPAttributes (object):
return attr
def __repr__(self):
- return '<SFTPAttributes: %s>' % self._debug_str()
+ return '<SFTPAttributes: {}>'.format(self._debug_str())
# ...internals...
@classmethod
@@ -146,15 +146,15 @@ class SFTPAttributes (object):
def _debug_str(self):
out = '[ '
if self.st_size is not None:
- out += 'size=%d ' % self.st_size
+ out += 'size={} '.format(self.st_size)
if (self.st_uid is not None) and (self.st_gid is not None):
- out += 'uid=%d gid=%d ' % (self.st_uid, self.st_gid)
+ out += 'uid={} gid={} '.format(self.st_uid, self.st_gid)
if self.st_mode is not None:
out += 'mode=' + oct(self.st_mode) + ' '
if (self.st_atime is not None) and (self.st_mtime is not None):
- out += 'atime=%d mtime=%d ' % (self.st_atime, self.st_mtime)
+ out += 'atime={} mtime={} '.format(self.st_atime, self.st_mtime)
for k, v in self.attr.items():
- out += '"%s"=%r ' % (str(k), v)
+ out += '"{}"={!r} '.format(str(k), v)
out += ']'
return out
@@ -222,8 +222,12 @@ class SFTPAttributes (object):
if size is None:
size = 0
+ # TODO: not sure this actually worked as expected beforehand, leaving
+ # it untouched for the time being, re: .format() upgrade, until someone
+ # has time to doublecheck
return '%s 1 %-8d %-8d %8d %-12s %s' % (
- ks, uid, gid, size, datestr, filename)
+ ks, uid, gid, size, datestr, filename,
+ )
def asbytes(self):
return b(str(self))
diff --git a/paramiko/sftp_client.py b/paramiko/sftp_client.py
index 14b8b58a..b344dff3 100644
--- a/paramiko/sftp_client.py
+++ b/paramiko/sftp_client.py
@@ -105,7 +105,8 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
raise SSHException('EOF during negotiation')
self._log(
INFO,
- 'Opened sftp connection (server version %d)' % server_version)
+ 'Opened sftp connection (server version {})'.format(server_version)
+ )
@classmethod
def from_transport(cls, t, window_size=None, max_packet_size=None):
@@ -143,6 +144,8 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
for m in msg:
self._log(level, m, *args)
else:
+ # NOTE: these bits MUST continue using %-style format junk because
+ # logging.Logger.log() explicitly requires it. Grump.
# escape '%' in msg (they could come from file or directory names)
# before logging
msg = msg.replace('%', '%%')
@@ -200,7 +203,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
.. versionadded:: 1.2
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, 'listdir(%r)' % path)
+ self._log(DEBUG, 'listdir({!r})'.format(path))
t, msg = self._request(CMD_OPENDIR, path)
if t != CMD_HANDLE:
raise SFTPError('Expected handle')
@@ -239,7 +242,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
.. versionadded:: 1.15
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, 'listdir(%r)' % path)
+ self._log(DEBUG, 'listdir({!r})'.format(path))
t, msg = self._request(CMD_OPENDIR, path)
if t != CMD_HANDLE:
@@ -322,7 +325,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
:raises: ``IOError`` -- if the file could not be opened.
"""
filename = self._adjust_cwd(filename)
- self._log(DEBUG, 'open(%r, %r)' % (filename, mode))
+ self._log(DEBUG, 'open({!r}, {!r})'.format(filename, mode))
imode = 0
if ('r' in mode) or ('+' in mode):
imode |= SFTP_FLAG_READ
@@ -341,7 +344,8 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
handle = msg.get_binary()
self._log(
DEBUG,
- 'open(%r, %r) -> %s' % (filename, mode, u(hexlify(handle))))
+ 'open({!r}, {!r}) -> {}'.format(filename, mode, u(hexlify(handle)))
+ )
return SFTPFile(self, handle, mode, bufsize)
# Python continues to vacillate about "open" vs "file"...
@@ -357,7 +361,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
:raises: ``IOError`` -- if the path refers to a folder (directory)
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, 'remove(%r)' % path)
+ self._log(DEBUG, 'remove({!r})'.format(path))
self._request(CMD_REMOVE, path)
unlink = remove
@@ -382,7 +386,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
"""
oldpath = self._adjust_cwd(oldpath)
newpath = self._adjust_cwd(newpath)
- self._log(DEBUG, 'rename(%r, %r)' % (oldpath, newpath))
+ self._log(DEBUG, 'rename({!r}, {!r})'.format(oldpath, newpath))
self._request(CMD_RENAME, oldpath, newpath)
def posix_rename(self, oldpath, newpath):
@@ -402,7 +406,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
"""
oldpath = self._adjust_cwd(oldpath)
newpath = self._adjust_cwd(newpath)
- self._log(DEBUG, 'posix_rename(%r, %r)' % (oldpath, newpath))
+ self._log(DEBUG, 'posix_rename({!r}, {!r})'.format(oldpath, newpath))
self._request(
CMD_EXTENDED, "posix-rename@openssh.com", oldpath, newpath
)
@@ -417,7 +421,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
:param int mode: permissions (posix-style) for the newly-created folder
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, 'mkdir(%r, %r)' % (path, mode))
+ self._log(DEBUG, 'mkdir({!r}, {!r})'.format(path, mode))
attr = SFTPAttributes()
attr.st_mode = mode
self._request(CMD_MKDIR, path, attr)
@@ -429,7 +433,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
:param str path: name of the folder to remove
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, 'rmdir(%r)' % path)
+ self._log(DEBUG, 'rmdir({!r})'.format(path))
self._request(CMD_RMDIR, path)
def stat(self, path):
@@ -452,7 +456,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
file
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, 'stat(%r)' % path)
+ self._log(DEBUG, 'stat({!r})'.format(path))
t, msg = self._request(CMD_STAT, path)
if t != CMD_ATTRS:
raise SFTPError('Expected attributes')
@@ -470,7 +474,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
file
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, 'lstat(%r)' % path)
+ self._log(DEBUG, 'lstat({!r})'.format(path))
t, msg = self._request(CMD_LSTAT, path)
if t != CMD_ATTRS:
raise SFTPError('Expected attributes')
@@ -484,7 +488,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
:param str dest: path of the newly created symlink
"""
dest = self._adjust_cwd(dest)
- self._log(DEBUG, 'symlink(%r, %r)' % (source, dest))
+ self._log(DEBUG, 'symlink({!r}, {!r})'.format(source, dest))
source = bytestring(source)
self._request(CMD_SYMLINK, source, dest)
@@ -498,7 +502,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
:param int mode: new permissions
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, 'chmod(%r, %r)' % (path, mode))
+ self._log(DEBUG, 'chmod({!r}, {!r})'.format(path, mode))
attr = SFTPAttributes()
attr.st_mode = mode
self._request(CMD_SETSTAT, path, attr)
@@ -515,7 +519,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
:param int gid: new group id
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, 'chown(%r, %r, %r)' % (path, uid, gid))
+ self._log(DEBUG, 'chown({!r}, {!r}, {!r})'.format(path, uid, gid))
attr = SFTPAttributes()
attr.st_uid, attr.st_gid = uid, gid
self._request(CMD_SETSTAT, path, attr)
@@ -537,7 +541,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
path = self._adjust_cwd(path)
if times is None:
times = (time.time(), time.time())
- self._log(DEBUG, 'utime(%r, %r)' % (path, times))
+ self._log(DEBUG, 'utime({!r}, {!r})'.format(path, times))
attr = SFTPAttributes()
attr.st_atime, attr.st_mtime = times
self._request(CMD_SETSTAT, path, attr)
@@ -552,7 +556,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
:param int size: the new size of the file
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, 'truncate(%r, %r)' % (path, size))
+ self._log(DEBUG, 'truncate({!r}, {!r})'.format(path, size))
attr = SFTPAttributes()
attr.st_size = size
self._request(CMD_SETSTAT, path, attr)
@@ -567,7 +571,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
:return: target path, as a `str`
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, 'readlink(%r)' % path)
+ self._log(DEBUG, 'readlink({!r})'.format(path))
t, msg = self._request(CMD_READLINK, path)
if t != CMD_NAME:
raise SFTPError('Expected name response')
@@ -575,7 +579,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
if count == 0:
return None
if count != 1:
- raise SFTPError('Readlink returned %d results' % count)
+ raise SFTPError('Readlink returned {} results'.format(count))
return _to_unicode(msg.get_string())
def normalize(self, path):
@@ -591,13 +595,13 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
:raises: ``IOError`` -- if the path can't be resolved on the server
"""
path = self._adjust_cwd(path)
- self._log(DEBUG, 'normalize(%r)' % path)
+ self._log(DEBUG, 'normalize({!r})'.format(path))
t, msg = self._request(CMD_REALPATH, path)
if t != CMD_NAME:
raise SFTPError('Expected name response')
count = msg.get_int()
if count != 1:
- raise SFTPError('Realpath returned %d results' % count)
+ raise SFTPError('Realpath returned {} results'.format(count))
return msg.get_text()
def chdir(self, path=None):
@@ -620,8 +624,10 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
self._cwd = None
return
if not stat.S_ISDIR(self.stat(path).st_mode):
+ code = errno.ENOTDIR
raise SFTPError(
- errno.ENOTDIR, "%s: %s" % (os.strerror(errno.ENOTDIR), path))
+ code, "{}: {}".format(os.strerror(code), path)
+ )
self._cwd = b(self.normalize(path))
def getcwd(self):
@@ -683,7 +689,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
s = self.stat(remotepath)
if s.st_size != size:
raise IOError(
- 'size mismatch in put! %d != %d' % (s.st_size, size))
+ 'size mismatch in put! {} != {}'.format(s.st_size, size))
else:
s = SFTPAttributes()
return s
@@ -765,7 +771,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
s = os.stat(localpath)
if s.st_size != size:
raise IOError(
- 'size mismatch in get! %d != %d' % (s.st_size, size))
+ 'size mismatch in get! {} != {}'.format(s.st_size, size))
# ...internals...
@@ -803,7 +809,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
try:
t, data = self._read_packet()
except EOFError as e:
- raise SSHException('Server connection dropped: %s' % str(e))
+ raise SSHException('Server connection dropped: {}'.format(e))
msg = Message(data)
num = msg.get_int()
self._lock.acquire()
@@ -811,7 +817,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
if num not in self._expecting:
# might be response for a file that was closed before
# responses came back
- self._log(DEBUG, 'Unexpected response #%d' % (num,))
+ self._log(DEBUG, 'Unexpected response #{}'.format(num))
if waitfor is None:
# just doing a single check
break
diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py
index bc34db94..52f2bde8 100644
--- a/paramiko/sftp_file.py
+++ b/paramiko/sftp_file.py
@@ -83,7 +83,7 @@ class SFTPFile (BufferedFile):
# __del__.)
if self._closed:
return
- self.sftp._log(DEBUG, 'close(%s)' % u(hexlify(self.handle)))
+ self.sftp._log(DEBUG, 'close({})'.format(u(hexlify(self.handle))))
if self.pipelined:
self.sftp._finish_responses(self)
BufferedFile.close(self)
@@ -288,7 +288,8 @@ class SFTPFile (BufferedFile):
:param int mode: new permissions
"""
- self.sftp._log(DEBUG, 'chmod(%s, %r)' % (hexlify(self.handle), mode))
+ self.sftp._log(DEBUG, 'chmod({}, {!r})'.format(
+ hexlify(self.handle), mode))
attr = SFTPAttributes()
attr.st_mode = mode
self.sftp._request(CMD_FSETSTAT, self.handle, attr)
@@ -305,7 +306,7 @@ class SFTPFile (BufferedFile):
"""
self.sftp._log(
DEBUG,
- 'chown(%s, %r, %r)' % (hexlify(self.handle), uid, gid))
+ 'chown({}, {!r}, {!r})'.format(hexlify(self.handle), uid, gid))
attr = SFTPAttributes()
attr.st_uid, attr.st_gid = uid, gid
self.sftp._request(CMD_FSETSTAT, self.handle, attr)
@@ -325,7 +326,8 @@ class SFTPFile (BufferedFile):
"""
if times is None:
times = (time.time(), time.time())
- self.sftp._log(DEBUG, 'utime(%s, %r)' % (hexlify(self.handle), times))
+ self.sftp._log(DEBUG, 'utime({}, {!r})'.format(
+ hexlify(self.handle), times))
attr = SFTPAttributes()
attr.st_atime, attr.st_mtime = times
self.sftp._request(CMD_FSETSTAT, self.handle, attr)
@@ -340,7 +342,7 @@ class SFTPFile (BufferedFile):
"""
self.sftp._log(
DEBUG,
- 'truncate(%s, %r)' % (hexlify(self.handle), size))
+ 'truncate({}, {!r})'.format(hexlify(self.handle), size))
attr = SFTPAttributes()
attr.st_size = size
self.sftp._request(CMD_FSETSTAT, self.handle, attr)
@@ -473,7 +475,8 @@ class SFTPFile (BufferedFile):
.. versionadded:: 1.5.4
"""
- self.sftp._log(DEBUG, 'readv(%s, %r)' % (hexlify(self.handle), chunks))
+ self.sftp._log(DEBUG, 'readv({}, {!r})'.format(
+ hexlify(self.handle), chunks))
read_chunks = []
for offset, size in chunks:
diff --git a/paramiko/sftp_server.py b/paramiko/sftp_server.py
index f7d1c657..f8c4f727 100644
--- a/paramiko/sftp_server.py
+++ b/paramiko/sftp_server.py
@@ -100,7 +100,7 @@ class SFTPServer (BaseSFTP, SubsystemHandler):
def start_subsystem(self, name, transport, channel):
self.sock = channel
- self._log(DEBUG, 'Started sftp server on channel %s' % repr(channel))
+ self._log(DEBUG, 'Started sftp server on channel {!r}'.format(channel))
self._send_server_version()
self.server.session_started()
while True:
@@ -200,7 +200,7 @@ class SFTPServer (BaseSFTP, SubsystemHandler):
item._pack(msg)
else:
raise Exception(
- 'unknown type for {0!r} type {1!r}'.format(
+ 'unknown type for {!r} type {!r}'.format(
item, type(item)))
self._send_packet(t, msg)
@@ -209,7 +209,7 @@ class SFTPServer (BaseSFTP, SubsystemHandler):
# must be error code
self._send_status(request_number, handle)
return
- handle._set_name(b('hx%d' % self.next_handle))
+ handle._set_name(b('hx{:d}'.format(self.next_handle)))
self.next_handle += 1
if folder:
self.folder_table[handle._get_name()] = handle
@@ -334,7 +334,7 @@ class SFTPServer (BaseSFTP, SubsystemHandler):
return flags
def _process(self, t, request_number, msg):
- self._log(DEBUG, 'Request: %s' % CMD_NAMES[t])
+ self._log(DEBUG, 'Request: {}'.format(CMD_NAMES[t]))
if t == CMD_OPEN:
path = msg.get_text()
flags = self._convert_pflags(msg.get_int())
diff --git a/paramiko/ssh_exception.py b/paramiko/ssh_exception.py
index e9ab8d66..2df84b65 100644
--- a/paramiko/ssh_exception.py
+++ b/paramiko/ssh_exception.py
@@ -63,7 +63,7 @@ class BadAuthenticationType (AuthenticationException):
self.args = (explanation, types, )
def __str__(self):
- return '{0} (allowed_types={1!r})'.format(
+ return '{} (allowed_types={!r})'.format(
SSHException.__str__(self), self.allowed_types
)
@@ -107,7 +107,7 @@ class BadHostKeyException (SSHException):
.. versionadded:: 1.6
"""
def __init__(self, hostname, got_key, expected_key):
- message = 'Host key for server {0} does not match: got {1}, expected {2}' # noqa
+ message = 'Host key for server {} does not match: got {}, expected {}' # noqa
message = message.format(
hostname, got_key.get_base64(),
expected_key.get_base64())
@@ -128,7 +128,7 @@ class ProxyCommandFailure (SSHException):
"""
def __init__(self, command, error):
SSHException.__init__(self,
- '"ProxyCommand (%s)" returned non-zero exit status: %s' % (
+ '"ProxyCommand ({})" returned non-zero exit status: {}'.format(
command, error
)
)
diff --git a/paramiko/ssh_gss.py b/paramiko/ssh_gss.py
index b3c3f72b..88dedf7e 100644
--- a/paramiko/ssh_gss.py
+++ b/paramiko/ssh_gss.py
@@ -284,7 +284,7 @@ class _SSH_GSSAPI(_SSH_GSSAuth):
else:
token = self._gss_ctxt.step(recv_token)
except gssapi.GSSException:
- message = "{0} Target: {1}".format(
+ message = "{} Target: {}".format(
sys.exc_info()[1], self._gss_host)
raise gssapi.GSSException(message)
self._gss_ctxt_status = self._gss_ctxt.established
@@ -444,7 +444,7 @@ class _SSH_SSPI(_SSH_GSSAuth):
error, token = self._gss_ctxt.authorize(recv_token)
token = token[0].Buffer
except pywintypes.error as e:
- e.strerror += ", Target: {1}".format(e, self._gss_host)
+ e.strerror += ", Target: {}".format(e, self._gss_host)
raise
if error == 0:
diff --git a/paramiko/transport.py b/paramiko/transport.py
index c2ef4fcc..ddcb2912 100644
--- a/paramiko/transport.py
+++ b/paramiko/transport.py
@@ -100,7 +100,7 @@ class Transport(threading.Thread, ClosingContextManager):
_DECRYPT = object()
_PROTO_ID = '2.0'
- _CLIENT_ID = 'paramiko_%s' % paramiko.__version__
+ _CLIENT_ID = 'paramiko_{}'.format(paramiko.__version__)
# These tuples of algorithm identifiers are in preference order; do not
# reorder without reason!
@@ -329,7 +329,7 @@ class Transport(threading.Thread, ClosingContextManager):
break
else:
raise SSHException(
- 'Unable to connect to %s: %s' % (hostname, reason))
+ 'Unable to connect to {}: {}'.format(hostname, reason))
# okay, normal socket-ish flow here...
threading.Thread.__init__(self)
self.setDaemon(True)
@@ -415,17 +415,19 @@ class Transport(threading.Thread, ClosingContextManager):
"""
Returns a string representation of this object, for debugging.
"""
- out = '<paramiko.Transport at %s' % hex(long(id(self)) & xffffffff)
+ id_ = hex(long(id(self)) & xffffffff)
+ out = '<paramiko.Transport at {}'.format(id_)
if not self.active:
out += ' (unconnected)'
else:
if self.local_cipher != '':
- out += ' (cipher %s, %d bits)' % (
+ out += ' (cipher {}, {:d} bits)'.format(
self.local_cipher,
self._cipher_info[self.local_cipher]['key-size'] * 8
)
if self.is_authenticated():
- out += ' (active; %d open channel(s))' % len(self._channels)
+ out += ' (active; {} open channel(s))'.format(
+ len(self._channels))
elif self.initial_kex_done:
out += ' (connected; awaiting auth)'
else:
@@ -1061,7 +1063,7 @@ class Transport(threading.Thread, ClosingContextManager):
m.add_boolean(wait)
if data is not None:
m.add(*data)
- self._log(DEBUG, 'Sending global request "%s"' % kind)
+ self._log(DEBUG, 'Sending global request "{}"'.format(kind))
self._send_user_message(m)
if not wait:
return None
@@ -1179,14 +1181,15 @@ class Transport(threading.Thread, ClosingContextManager):
key.asbytes() != hostkey.asbytes()
):
self._log(DEBUG, 'Bad host key from server')
- self._log(DEBUG, 'Expected: %s: %s' % (
- hostkey.get_name(), repr(hostkey.asbytes()))
- )
- self._log(DEBUG, 'Got : %s: %s' % (
- key.get_name(), repr(key.asbytes()))
- )
+ self._log(DEBUG, 'Expected: {}: {}'.format(
+ hostkey.get_name(), repr(hostkey.asbytes()),
+ ))
+ self._log(DEBUG, 'Got : {}: {}'.format(
+ key.get_name(), repr(key.asbytes()),
+ ))
raise SSHException('Bad host key from server')
- self._log(DEBUG, 'Host key verified (%s)' % hostkey.get_name())
+ self._log(DEBUG, 'Host key verified ({})'.format(
+ hostkey.get_name()))
if (pkey is not None) or (password is not None) or gss_auth or gss_kex:
if gss_auth:
@@ -1295,7 +1298,7 @@ class Transport(threading.Thread, ClosingContextManager):
:param str username: the username to authenticate as
:return:
- `list` of auth types permissible for the next stage of
+ list of auth types permissible for the next stage of
authentication (normally empty)
:raises:
@@ -1350,7 +1353,7 @@ class Transport(threading.Thread, ClosingContextManager):
``True`` if an attempt at an automated "interactive" password auth
should be made if the server doesn't support normal password auth
:return:
- `list` of auth types permissible for the next stage of
+ list of auth types permissible for the next stage of
authentication (normally empty)
:raises:
@@ -1421,7 +1424,7 @@ class Transport(threading.Thread, ClosingContextManager):
an event to trigger when the authentication attempt is complete
(whether it was successful or not)
:return:
- `list` of auth types permissible for the next stage of
+ list of auth types permissible for the next stage of
authentication (normally empty)
:raises:
@@ -1479,7 +1482,7 @@ class Transport(threading.Thread, ClosingContextManager):
:param callable handler: a handler for responding to server questions
:param str submethods: a string list of desired submethods (optional)
:return:
- `list` of auth types permissible for the next stage of
+ list of auth types permissible for the next stage of
authentication (normally empty).
:raises: `.BadAuthenticationType` -- if public-key authentication isn't
@@ -1529,7 +1532,6 @@ class Transport(threading.Thread, ClosingContextManager):
:param bool gss_deleg_creds: Delegate credentials or not
:return: list of auth types permissible for the next stage of
authentication (normally empty)
- :rtype: list
:raises: `.BadAuthenticationType` -- if gssapi-with-mic isn't
allowed by the server (and no event was passed in)
:raises:
@@ -1553,7 +1555,7 @@ class Transport(threading.Thread, ClosingContextManager):
:param str username: The username to authenticate as.
:returns:
- a `list` of auth types permissible for the next stage of
+ a list of auth types permissible for the next stage of
authentication (normally empty)
:raises: `.BadAuthenticationType` --
if GSS-API Key Exchange was not performed (and no event was passed
@@ -1746,7 +1748,7 @@ class Transport(threading.Thread, ClosingContextManager):
if key is None:
raise SSHException('Unknown host key type')
if not key.verify_ssh_sig(self.H, Message(sig)):
- raise SSHException('Signature verification (%s) failed.' % self.host_key_type) # noqa
+ raise SSHException('Signature verification ({}) failed.'.format(self.host_key_type)) # noqa
self.host_key = key
def _compute_key(self, id, nbytes):
@@ -1759,8 +1761,8 @@ class Transport(threading.Thread, ClosingContextManager):
# Fallback to SHA1 for kex engines that fail to specify a hex
# algorithm, or for e.g. transport tests that don't run kexinit.
hash_algo = getattr(self.kex_engine, 'hash_algo', None)
- hash_select_msg = "kex engine %s specified hash_algo %r" % (
- self.kex_engine.__class__.__name__, hash_algo
+ hash_select_msg = "kex engine {} specified hash_algo {!r}".format(
+ self.kex_engine.__class__.__name__, hash_algo,
)
if hash_algo is None:
hash_algo = sha1
@@ -1881,13 +1883,13 @@ class Transport(threading.Thread, ClosingContextManager):
_active_threads.append(self)
tid = hex(long(id(self)) & xffffffff)
if self.server_mode:
- self._log(DEBUG, 'starting thread (server mode): %s' % tid)
+ self._log(DEBUG, 'starting thread (server mode): {}'.format(tid))
else:
- self._log(DEBUG, 'starting thread (client mode): %s' % tid)
+ self._log(DEBUG, 'starting thread (client mode): {}'.format(tid))
try:
try:
self.packetizer.write_all(b(self.local_version + '\r\n'))
- self._log(DEBUG, 'Local version/idstring: %s' % self.local_version) # noqa
+ self._log(DEBUG, 'Local version/idstring: {}'.format(self.local_version)) # noqa
self._check_banner()
# The above is actually very much part of the handshake, but
# sometimes the banner can be read but the machine is not
@@ -1917,7 +1919,7 @@ class Transport(threading.Thread, ClosingContextManager):
continue
if len(self._expected_packet) > 0:
if ptype not in self._expected_packet:
- raise SSHException('Expecting packet from %r, got %d' % (self._expected_packet, ptype)) # noqa
+ raise SSHException('Expecting packet from {!r}, got {:d}'.format(self._expected_packet, ptype)) # noqa
self._expected_packet = tuple()
if (ptype >= 30) and (ptype <= 41):
self.kex_engine.parse_next(ptype, m)
@@ -1935,9 +1937,9 @@ class Transport(threading.Thread, ClosingContextManager):
if chan is not None:
self._channel_handler_table[ptype](chan, m)
elif chanid in self.channels_seen:
- self._log(DEBUG, 'Ignoring message for dead channel %d' % chanid) # noqa
+ self._log(DEBUG, 'Ignoring message for dead channel {:d}'.format(chanid)) # noqa
else:
- self._log(ERROR, 'Channel request for unknown channel %d' % chanid) # noqa
+ self._log(ERROR, 'Channel request for unknown channel {:d}'.format(chanid)) # noqa
break
elif (
self.auth_handler is not None and
@@ -1948,7 +1950,8 @@ class Transport(threading.Thread, ClosingContextManager):
if len(self._expected_packet) > 0:
continue
else:
- self._log(WARNING, 'Oops, unhandled type %d' % ptype)
+ err = 'Oops, unhandled type {:d}'.format(ptype)
+ self._log(WARNING, err)
msg = Message()
msg.add_byte(cMSG_UNIMPLEMENTED)
msg.add_int(m.seqno)
@@ -1964,7 +1967,7 @@ class Transport(threading.Thread, ClosingContextManager):
except socket.error as e:
if type(e.args) is tuple:
if e.args:
- emsg = '%s (%d)' % (e.args[1], e.args[0])
+ emsg = '{} ({:d})'.format(e.args[1], e.args[0])
else: # empty tuple, e.g. socket.timeout
emsg = str(e) or repr(e)
else:
@@ -2006,11 +2009,11 @@ class Transport(threading.Thread, ClosingContextManager):
# Log useful, non-duplicative line re: an agreed-upon algorithm.
# Old code implied algorithms could be asymmetrical (different for
# inbound vs outbound) so we preserve that possibility.
- msg = "{0} agreed: ".format(which)
+ msg = "{} agreed: ".format(which)
if local == remote:
msg += local
else:
- msg += "local={0}, remote={1}".format(local, remote)
+ msg += "local={}, remote={}".format(local, remote)
self._log(DEBUG, msg)
# protocol stages
@@ -2052,7 +2055,7 @@ class Transport(threading.Thread, ClosingContextManager):
raise SSHException('Indecipherable protocol version "' + buf + '"')
# save this server version string for later
self.remote_version = buf
- self._log(DEBUG, 'Remote version/idstring: %s' % buf)
+ self._log(DEBUG, 'Remote version/idstring: {}'.format(buf))
# pull off any attached comment
# NOTE: comment used to be stored in a variable and then...never used.
# since 2003. ca 877cd974b8182d26fa76d566072917ea67b64e67
@@ -2066,9 +2069,9 @@ class Transport(threading.Thread, ClosingContextManager):
version = segs[1]
client = segs[2]
if version != '1.99' and version != '2.0':
- msg = 'Incompatible version ({0} instead of 2.0)'
+ msg = 'Incompatible version ({} instead of 2.0)'
raise SSHException(msg.format(version))
- msg = 'Connected (version {0}, client {1})'.format(version, client)
+ msg = 'Connected (version {}, client {})'.format(version, client)
self._log(INFO, msg)
def _send_kex_init(self):
@@ -2171,7 +2174,7 @@ class Transport(threading.Thread, ClosingContextManager):
if len(agreed_kex) == 0:
raise SSHException('Incompatible ssh peer (no acceptable kex algorithm)') # noqa
self.kex_engine = self._kex_info[agreed_kex[0]](self)
- self._log(DEBUG, "Kex agreed: %s" % agreed_kex[0])
+ self._log(DEBUG, "Kex agreed: {}".format(agreed_kex[0]))
if self.server_mode:
available_server_keys = list(filter(
@@ -2264,7 +2267,7 @@ class Transport(threading.Thread, ClosingContextManager):
len(agreed_local_compression) == 0 or
len(agreed_remote_compression) == 0
):
- msg = 'Incompatible ssh server (no acceptable compression) {0!r} {1!r} {2!r}' # noqa
+ msg = 'Incompatible ssh server (no acceptable compression) {!r} {!r} {!r}' # noqa
raise SSHException(msg.format(
agreed_local_compression, agreed_remote_compression,
self._preferred_compression,
@@ -2407,16 +2410,16 @@ class Transport(threading.Thread, ClosingContextManager):
def _parse_disconnect(self, m):
code = m.get_int()
desc = m.get_text()
- self._log(INFO, 'Disconnect (code %d): %s' % (code, desc))
+ self._log(INFO, 'Disconnect (code {:d}): {}'.format(code, desc))
def _parse_global_request(self, m):
kind = m.get_text()
- self._log(DEBUG, 'Received global request "%s"' % kind)
+ self._log(DEBUG, 'Received global request "{}"'.format(kind))
want_reply = m.get_boolean()
if not self.server_mode:
self._log(
DEBUG,
- 'Rejecting "%s" global request from server.' % kind
+ 'Rejecting "{}" global request from server.'.format(kind)
)
ok = False
elif kind == 'tcpip-forward':
@@ -2470,7 +2473,7 @@ class Transport(threading.Thread, ClosingContextManager):
try:
chan._set_remote_channel(
server_chanid, server_window_size, server_max_packet_size)
- self._log(DEBUG, 'Secsh channel %d opened.' % chanid)
+ self._log(DEBUG, 'Secsh channel {:d} opened.'.format(chanid))
if chanid in self.channel_events:
self.channel_events[chanid].set()
del self.channel_events[chanid]
@@ -2486,8 +2489,9 @@ class Transport(threading.Thread, ClosingContextManager):
reason_text = CONNECTION_FAILED_CODE.get(reason, '(unknown code)')
self._log(
ERROR,
- 'Secsh channel %d open FAILED: %s: %s' % (
- chanid, reason_str, reason_text)
+ 'Secsh channel {:d} open FAILED: {}: {}'.format(
+ chanid, reason_str, reason_text,
+ )
)
self.lock.acquire()
try:
@@ -2522,8 +2526,9 @@ class Transport(threading.Thread, ClosingContextManager):
origin_port = m.get_int()
self._log(
DEBUG,
- 'Incoming x11 connection from %s:%d' % (
- origin_addr, origin_port)
+ 'Incoming x11 connection from {}:{:d}'.format(
+ origin_addr, origin_port,
+ )
)
self.lock.acquire()
try:
@@ -2537,8 +2542,9 @@ class Transport(threading.Thread, ClosingContextManager):
origin_port = m.get_int()
self._log(
DEBUG,
- 'Incoming tcp forwarded connection from %s:%d' % (
- origin_addr, origin_port)
+ 'Incoming tcp forwarded connection from {}:{:d}'.format(
+ origin_addr, origin_port,
+ )
)
self.lock.acquire()
try:
@@ -2548,7 +2554,7 @@ class Transport(threading.Thread, ClosingContextManager):
elif not self.server_mode:
self._log(
DEBUG,
- 'Rejecting "%s" channel request from server.' % kind)
+ 'Rejecting "{}" channel request from server.'.format(kind))
reject = True
reason = OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
else:
@@ -2574,7 +2580,7 @@ class Transport(threading.Thread, ClosingContextManager):
if reason != OPEN_SUCCEEDED:
self._log(
DEBUG,
- 'Rejecting "%s" channel request from client.' % kind)
+ 'Rejecting "{}" channel request from client.'.format(kind))
reject = True
if reject:
msg = Message()
@@ -2605,7 +2611,9 @@ class Transport(threading.Thread, ClosingContextManager):
m.add_int(self.default_window_size)
m.add_int(self.default_max_packet_size)
self._send_message(m)
- self._log(DEBUG, 'Secsh channel %d (%s) opened.', my_chanid, kind)
+ self._log(DEBUG,
+ 'Secsh channel {:d} ({}) opened.'.format(my_chanid, kind)
+ )
if kind == 'auth-agent@openssh.com':
self._forward_agent_handler(chan)
elif kind == 'x11':
@@ -2624,7 +2632,7 @@ class Transport(threading.Thread, ClosingContextManager):
m.get_boolean() # always_display
msg = m.get_string()
m.get_string() # language
- self._log(DEBUG, 'Debug msg: {0}'.format(util.safe_string(msg)))
+ self._log(DEBUG, 'Debug msg: {}'.format(util.safe_string(msg)))
def _get_subsystem_handler(self, name):
try:
@@ -2679,7 +2687,7 @@ class SecurityOptions (object):
"""
Returns a string representation of this object, for debugging.
"""
- return '<paramiko.SecurityOptions for %s>' % repr(self._transport)
+ return '<paramiko.SecurityOptions for {!r}>'.format(self._transport)
def _set(self, name, orig, x):
if type(x) is list:
diff --git a/paramiko/util.py b/paramiko/util.py
index de099c0c..2854ef98 100644
--- a/paramiko/util.py
+++ b/paramiko/util.py
@@ -102,19 +102,22 @@ def format_binary(data, prefix=''):
def format_binary_line(data):
- left = ' '.join(['%02X' % byte_ord(c) for c in data])
- right = ''.join([('.%c..' % c)[(byte_ord(c) + 63) // 95] for c in data])
- return '%-50s %s' % (left, right)
+ left = ' '.join(['{:02X}'.format(byte_ord(c)) for c in data])
+ right = ''.join([
+ '.{:c}..'.format(byte_ord(c))[(byte_ord(c) + 63) // 95]
+ for c in data
+ ])
+ return '{:50s} {}'.format(left, right)
def safe_string(s):
- out = b('')
+ out = b''
for c in s:
i = byte_ord(c)
if 32 <= i <= 127:
out += byte_chr(i)
else:
- out += b('%%%02X' % i)
+ out += b('%{:02X}'.format(i))
return out
diff --git a/paramiko/win_pageant.py b/paramiko/win_pageant.py
index fda3b9c1..661ba575 100644
--- a/paramiko/win_pageant.py
+++ b/paramiko/win_pageant.py
@@ -44,7 +44,7 @@ win32con_WM_COPYDATA = 74
def _get_pageant_window_object():
- return ctypes.windll.user32.FindWindowA(b('Pageant'), b('Pageant'))
+ return ctypes.windll.user32.FindWindowA(b'Pageant', b'Pageant')
def can_talk_to_agent():
diff --git a/setup.cfg b/setup.cfg
index 8cc271fe..00e34ca3 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -11,3 +11,8 @@ omit = paramiko/_winapi.py
exclude = sites,.git,build,dist,demos,tests
ignore = E124,E125,E128,E261,E301,E302,E303,E402
max-line-length = 79
+
+[tool:pytest]
+# We use pytest-relaxed just for its utils at the moment, so disable it at the
+# plugin level until we adapt test organization to really use it.
+addopts = -p no:relaxed
diff --git a/setup.py b/setup.py
index 1234bfa5..6e1f0e0e 100644
--- a/setup.py
+++ b/setup.py
@@ -65,11 +65,8 @@ setup(
'Topic :: Security :: Cryptography',
'Programming Language :: Python',
'Programming Language :: Python :: 2',
- 'Programming Language :: Python :: 2.6',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
- 'Programming Language :: Python :: 3.2',
- 'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
diff --git a/sites/shared_conf.py b/sites/shared_conf.py
index 99fab315..cf0d77ff 100644
--- a/sites/shared_conf.py
+++ b/sites/shared_conf.py
@@ -26,13 +26,13 @@ html_sidebars = {
# Everything intersphinx's to Python
intersphinx_mapping = {
- 'python': ('http://docs.python.org/2.6', None),
+ 'python': ('https://docs.python.org/2.7/', None),
}
# Regular settings
project = 'Paramiko'
year = datetime.now().year
-copyright = '%d Jeff Forcier' % year
+copyright = '{} Jeff Forcier'.format(year)
master_doc = 'index'
templates_path = ['_templates']
exclude_trees = ['_build']
diff --git a/sites/www/changelog.rst b/sites/www/changelog.rst
index e3d779af..07ee39fa 100644
--- a/sites/www/changelog.rst
+++ b/sites/www/changelog.rst
@@ -14,10 +14,32 @@ Changelog
* :bug:`1039` Ed25519 auth key decryption raised an unexpected exception when
given a unicode password string (typical in python 3). Report by Theodor van
Nahl and fix by Pierce Lopez.
+* :release:`2.4.0 <2017-11-14>`
+* :feature:`-` Add a new ``passphrase`` kwarg to `SSHClient.connect
+ <paramiko.client.SSHClient.connect>` so users may disambiguate key-decryption
+ passphrases from password-auth passwords. (This is a backwards compatible
+ change; ``password`` will still pull double duty as a passphrase when
+ ``passphrase`` is not given.)
+* :support:`-` Update ``tearDown`` of client test suite to avoid hangs due to
+ eternally blocking ``accept()`` calls on the internal server thread (which
+ can occur when test code raises an exception before actually connecting to
+ the server.)
* :bug:`1108 (1.17+)` Rename a private method keyword argument (which was named
``async``) so that we're compatible with the upcoming Python 3.7 release
(where ``async`` is a new keyword.) Thanks to ``@vEpiphyte`` for the report.
+* :support:`1100` Updated the test suite & related docs/metadata/config to be
+ compatible with pytest instead of using the old, custom, crufty
+ unittest-based ``test.py``.
+
+ This includes marking known-slow tests (mostly the SFTP ones) so they can be
+ filtered out by ``inv test``'s default behavior; as well as other minor
+ tweaks to test collection and/or display (for example, GSSAPI tests are
+ collected, but skipped, instead of not even being collected by default as in
+ ``test.py``.)
* :support:`- backported` Include LICENSE file in wheel archives.
+* :support:`1070` Drop Python 2.6 and Python 3.3 support; now only 2.7 and 3.4+
+ are supported. If you're unable to upgrade from 2.6 or 3.3, please stick to
+ the Paramiko 2.3.x (or below) release lines.
* :release:`2.3.1 <2017-09-22>`
* :bug:`1071` Certificate support broke the no-certificate case for Ed25519
keys (symptom is an ``AttributeError`` about ``public_blob``.) This went
diff --git a/sites/www/index.rst b/sites/www/index.rst
index f0a5db8a..26961f24 100644
--- a/sites/www/index.rst
+++ b/sites/www/index.rst
@@ -1,11 +1,11 @@
Welcome to Paramiko!
====================
-Paramiko is a Python (2.6+, 3.3+) implementation of the SSHv2 protocol [#]_,
+Paramiko is a Python (2.7, 3.4+) implementation of the SSHv2 protocol [#]_,
providing both client and server functionality. While it leverages a Python C
-extension for low level cryptography
-(`Cryptography <https://cryptography.io>`_), Paramiko itself is a pure Python
-interface around SSH networking concepts.
+extension for low level cryptography (`Cryptography
+<https://cryptography.io>`_), Paramiko itself is a pure Python interface around
+SSH networking concepts.
This website covers project information for Paramiko such as the changelog,
contribution guidelines, development roadmap, news/blog, and so forth. Detailed
diff --git a/sites/www/installing.rst b/sites/www/installing.rst
index f335a9e7..e6db2dca 100644
--- a/sites/www/installing.rst
+++ b/sites/www/installing.rst
@@ -19,8 +19,8 @@ via `pip <http://pip-installer.org>`_::
$ pip install paramiko
-We currently support **Python 2.6, 2.7, 3.3+, and PyPy**. Users on Python 2.5
-or older (or 3.2 or older) are urged to upgrade.
+We currently support **Python 2.7, 3.4+, and PyPy**. Users on Python 2.6 or
+older (or 3.3 or older) are urged to upgrade.
Paramiko has only one direct hard dependency: the Cryptography library. See
:ref:`cryptography`.
diff --git a/tasks.py b/tasks.py
index 4ddeeee3..3b34818c 100644
--- a/tasks.py
+++ b/tasks.py
@@ -8,26 +8,46 @@ from invocations.packaging.release import ns as release_coll, publish
from invocations.testing import count_errors
-# Until we move to spec-based testing
@task
-def test(ctx, coverage=False, flags=""):
- if "--verbose" not in flags.split():
- flags += " --verbose"
- runner = "python"
+def test(ctx, verbose=True, coverage=False, include_slow=False, opts=""):
+ """
+ Run unit tests via pytest.
+
+ By default, known-slow parts of the suite are SKIPPED unless
+ ``--include-slow`` is given. (Note that ``--include-slow`` does not mesh
+ well with explicit ``--opts="-m=xxx"`` - if ``-m`` is found in ``--opts``,
+ ``--include-slow`` will be ignored!)
+ """
+ if verbose and '--verbose' not in opts and '-v' not in opts:
+ opts += " --verbose"
+ if '-m' not in opts and not include_slow:
+ opts += " -m 'not slow'"
+ runner = "pytest"
if coverage:
- runner = "coverage run --source=paramiko"
+ # Leverage how pytest can be run as 'python -m pytest', and then how
+ # coverage can be told to run things in that manner instead of
+ # expecting a literal .py file.
+ # TODO: get pytest's coverage plugin working, IIRC it has issues?
+ runner = "coverage run --source=paramiko -m pytest"
# Strip SSH_AUTH_SOCK from parent env to avoid pollution by interactive
# users.
+ # TODO: once pytest coverage plugin works, see if there's a pytest-native
+ # way to handle the env stuff too, then we can remove these tasks entirely
+ # in favor of just "run pytest"?
env = dict(os.environ)
if 'SSH_AUTH_SOCK' in env:
del env['SSH_AUTH_SOCK']
- cmd = "{0} test.py {1}".format(runner, flags)
+ cmd = "{} {}".format(runner, opts)
+ # NOTE: we have a pytest.ini and tend to use that over PYTEST_ADDOPTS.
ctx.run(cmd, pty=True, env=env, replace_env=True)
@task
-def coverage(ctx):
- ctx.run("coverage run --source=paramiko test.py --verbose")
+def coverage(ctx, opts=""):
+ """
+ Execute all tests (normal and slow) with coverage enabled.
+ """
+ return test(ctx, coverage=True, include_slow=True, opts=opts)
# Until we stop bundling docs w/ releases. Need to discover use cases first.
diff --git a/test.py b/test.py
deleted file mode 100755
index 7849c149..00000000
--- a/test.py
+++ /dev/null
@@ -1,196 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
-#
-# This file is part of paramiko.
-#
-# Paramiko is free software; you can redistribute it and/or modify it under the
-# terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation; either version 2.1 of the License, or (at your option)
-# any later version.
-#
-# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
-# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
-# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
-
-"""
-do the unit tests!
-"""
-
-# flake8: noqa
-import os
-import re
-import sys
-import unittest
-from optparse import OptionParser
-import paramiko
-import threading
-from paramiko.py3compat import PY2
-
-sys.path.append('tests')
-
-from tests.test_message import MessageTest
-from tests.test_file import BufferedFileTest
-from tests.test_buffered_pipe import BufferedPipeTest
-from tests.test_util import UtilTest
-from tests.test_hostkeys import HostKeysTest
-from tests.test_pkey import KeyTest
-from tests.test_kex import KexTest
-from tests.test_packetizer import PacketizerTest
-from tests.test_auth import AuthTest
-from tests.test_transport import TransportTest
-from tests.test_ssh_exception import NoValidConnectionsErrorTest
-from tests.test_client import SSHClientTest
-from test_client import SSHClientTest # XXX why shadow the above import?
-from test_gssapi import GSSAPITest
-from test_ssh_gss import GSSAuthTest
-from test_kex_gss import GSSKexTest
-
-default_host = 'localhost'
-default_user = os.environ.get('USER', 'nobody')
-default_keyfile = os.path.join(os.environ.get('HOME', '/'), '.ssh/id_rsa')
-default_passwd = None
-
-
-def iter_suite_tests(suite):
- """Return all tests in a suite, recursing through nested suites"""
- for item in suite._tests:
- if isinstance(item, unittest.TestCase):
- yield item
- elif isinstance(item, unittest.TestSuite):
- for r in iter_suite_tests(item):
- yield r
- else:
- raise Exception('unknown object %r inside test suite %r'
- % (item, suite))
-
-
-def filter_suite_by_re(suite, pattern):
- result = unittest.TestSuite()
- filter_re = re.compile(pattern)
- for test in iter_suite_tests(suite):
- if filter_re.search(test.id()):
- result.addTest(test)
- return result
-
-
-def main():
- parser = OptionParser('usage: %prog [options]')
- parser.add_option('--verbose', action='store_true', dest='verbose', default=False,
- help='verbose display (one line per test)')
- parser.add_option('--no-pkey', action='store_false', dest='use_pkey', default=True,
- help='skip RSA/DSS private key tests (which can take a while)')
- parser.add_option('--no-transport', action='store_false', dest='use_transport', default=True,
- help='skip transport tests (which can take a while)')
- parser.add_option('--no-sftp', action='store_false', dest='use_sftp', default=True,
- help='skip SFTP client/server tests, which can be slow')
- parser.add_option('--no-big-file', action='store_false', dest='use_big_file', default=True,
- help='skip big file SFTP tests, which are slow as molasses')
- parser.add_option('--gssapi-test', action='store_true', dest='gssapi_test', default=False,
- help='Test the used APIs for GSS-API / SSPI authentication')
- parser.add_option('--test-gssauth', action='store_true', dest='test_gssauth', default=False,
- help='Test GSS-API / SSPI authentication for SSHv2. To test this, you need kerberos a infrastructure.\
- Note: Paramiko needs access to your krb5.keytab file. Make it readable for Paramiko or\
- copy the used key to another file and set the environment variable KRB5_KTNAME to this file.')
- parser.add_option('--test-gssapi-keyex', action='store_true', dest='test_gsskex', default=False,
- help='Test GSS-API / SSPI authenticated iffie-Hellman Key Exchange and user\
- authentication. To test this, you need kerberos a infrastructure.\
- Note: Paramiko needs access to your krb5.keytab file. Make it readable for Paramiko or\
- copy the used key to another file and set the environment variable KRB5_KTNAME to this file.')
- parser.add_option('-R', action='store_false', dest='use_loopback_sftp', default=True,
- help='perform SFTP tests against a remote server (by default, SFTP tests ' +
- 'are done through a loopback socket)')
- parser.add_option('-H', '--sftp-host', dest='hostname', type='string', default=default_host,
- metavar='<host>',
- help='[with -R] host for remote sftp tests (default: %s)' % default_host)
- parser.add_option('-U', '--sftp-user', dest='username', type='string', default=default_user,
- metavar='<username>',
- help='[with -R] username for remote sftp tests (default: %s)' % default_user)
- parser.add_option('-K', '--sftp-key', dest='keyfile', type='string', default=default_keyfile,
- metavar='<keyfile>',
- help='[with -R] location of private key for remote sftp tests (default: %s)' %
- default_keyfile)
- parser.add_option('-P', '--sftp-passwd', dest='password', type='string', default=default_passwd,
- metavar='<password>',
- help='[with -R] (optional) password to unlock the private key for remote sftp tests')
- parser.add_option('--krb5_principal', dest='krb5_principal', type='string',
- metavar='<krb5_principal>',
- help='The krb5 principal (your username) for GSS-API / SSPI authentication')
- parser.add_option('--targ_name', dest='targ_name', type='string',
- metavar='<targ_name>',
- help='Target name for GSS-API / SSPI authentication.\
- This is the hosts name you are running the test on in the kerberos database.')
- parser.add_option('--server_mode', action='store_true', dest='server_mode', default=False,
- help='Usage with --gssapi-test. Test the available GSS-API / SSPI server mode to.\
- Note: you need to have access to the kerberos keytab file.')
-
- options, args = parser.parse_args()
-
- # setup logging
- paramiko.util.log_to_file('test.log')
-
- if options.use_sftp:
- from tests.test_sftp import SFTPTest
- if options.use_loopback_sftp:
- SFTPTest.init_loopback()
- else:
- SFTPTest.init(options.hostname, options.username, options.keyfile, options.password)
- if not options.use_big_file:
- SFTPTest.set_big_file_test(False)
- if options.use_big_file:
- from tests.test_sftp_big import BigSFTPTest
-
- suite = unittest.TestSuite()
- suite.addTest(unittest.makeSuite(MessageTest))
- suite.addTest(unittest.makeSuite(BufferedFileTest))
- suite.addTest(unittest.makeSuite(BufferedPipeTest))
- suite.addTest(unittest.makeSuite(UtilTest))
- suite.addTest(unittest.makeSuite(HostKeysTest))
- if options.use_pkey:
- suite.addTest(unittest.makeSuite(KeyTest))
- suite.addTest(unittest.makeSuite(KexTest))
- suite.addTest(unittest.makeSuite(PacketizerTest))
- if options.use_transport:
- suite.addTest(unittest.makeSuite(AuthTest))
- suite.addTest(unittest.makeSuite(TransportTest))
- suite.addTest(unittest.makeSuite(NoValidConnectionsErrorTest))
- suite.addTest(unittest.makeSuite(SSHClientTest))
- if options.use_sftp:
- suite.addTest(unittest.makeSuite(SFTPTest))
- if options.use_big_file:
- suite.addTest(unittest.makeSuite(BigSFTPTest))
- if options.gssapi_test:
- GSSAPITest.init(options.targ_name, options.server_mode)
- suite.addTest(unittest.makeSuite(GSSAPITest))
- if options.test_gssauth:
- GSSAuthTest.init(options.krb5_principal, options.targ_name)
- suite.addTest(unittest.makeSuite(GSSAuthTest))
- if options.test_gsskex:
- GSSKexTest.init(options.krb5_principal, options.targ_name)
- suite.addTest(unittest.makeSuite(GSSKexTest))
- verbosity = 1
- if options.verbose:
- verbosity = 2
-
- runner = unittest.TextTestRunner(verbosity=verbosity)
- if len(args) > 0:
- filter = '|'.join(args)
- suite = filter_suite_by_re(suite, filter)
- result = runner.run(suite)
- # Clean up stale threads from poorly cleaned-up tests.
- # TODO: make that not a problem, jeez
- for thread in threading.enumerate():
- if thread is not threading.currentThread():
- thread.join(timeout=1)
- # Exit correctly
- if not result.wasSuccessful():
- sys.exit(1)
-
-
-if __name__ == '__main__':
- main()
diff --git a/tests/__init__.py b/tests/__init__.py
index 8878f14d..be1d2daa 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -1,36 +1 @@
-# Copyright (C) 2017 Martin Packman <gzlist@googlemail.com>
-#
-# This file is part of paramiko.
-#
-# Paramiko is free software; you can redistribute it and/or modify it under the
-# terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation; either version 2.1 of the License, or (at your option)
-# any later version.
-#
-# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
-# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
-# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
-
-"""Base classes and helpers for testing paramiko."""
-
-import unittest
-
-from paramiko.py3compat import (
- builtins,
- )
-
-
-def skipUnlessBuiltin(name):
- """Skip decorated test if builtin name does not exist."""
- if getattr(builtins, name, None) is None:
- skip = getattr(unittest, "skip", None)
- if skip is None:
- # Python 2.6 pseudo-skip
- return lambda func: None
- return skip("No builtin " + repr(name))
- return lambda func: func
+# This file's just here so test modules can use explicit-relative imports.
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 00000000..d1967a73
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,103 @@
+import logging
+import os
+import shutil
+import threading
+
+import pytest
+from paramiko import RSAKey, SFTPServer, SFTP, Transport
+
+from .loop import LoopSocket
+from .stub_sftp import StubServer, StubSFTPServer
+from .util import _support
+
+
+# TODO: not a huge fan of conftest.py files, see if we can move these somewhere
+# 'nicer'.
+
+
+# Perform logging by default; pytest will capture and thus hide it normally,
+# presenting it on error/failure. (But also allow turning it off when doing
+# very pinpoint debugging - e.g. using breakpoints, so you don't want output
+# hiding enabled, but also don't want all the logging to gum up the terminal.)
+if not os.environ.get('DISABLE_LOGGING', False):
+ logging.basicConfig(
+ level=logging.DEBUG,
+ # Also make sure to set up timestamping for more sanity when debugging.
+ format="[%(relativeCreated)s]\t%(levelname)s:%(name)s:%(message)s",
+ datefmt="%H:%M:%S",
+ )
+
+
+def make_sftp_folder():
+ """
+ Ensure expected target temp folder exists on the remote end.
+
+ Will clean it out if it already exists.
+ """
+ # TODO: go back to using the sftp functionality itself for folder setup so
+ # we can test against live SFTP servers again someday. (Not clear if anyone
+ # is/was using the old capability for such, though...)
+ # TODO: something that would play nicer with concurrent testing (but
+ # probably e.g. using thread ID or UUIDs or something; not the "count up
+ # until you find one not used!" crap from before...)
+ # TODO: if we want to lock ourselves even harder into localhost-only
+ # testing (probably not?) could use tempdir modules for this for improved
+ # safety. Then again...why would someone have such a folder???
+ path = os.environ.get('TEST_FOLDER', 'paramiko-test-target')
+ # Forcibly nuke this directory locally, since at the moment, the below
+ # fixtures only ever run with a locally scoped stub test server.
+ shutil.rmtree(path, ignore_errors=True)
+ # Then create it anew, again locally, for the same reason.
+ os.mkdir(path)
+ return path
+
+
+@pytest.fixture#(scope='session')
+def sftp_server():
+ """
+ Set up an in-memory SFTP server thread. Yields the client Transport/socket.
+
+ The resulting client Transport (along with all the server components) will
+ be the same object throughout the test session; the `sftp` fixture then
+ creates new higher level client objects wrapped around the client
+ Transport, as necessary.
+ """
+ # Sockets & transports
+ socks = LoopSocket()
+ sockc = LoopSocket()
+ sockc.link(socks)
+ tc = Transport(sockc)
+ ts = Transport(socks)
+ # Auth
+ host_key = RSAKey.from_private_key_file(_support('test_rsa.key'))
+ ts.add_server_key(host_key)
+ # Server setup
+ event = threading.Event()
+ server = StubServer()
+ ts.set_subsystem_handler('sftp', SFTPServer, StubSFTPServer)
+ ts.start_server(event, server)
+ # Wait (so client has time to connect? Not sure. Old.)
+ event.wait(1.0)
+ # Make & yield connection.
+ tc.connect(username='slowdive', password='pygmalion')
+ yield tc
+ # TODO: any need for shutdown? Why didn't old suite do so? Or was that the
+ # point of the "join all threads from threading module" crap in test.py?
+
+
+@pytest.fixture
+def sftp(sftp_server):
+ """
+ Yield an SFTP client connected to the global in-session SFTP server thread.
+ """
+ # Client setup
+ client = SFTP.from_transport(sftp_server)
+ # Work in 'remote' folder setup (as it wants to use the client)
+ # TODO: how cleanest to make this available to tests? Doing it this way is
+ # marginally less bad than the previous 'global'-using setup, but not by
+ # much?
+ client.FOLDER = make_sftp_folder()
+ # Yield client to caller
+ yield client
+ # Clean up - as in make_sftp_folder, we assume local-only exec for now.
+ shutil.rmtree(client.FOLDER, ignore_errors=True)
diff --git a/tests/loop.py b/tests/loop.py
index e805ad96..6c432867 100644
--- a/tests/loop.py
+++ b/tests/loop.py
@@ -16,11 +16,9 @@
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
-"""
-...
-"""
+import socket
+import threading
-import threading, socket
from paramiko.common import asbytes
diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py
index 0d673091..19545865 100644
--- a/tests/stub_sftp.py
+++ b/tests/stub_sftp.py
@@ -22,6 +22,7 @@ A stub SFTP server for loopback SFTP testing.
import os
import sys
+
from paramiko import (
ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes,
SFTPHandle, SFTP_OK, SFTP_FAILURE, AUTH_SUCCESSFUL, OPEN_SUCCEEDED,
diff --git a/tests/test_auth.py b/tests/test_auth.py
index e78397c6..4eade610 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -31,8 +31,10 @@ from paramiko import (
)
from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL
from paramiko.py3compat import u
-from tests.loop import LoopSocket
-from tests.util import test_path
+
+from .loop import LoopSocket
+from .util import _support, slow
+
_pwd = u('\u2022')
@@ -40,7 +42,7 @@ _pwd = u('\u2022')
class NullServer (ServerInterface):
paranoid_did_password = False
paranoid_did_public_key = False
- paranoid_key = DSSKey.from_private_key_file(test_path('test_dss.key'))
+ paranoid_key = DSSKey.from_private_key_file(_support('test_dss.key'))
def get_allowed_auths(self, username):
if username == 'slowdive':
@@ -118,7 +120,7 @@ class AuthTest (unittest.TestCase):
self.sockc.close()
def start_server(self):
- host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = RSAKey.from_private_key_file(_support('test_rsa.key'))
self.public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
self.event = threading.Event()
@@ -170,7 +172,7 @@ class AuthTest (unittest.TestCase):
self.tc.connect(hostkey=self.public_host_key)
remain = self.tc.auth_password(username='paranoid', password='paranoid')
self.assertEqual(['publickey'], remain)
- key = DSSKey.from_private_key_file(test_path('test_dss.key'))
+ key = DSSKey.from_private_key_file(_support('test_dss.key'))
remain = self.tc.auth_publickey(username='paranoid', key=key)
self.assertEqual([], remain)
self.verify_finished()
@@ -238,6 +240,7 @@ class AuthTest (unittest.TestCase):
etype, evalue, etb = sys.exc_info()
self.assertTrue(issubclass(etype, AuthenticationException))
+ @slow
def test_9_auth_non_responsive(self):
"""
verify that authentication times out if server takes to long to
diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py
index eeb4d0ad..03616c55 100644
--- a/tests/test_buffered_pipe.py
+++ b/tests/test_buffered_pipe.py
@@ -23,6 +23,7 @@ Some unit tests for BufferedPipe.
import threading
import time
import unittest
+
from paramiko.buffered_pipe import BufferedPipe, PipeTimeout
from paramiko import pipe
from paramiko.py3compat import b
diff --git a/tests/test_client.py b/tests/test_client.py
index 7058f394..7163fdcf 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -20,25 +20,32 @@
Some unit tests for SSHClient.
"""
-from __future__ import with_statement
+from __future__ import with_statement, print_function
import gc
+import os
import platform
import socket
-from tempfile import mkstemp
import threading
+import time
import unittest
-import weakref
import warnings
-import os
-import time
-from tests.util import test_path
+import weakref
+from tempfile import mkstemp
+
+from pytest_relaxed import raises
import paramiko
from paramiko.pkey import PublicBlob
from paramiko.common import PY2
from paramiko.ssh_exception import SSHException, AuthenticationException
+from .util import _support, slow
+
+
+requires_gss_auth = unittest.skipUnless(
+ paramiko.GSS_AUTH_AVAILABLE, "GSS auth not available"
+)
FINGERPRINTS = {
'ssh-dss': b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c',
@@ -106,8 +113,7 @@ class NullServer(paramiko.ServerInterface):
return True
-class SSHClientTest (unittest.TestCase):
-
+class ClientTest(unittest.TestCase):
def setUp(self):
self.sockl = socket.socket()
self.sockl.bind(('localhost', 0))
@@ -120,21 +126,47 @@ class SSHClientTest (unittest.TestCase):
look_for_keys=False,
)
self.event = threading.Event()
+ self.kill_event = threading.Event()
def tearDown(self):
- for attr in "tc ts socks sockl".split():
- if hasattr(self, attr):
- getattr(self, attr).close()
-
- def _run(self, allowed_keys=None, delay=0, public_blob=None):
+ # Shut down client Transport
+ if hasattr(self, 'tc'):
+ self.tc.close()
+ # Shut down shared socket
+ if hasattr(self, 'sockl'):
+ # Signal to server thread that it should shut down early; it checks
+ # this immediately after accept(). (In scenarios where connection
+ # actually succeeded during the test, this becomes a no-op.)
+ self.kill_event.set()
+ # Forcibly connect to server sock in case the server thread is
+ # hanging out in its accept() (e.g. if the client side of the test
+ # fails before it even gets to connecting); there's no other good
+ # way to force an accept() to exit.
+ put_a_sock_in_it = socket.socket()
+ put_a_sock_in_it.connect((self.addr, self.port))
+ put_a_sock_in_it.close()
+ # Then close "our" end of the socket (which _should_ cause the
+ # accept() to bail out, but does not, for some reason. I blame
+ # threading.)
+ self.sockl.close()
+
+ def _run(
+ self, allowed_keys=None, delay=0, public_blob=None, kill_event=None,
+ ):
if allowed_keys is None:
allowed_keys = FINGERPRINTS.keys()
self.socks, addr = self.sockl.accept()
+ # If the kill event was set at this point, it indicates an early
+ # shutdown, so bail out now and don't even try setting up a Transport
+ # (which will just verbosely die.)
+ if kill_event and kill_event.is_set():
+ self.socks.close()
+ return
self.ts = paramiko.Transport(self.socks)
- keypath = test_path('test_rsa.key')
+ keypath = _support('test_rsa.key')
host_key = paramiko.RSAKey.from_private_key_file(keypath)
self.ts.add_server_key(host_key)
- keypath = test_path('test_ecdsa_256.key')
+ keypath = _support('test_ecdsa_256.key')
host_key = paramiko.ECDSAKey.from_private_key_file(keypath)
self.ts.add_server_key(host_key)
server = NullServer(allowed_keys=allowed_keys, public_blob=public_blob)
@@ -149,12 +181,12 @@ class SSHClientTest (unittest.TestCase):
The exception is ``allowed_keys`` which is stripped and handed to the
``NullServer`` used for testing.
"""
- run_kwargs = {}
+ run_kwargs = {'kill_event': self.kill_event}
for key in ('allowed_keys', 'public_blob'):
run_kwargs[key] = kwargs.pop(key, None)
# Server setup
threading.Thread(target=self._run, kwargs=run_kwargs).start()
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
# Client setup
@@ -190,6 +222,8 @@ class SSHClientTest (unittest.TestCase):
stdout.close()
stderr.close()
+
+class SSHClientTest(ClientTest):
def test_1_client(self):
"""
verify that the SSHClient stuff works too.
@@ -200,22 +234,22 @@ class SSHClientTest (unittest.TestCase):
"""
verify that SSHClient works with a DSA key.
"""
- self._test_connection(key_filename=test_path('test_dss.key'))
+ self._test_connection(key_filename=_support('test_dss.key'))
def test_client_rsa(self):
"""
verify that SSHClient works with an RSA key.
"""
- self._test_connection(key_filename=test_path('test_rsa.key'))
+ self._test_connection(key_filename=_support('test_rsa.key'))
def test_2_5_client_ecdsa(self):
"""
verify that SSHClient works with an ECDSA key.
"""
- self._test_connection(key_filename=test_path('test_ecdsa_256.key'))
+ self._test_connection(key_filename=_support('test_ecdsa_256.key'))
def test_client_ed25519(self):
- self._test_connection(key_filename=test_path('test_ed25519.key'))
+ self._test_connection(key_filename=_support('test_ed25519.key'))
def test_3_multiple_key_files(self):
"""
@@ -238,7 +272,7 @@ class SSHClientTest (unittest.TestCase):
try:
self._test_connection(
key_filename=[
- test_path('test_{0}.key'.format(x)) for x in attempt
+ _support('test_{}.key'.format(x)) for x in attempt
],
allowed_keys=[types_[x] for x in accept],
)
@@ -256,7 +290,7 @@ class SSHClientTest (unittest.TestCase):
# various platforms trigger different errors here >_<
self.assertRaises(SSHException,
self._test_connection,
- key_filename=[test_path('test_rsa.key')],
+ key_filename=[_support('test_rsa.key')],
allowed_keys=['ecdsa-sha2-nistp256'],
)
@@ -266,8 +300,8 @@ class SSHClientTest (unittest.TestCase):
# server-side behavior is 100% identical.)
# NOTE: only bothered whipping up one cert per overall class/family.
for type_ in ('rsa', 'dss', 'ecdsa_256', 'ed25519'):
- cert_name = 'test_{0}.key-cert.pub'.format(type_)
- cert_path = test_path(os.path.join('cert_support', cert_name))
+ cert_name = 'test_{}.key-cert.pub'.format(type_)
+ cert_path = _support(os.path.join('cert_support', cert_name))
self._test_connection(
key_filename=cert_path,
public_blob=PublicBlob.from_file(cert_path),
@@ -281,12 +315,12 @@ class SSHClientTest (unittest.TestCase):
# that a specific cert was found, along with regular authorization
# succeeding proving that the overall flow works.
for type_ in ('rsa', 'dss', 'ecdsa_256', 'ed25519'):
- key_name = 'test_{0}.key'.format(type_)
- key_path = test_path(os.path.join('cert_support', key_name))
+ key_name = 'test_{}.key'.format(type_)
+ key_path = _support(os.path.join('cert_support', key_name))
self._test_connection(
key_filename=key_path,
public_blob=PublicBlob.from_file(
- '{0}-cert.pub'.format(key_path)
+ '{}-cert.pub'.format(key_path)
),
)
@@ -302,7 +336,7 @@ class SSHClientTest (unittest.TestCase):
"""
threading.Thread(target=self._run).start()
hostname = '[%s]:%d' % (self.addr, self.port)
- key_file = test_path('test_ecdsa_256.key')
+ key_file = _support('test_ecdsa_256.key')
public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file)
self.tc = paramiko.SSHClient()
@@ -325,7 +359,7 @@ class SSHClientTest (unittest.TestCase):
"""
warnings.filterwarnings('ignore', 'tempnam.*')
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
fd, localname = mkstemp()
os.close(fd)
@@ -405,7 +439,7 @@ class SSHClientTest (unittest.TestCase):
"""
# Start the thread with a 1 second wait.
threading.Thread(target=self._run, kwargs={'delay': 1}).start()
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
self.tc = paramiko.SSHClient()
@@ -432,12 +466,13 @@ class SSHClientTest (unittest.TestCase):
# 'television' as per tests/test_pkey.py). NOTE: must use
# key_filename, loading the actual key here with PKey will except
# immediately; we're testing the try/except crap within Client.
- key_filename=[test_path('test_rsa_password.key')],
+ key_filename=[_support('test_rsa_password.key')],
# Actual password for default 'slowdive' user
password='pygmalion',
)
self._test_connection(**kwargs)
+ @slow
def test_9_auth_timeout(self):
"""
verify that the SSHClient has a configurable auth timeout
@@ -450,27 +485,25 @@ class SSHClientTest (unittest.TestCase):
auth_timeout=0.5,
)
+ @requires_gss_auth
def test_10_auth_trickledown_gsskex(self):
"""
Failed gssapi-keyex auth doesn't prevent subsequent key auth from succeeding
"""
- if not paramiko.GSS_AUTH_AVAILABLE:
- return # for python 2.6 lacks skipTest
kwargs = dict(
gss_kex=True,
- key_filename=[test_path('test_rsa.key')],
+ key_filename=[_support('test_rsa.key')],
)
self._test_connection(**kwargs)
+ @requires_gss_auth
def test_11_auth_trickledown_gssauth(self):
"""
Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding
"""
- if not paramiko.GSS_AUTH_AVAILABLE:
- return # for python 2.6 lacks skipTest
kwargs = dict(
gss_auth=True,
- key_filename=[test_path('test_rsa.key')],
+ key_filename=[_support('test_rsa.key')],
)
self._test_connection(**kwargs)
@@ -489,14 +522,13 @@ class SSHClientTest (unittest.TestCase):
password='pygmalion', **self.connect_kwargs
)
+ @requires_gss_auth
def test_13_reject_policy_gsskex(self):
"""
verify that SSHClient's RejectPolicy works,
even if gssapi-keyex was enabled but not used.
"""
# Test for a bug present in paramiko versions released before 2017-08-01
- if not paramiko.GSS_AUTH_AVAILABLE:
- return # for python 2.6 lacks skipTest
threading.Thread(target=self._run).start()
self.tc = paramiko.SSHClient()
@@ -532,7 +564,7 @@ class SSHClientTest (unittest.TestCase):
self.tc = paramiko.SSHClient()
self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
- host_key = ktype.from_private_key_file(test_path(kfile))
+ host_key = ktype.from_private_key_file(_support(kfile))
known_hosts = self.tc.get_host_keys()
known_hosts.add(hostname, host_key.get_name(), host_key)
@@ -556,10 +588,7 @@ class SSHClientTest (unittest.TestCase):
def test_host_key_negotiation_4(self):
self._client_host_key_good(paramiko.RSAKey, 'test_rsa.key')
- def test_update_environment(self):
- """
- Verify that environment variables can be set by the client.
- """
+ def _setup_for_env(self):
threading.Thread(target=self._run).start()
self.tc = paramiko.SSHClient()
@@ -571,6 +600,11 @@ class SSHClientTest (unittest.TestCase):
self.assertTrue(self.event.isSet())
self.assertTrue(self.ts.is_active())
+ def test_update_environment(self):
+ """
+ Verify that environment variables can be set by the client.
+ """
+ self._setup_for_env()
target_env = {b'A': b'B', b'C': b'd'}
self.tc.exec_command('yes', environment=target_env)
@@ -578,19 +612,20 @@ class SSHClientTest (unittest.TestCase):
self.assertEqual(target_env, getattr(schan, 'env', {}))
schan.close()
- # Cannot use assertRaises in context manager mode as it is not supported
- # in Python 2.6.
- try:
+ @unittest.skip("Clients normally fail silently, thus so do we, for now")
+ def test_env_update_failures(self):
+ self._setup_for_env()
+ with self.assertRaises(SSHException) as manager:
# Verify that a rejection by the server can be detected
self.tc.exec_command('yes', environment={b'INVALID_ENV': b''})
- except SSHException as e:
- self.assertTrue('INVALID_ENV' in str(e),
- 'Expected variable name in error message')
- self.assertTrue(isinstance(e.args[1], SSHException),
- 'Expected original SSHException in exception')
- else:
- self.assertFalse(False, 'SSHException was not thrown.')
-
+ self.assertTrue(
+ 'INVALID_ENV' in str(manager.exception),
+ 'Expected variable name in error message'
+ )
+ self.assertTrue(
+ isinstance(manager.exception.args[1], SSHException),
+ 'Expected original SSHException in exception'
+ )
def test_missing_key_policy_accepts_classes_or_instances(self):
"""
@@ -607,3 +642,45 @@ class SSHClientTest (unittest.TestCase):
# Hand in just the class (new behavior)
client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
assert isinstance(client._policy, paramiko.AutoAddPolicy)
+
+
+class PasswordPassphraseTests(ClientTest):
+ # TODO: most of these could reasonably be set up to use mocks/assertions
+ # (e.g. "gave passphrase -> expect PKey was given it as the passphrase")
+ # instead of suffering a real connection cycle.
+ # TODO: in that case, move the below to be part of an integration suite?
+
+ def test_password_kwarg_works_for_password_auth(self):
+ # Straightforward / duplicate of earlier basic password test.
+ self._test_connection(password='pygmalion')
+
+ # TODO: more granular exception pending #387; should be signaling "no auth
+ # methods available" because no key and no password
+ @raises(SSHException)
+ def test_passphrase_kwarg_not_used_for_password_auth(self):
+ # Using the "right" password in the "wrong" field shouldn't work.
+ self._test_connection(passphrase='pygmalion')
+
+ def test_passphrase_kwarg_used_for_key_passphrase(self):
+ # Straightforward again, with new passphrase kwarg.
+ self._test_connection(
+ key_filename=_support('test_rsa_password.key'),
+ passphrase='television',
+ )
+
+ def test_password_kwarg_used_for_passphrase_when_no_passphrase_kwarg_given(self): # noqa
+ # Backwards compatibility: passphrase in the password field.
+ self._test_connection(
+ key_filename=_support('test_rsa_password.key'),
+ password='television',
+ )
+
+ @raises(AuthenticationException) # TODO: more granular
+ def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given(self): # noqa
+ # Sanity: if we're given both fields, the password field is NOT used as
+ # a passphrase.
+ self._test_connection(
+ key_filename=_support('test_rsa_password.key'),
+ password='television',
+ passphrase='wat? lol no',
+ )
diff --git a/tests/test_file.py b/tests/test_file.py
index b33ecd51..3d2c94e6 100755..100644
--- a/tests/test_file.py
+++ b/tests/test_file.py
@@ -27,7 +27,7 @@ from paramiko.common import linefeed_byte, crlf, cr_byte
from paramiko.file import BufferedFile
from paramiko.py3compat import BytesIO
-from tests import skipUnlessBuiltin
+from .util import needs_builtin
class LoopbackFile (BufferedFile):
@@ -198,13 +198,13 @@ class BufferedFileTest (unittest.TestCase):
f.write(text)
self.assertEqual(f.read(), text.encode("utf-8"))
- @skipUnlessBuiltin('memoryview')
+ @needs_builtin('memoryview')
def test_write_bytearray(self):
with LoopbackFile('rb+') as f:
f.write(bytearray(12))
self.assertEqual(f.read(), 12 * b"\0")
- @skipUnlessBuiltin('buffer')
+ @needs_builtin('buffer')
def test_write_buffer(self):
data = 3 * b"pretend giant block of data\n"
offsets = range(0, len(data), 8)
@@ -213,7 +213,7 @@ class BufferedFileTest (unittest.TestCase):
f.write(buffer(data, offset, 8))
self.assertEqual(f.read(), data)
- @skipUnlessBuiltin('memoryview')
+ @needs_builtin('memoryview')
def test_write_memoryview(self):
data = 3 * b"pretend giant block of data\n"
offsets = range(0, len(data), 8)
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
index bc220108..d4b632be 100644
--- a/tests/test_gssapi.py
+++ b/tests/test_gssapi.py
@@ -25,14 +25,17 @@ Test the used APIs for GSS-API / SSPI authentication
import unittest
import socket
+from .util import needs_gssapi
+
+@needs_gssapi
class GSSAPITest(unittest.TestCase):
- @staticmethod
- def init(hostname=None, srv_mode=False):
- global krb5_mech, targ_name, server_mode
- krb5_mech = "1.2.840.113554.1.2.2"
- targ_name = hostname
- server_mode = srv_mode
+ def setup():
+ # TODO: these vars should all come from os.environ or whatever the
+ # approved pytest method is for runtime-configuring test data.
+ self.krb5_mech = "1.2.840.113554.1.2.2"
+ self.targ_name = "hostname"
+ self.server_mode = False
def test_1_pyasn1(self):
"""
@@ -40,9 +43,9 @@ class GSSAPITest(unittest.TestCase):
"""
from pyasn1.type.univ import ObjectIdentifier
from pyasn1.codec.der import encoder, decoder
- oid = encoder.encode(ObjectIdentifier(krb5_mech))
+ oid = encoder.encode(ObjectIdentifier(self.krb5_mech))
mech, __ = decoder.decode(oid)
- self.assertEquals(krb5_mech, mech.__str__())
+ self.assertEquals(self.krb5_mech, mech.__str__())
def test_2_gssapi_sspi(self):
"""
@@ -61,7 +64,7 @@ class GSSAPITest(unittest.TestCase):
mic_msg = b"G'day Mate!"
if _API == "MIT":
- if server_mode:
+ if self.server_mode:
gss_flags = (gssapi.C_PROT_READY_FLAG,
gssapi.C_INTEG_FLAG,
gssapi.C_MUTUAL_FLAG,
@@ -73,13 +76,13 @@ class GSSAPITest(unittest.TestCase):
# Initialize a GSS-API context.
ctx = gssapi.Context()
ctx.flags = gss_flags
- krb5_oid = gssapi.OID.mech_from_string(krb5_mech)
- target_name = gssapi.Name("host@" + targ_name,
+ krb5_oid = gssapi.OID.mech_from_string(self.krb5_mech)
+ target_name = gssapi.Name("host@" + self.targ_name,
gssapi.C_NT_HOSTBASED_SERVICE)
gss_ctxt = gssapi.InitContext(peer_name=target_name,
mech_type=krb5_oid,
req_flags=ctx.flags)
- if server_mode:
+ if self.server_mode:
c_token = gss_ctxt.step(c_token)
gss_ctxt_status = gss_ctxt.established
self.assertEquals(False, gss_ctxt_status)
@@ -99,7 +102,7 @@ class GSSAPITest(unittest.TestCase):
# Build MIC
mic_token = gss_ctxt.get_mic(mic_msg)
- if server_mode:
+ if self.server_mode:
# Check MIC
status = gss_srv_ctxt.verify_mic(mic_msg, mic_token)
self.assertEquals(0, status)
@@ -110,11 +113,11 @@ class GSSAPITest(unittest.TestCase):
sspicon.ISC_REQ_DELEGATE
)
# Initialize a GSS-API context.
- target_name = "host/" + socket.getfqdn(targ_name)
+ target_name = "host/" + socket.getfqdn(self.targ_name)
gss_ctxt = sspi.ClientAuth("Kerberos",
scflags=gss_flags,
targetspn=target_name)
- if server_mode:
+ if self.server_mode:
error, token = gss_ctxt.authorize(c_token)
c_token = token[0].Buffer
self.assertEquals(0, error)
diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py
index 2c7ceeb9..cd75f8ab 100644
--- a/tests/test_hostkeys.py
+++ b/tests/test_hostkeys.py
@@ -23,6 +23,7 @@ Some unit tests for HostKeys.
from binascii import hexlify
import os
import unittest
+
import paramiko
from paramiko.py3compat import decodebytes
diff --git a/tests/test_kex.py b/tests/test_kex.py
index b7f588f7..b5808e7e 100644
--- a/tests/test_kex.py
+++ b/tests/test_kex.py
@@ -24,14 +24,15 @@ from binascii import hexlify, unhexlify
import os
import unittest
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives.asymmetric import ec
+
import paramiko.util
from paramiko.kex_group1 import KexGroup1
from paramiko.kex_gex import KexGex, KexGexSHA256
from paramiko import Message
from paramiko.common import byte_chr
from paramiko.kex_ecdh_nist import KexNistp256
-from cryptography.hazmat.backends import default_backend
-from cryptography.hazmat.primitives.asymmetric import ec
def dummy_urandom(n):
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
index af342a7c..025d1faa 100644
--- a/tests/test_kex_gss.py
+++ b/tests/test_kex_gss.py
@@ -31,6 +31,8 @@ import unittest
import paramiko
+from .util import needs_gssapi
+
class NullServer (paramiko.ServerInterface):
@@ -57,6 +59,7 @@ class NullServer (paramiko.ServerInterface):
return True
+@needs_gssapi
class GSSKexTest(unittest.TestCase):
@staticmethod
def init(username, hostname):
diff --git a/tests/test_message.py b/tests/test_message.py
index f18cae90..645b0509 100644
--- a/tests/test_message.py
+++ b/tests/test_message.py
@@ -21,6 +21,7 @@ Some unit tests for ssh protocol message blocks.
"""
import unittest
+
from paramiko.message import Message
from paramiko.common import byte_chr, zero_byte
diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py
index 02173292..414b7e38 100644
--- a/tests/test_packetizer.py
+++ b/tests/test_packetizer.py
@@ -27,11 +27,12 @@ from hashlib import sha1
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes
-from tests.loop import LoopSocket
-
from paramiko import Message, Packetizer, util
from paramiko.common import byte_chr, zero_byte
+from .loop import LoopSocket
+
+
x55 = byte_chr(0x55)
x1f = byte_chr(0x1f)
diff --git a/tests/test_pkey.py b/tests/test_pkey.py
index 42d8e6bb..1827d2a9 100644
--- a/tests/test_pkey.py
+++ b/tests/test_pkey.py
@@ -30,7 +30,8 @@ import base64
from paramiko import RSAKey, DSSKey, ECDSAKey, Ed25519Key, Message, util
from paramiko.py3compat import StringIO, byte_chr, b, bytes, PY2
-from tests.util import test_path
+from .util import _support
+
# from openssh's ssh-keygen
PUB_RSA = 'ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4c='
@@ -138,7 +139,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(exp, key)
def test_2_load_rsa(self):
- key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ key = RSAKey.from_private_key_file(_support('test_rsa.key'))
self.assertEqual('ssh-rsa', key.get_name())
exp_rsa = b(FINGER_RSA.split()[1].replace(':', ''))
my_rsa = hexlify(key.get_fingerprint())
@@ -154,7 +155,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(key, key2)
def test_3_load_rsa_password(self):
- key = RSAKey.from_private_key_file(test_path('test_rsa_password.key'), 'television')
+ key = RSAKey.from_private_key_file(_support('test_rsa_password.key'), 'television')
self.assertEqual('ssh-rsa', key.get_name())
exp_rsa = b(FINGER_RSA.split()[1].replace(':', ''))
my_rsa = hexlify(key.get_fingerprint())
@@ -163,7 +164,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(1024, key.get_bits())
def test_4_load_dss(self):
- key = DSSKey.from_private_key_file(test_path('test_dss.key'))
+ key = DSSKey.from_private_key_file(_support('test_dss.key'))
self.assertEqual('ssh-dss', key.get_name())
exp_dss = b(FINGER_DSS.split()[1].replace(':', ''))
my_dss = hexlify(key.get_fingerprint())
@@ -179,7 +180,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(key, key2)
def test_5_load_dss_password(self):
- key = DSSKey.from_private_key_file(test_path('test_dss_password.key'), 'television')
+ key = DSSKey.from_private_key_file(_support('test_dss_password.key'), 'television')
self.assertEqual('ssh-dss', key.get_name())
exp_dss = b(FINGER_DSS.split()[1].replace(':', ''))
my_dss = hexlify(key.get_fingerprint())
@@ -189,7 +190,7 @@ class KeyTest(unittest.TestCase):
def test_6_compare_rsa(self):
# verify that the private & public keys compare equal
- key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ key = RSAKey.from_private_key_file(_support('test_rsa.key'))
self.assertEqual(key, key)
pub = RSAKey(data=key.asbytes())
self.assertTrue(key.can_sign())
@@ -198,7 +199,7 @@ class KeyTest(unittest.TestCase):
def test_7_compare_dss(self):
# verify that the private & public keys compare equal
- key = DSSKey.from_private_key_file(test_path('test_dss.key'))
+ key = DSSKey.from_private_key_file(_support('test_dss.key'))
self.assertEqual(key, key)
pub = DSSKey(data=key.asbytes())
self.assertTrue(key.can_sign())
@@ -207,7 +208,7 @@ class KeyTest(unittest.TestCase):
def test_8_sign_rsa(self):
# verify that the rsa private key can sign and verify
- key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ key = RSAKey.from_private_key_file(_support('test_rsa.key'))
msg = key.sign_ssh_data(b'ice weasels')
self.assertTrue(type(msg) is Message)
msg.rewind()
@@ -220,7 +221,7 @@ class KeyTest(unittest.TestCase):
def test_9_sign_dss(self):
# verify that the dss private key can sign and verify
- key = DSSKey.from_private_key_file(test_path('test_dss.key'))
+ key = DSSKey.from_private_key_file(_support('test_dss.key'))
msg = key.sign_ssh_data(b'ice weasels')
self.assertTrue(type(msg) is Message)
msg.rewind()
@@ -275,7 +276,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(key.get_name(), 'ecdsa-sha2-nistp521')
def test_10_load_ecdsa_256(self):
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_256.key'))
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_256.key'))
self.assertEqual('ecdsa-sha2-nistp256', key.get_name())
exp_ecdsa = b(FINGER_ECDSA_256.split()[1].replace(':', ''))
my_ecdsa = hexlify(key.get_fingerprint())
@@ -291,7 +292,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(key, key2)
def test_11_load_ecdsa_password_256(self):
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password_256.key'), b'television')
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_password_256.key'), b'television')
self.assertEqual('ecdsa-sha2-nistp256', key.get_name())
exp_ecdsa = b(FINGER_ECDSA_256.split()[1].replace(':', ''))
my_ecdsa = hexlify(key.get_fingerprint())
@@ -301,7 +302,7 @@ class KeyTest(unittest.TestCase):
def test_12_compare_ecdsa_256(self):
# verify that the private & public keys compare equal
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_256.key'))
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_256.key'))
self.assertEqual(key, key)
pub = ECDSAKey(data=key.asbytes())
self.assertTrue(key.can_sign())
@@ -310,7 +311,7 @@ class KeyTest(unittest.TestCase):
def test_13_sign_ecdsa_256(self):
# verify that the rsa private key can sign and verify
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_256.key'))
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_256.key'))
msg = key.sign_ssh_data(b'ice weasels')
self.assertTrue(type(msg) is Message)
msg.rewind()
@@ -325,7 +326,7 @@ class KeyTest(unittest.TestCase):
self.assertTrue(pub.verify_ssh_sig(b'ice weasels', msg))
def test_14_load_ecdsa_384(self):
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_384.key'))
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_384.key'))
self.assertEqual('ecdsa-sha2-nistp384', key.get_name())
exp_ecdsa = b(FINGER_ECDSA_384.split()[1].replace(':', ''))
my_ecdsa = hexlify(key.get_fingerprint())
@@ -341,7 +342,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(key, key2)
def test_15_load_ecdsa_password_384(self):
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password_384.key'), b'television')
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_password_384.key'), b'television')
self.assertEqual('ecdsa-sha2-nistp384', key.get_name())
exp_ecdsa = b(FINGER_ECDSA_384.split()[1].replace(':', ''))
my_ecdsa = hexlify(key.get_fingerprint())
@@ -351,7 +352,7 @@ class KeyTest(unittest.TestCase):
def test_16_compare_ecdsa_384(self):
# verify that the private & public keys compare equal
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_384.key'))
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_384.key'))
self.assertEqual(key, key)
pub = ECDSAKey(data=key.asbytes())
self.assertTrue(key.can_sign())
@@ -360,7 +361,7 @@ class KeyTest(unittest.TestCase):
def test_17_sign_ecdsa_384(self):
# verify that the rsa private key can sign and verify
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_384.key'))
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_384.key'))
msg = key.sign_ssh_data(b'ice weasels')
self.assertTrue(type(msg) is Message)
msg.rewind()
@@ -375,7 +376,7 @@ class KeyTest(unittest.TestCase):
self.assertTrue(pub.verify_ssh_sig(b'ice weasels', msg))
def test_18_load_ecdsa_521(self):
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_521.key'))
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_521.key'))
self.assertEqual('ecdsa-sha2-nistp521', key.get_name())
exp_ecdsa = b(FINGER_ECDSA_521.split()[1].replace(':', ''))
my_ecdsa = hexlify(key.get_fingerprint())
@@ -394,7 +395,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(key, key2)
def test_19_load_ecdsa_password_521(self):
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password_521.key'), b'television')
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_password_521.key'), b'television')
self.assertEqual('ecdsa-sha2-nistp521', key.get_name())
exp_ecdsa = b(FINGER_ECDSA_521.split()[1].replace(':', ''))
my_ecdsa = hexlify(key.get_fingerprint())
@@ -404,7 +405,7 @@ class KeyTest(unittest.TestCase):
def test_20_compare_ecdsa_521(self):
# verify that the private & public keys compare equal
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_521.key'))
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_521.key'))
self.assertEqual(key, key)
pub = ECDSAKey(data=key.asbytes())
self.assertTrue(key.can_sign())
@@ -413,7 +414,7 @@ class KeyTest(unittest.TestCase):
def test_21_sign_ecdsa_521(self):
# verify that the rsa private key can sign and verify
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_521.key'))
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_521.key'))
msg = key.sign_ssh_data(b'ice weasels')
self.assertTrue(type(msg) is Message)
msg.rewind()
@@ -429,7 +430,7 @@ class KeyTest(unittest.TestCase):
def test_salt_size(self):
# Read an existing encrypted private key
- file_ = test_path('test_rsa_password.key')
+ file_ = _support('test_rsa_password.key')
password = 'television'
newfile = file_ + '.new'
newpassword = 'radio'
@@ -446,20 +447,20 @@ class KeyTest(unittest.TestCase):
os.remove(newfile)
def test_stringification(self):
- key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ key = RSAKey.from_private_key_file(_support('test_rsa.key'))
comparable = TEST_KEY_BYTESTR_2 if PY2 else TEST_KEY_BYTESTR_3
self.assertEqual(str(key), comparable)
def test_ed25519(self):
- key1 = Ed25519Key.from_private_key_file(test_path('test_ed25519.key'))
+ key1 = Ed25519Key.from_private_key_file(_support('test_ed25519.key'))
key2 = Ed25519Key.from_private_key_file(
- test_path('test_ed25519_password.key'), b'abc123'
+ _support('test_ed25519_password.key'), b'abc123'
)
self.assertNotEqual(key1.asbytes(), key2.asbytes())
def test_ed25519_compare(self):
# verify that the private & public keys compare equal
- key = Ed25519Key.from_private_key_file(test_path('test_ed25519.key'))
+ key = Ed25519Key.from_private_key_file(_support('test_ed25519.key'))
self.assertEqual(key, key)
pub = Ed25519Key(data=key.asbytes())
self.assertTrue(key.can_sign())
@@ -469,7 +470,7 @@ class KeyTest(unittest.TestCase):
def test_ed25519_nonbytes_password(self):
# https://github.com/paramiko/paramiko/issues/1039
key = Ed25519Key.from_private_key_file(
- test_path('test_ed25519_password.key'),
+ _support('test_ed25519_password.key'),
# NOTE: not a bytes. Amusingly, the test above for same key DOES
# explicitly cast to bytes...code smell!
'abc123',
@@ -477,14 +478,14 @@ class KeyTest(unittest.TestCase):
# No exception -> it's good. Meh.
def test_ed25519_load_from_file_obj(self):
- with open(test_path('test_ed25519.key')) as pkey_fileobj:
+ with open(_support('test_ed25519.key')) as pkey_fileobj:
key = Ed25519Key.from_private_key(pkey_fileobj)
self.assertEqual(key, key)
self.assertTrue(key.can_sign())
def test_keyfile_is_actually_encrypted(self):
# Read an existing encrypted private key
- file_ = test_path('test_rsa_password.key')
+ file_ = _support('test_rsa_password.key')
password = 'television'
newfile = file_ + '.new'
newpassword = 'radio'
@@ -502,10 +503,10 @@ class KeyTest(unittest.TestCase):
# test_client.py; this and nearby cert tests are more about the gritty
# details.
# PKey.load_certificate
- key_path = test_path(os.path.join('cert_support', 'test_rsa.key'))
+ key_path = _support(os.path.join('cert_support', 'test_rsa.key'))
key = RSAKey.from_private_key_file(key_path)
self.assertTrue(key.public_blob is None)
- cert_path = test_path(
+ cert_path = _support(
os.path.join('cert_support', 'test_rsa.key-cert.pub')
)
key.load_certificate(cert_path)
@@ -524,10 +525,10 @@ class KeyTest(unittest.TestCase):
self.assertEqual(msg.get_int64(), 1234)
# Prevented from loading certificate that doesn't match
- key_path = test_path(os.path.join('cert_support', 'test_ed25519.key'))
+ key_path = _support(os.path.join('cert_support', 'test_ed25519.key'))
key1 = Ed25519Key.from_private_key_file(key_path)
self.assertRaises(
ValueError,
key1.load_certificate,
- test_path('test_rsa.key-cert.pub'),
+ _support('test_rsa.key-cert.pub'),
)
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index b3c7bf98..09a50453 100755..100644
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -32,16 +32,19 @@ import warnings
from binascii import hexlify
from tempfile import mkstemp
+import pytest
+
import paramiko
+import paramiko.util
from paramiko.py3compat import PY2, b, u, StringIO
from paramiko.common import o777, o600, o666, o644
-from tests import skipUnlessBuiltin
-from tests.stub_sftp import StubServer, StubSFTPServer
-from tests.loop import LoopSocket
-from tests.util import test_path
-import paramiko.util
from paramiko.sftp_attr import SFTPAttributes
+from .util import needs_builtin
+from .stub_sftp import StubServer, StubSFTPServer
+from .util import _support, slow
+
+
ARTICLE = '''
Insulin sensitivity and liver insulin receptor structure in ducks from two
genera
@@ -81,303 +84,201 @@ decreased compared with chicken.
# Thus, the following 2-bytes sequence is not valid utf8: "invalid continuation byte"
NON_UTF8_DATA = b'\xC3\xC3'
-FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000')
-
-sftp = None
-tc = None
-g_big_file_test = True
-# we need to use eval(compile()) here because Py3.2 doesn't support the 'u' marker for unicode
-# this test is the only line in the entire program that has to be treated specially to support Py3.2
-unicode_folder = eval(compile(r"u'\u00fcnic\u00f8de'" if PY2 else r"'\u00fcnic\u00f8de'", 'test_sftp.py', 'eval'))
+unicode_folder = u'\u00fcnic\u00f8de' if PY2 else '\u00fcnic\u00f8de'
utf8_folder = b'/\xc3\xbcnic\xc3\xb8\x64\x65'
-def get_sftp():
- global sftp
- return sftp
-
-
-class SFTPTest (unittest.TestCase):
- @staticmethod
- def init(hostname, username, keyfile, passwd):
- global sftp, tc
-
- t = paramiko.Transport(hostname)
- tc = t
- try:
- key = paramiko.RSAKey.from_private_key_file(keyfile, passwd)
- except paramiko.PasswordRequiredException:
- sys.stderr.write('\n\nparamiko.RSAKey.from_private_key_file REQUIRES PASSWORD.\n')
- sys.stderr.write('You have two options:\n')
- sys.stderr.write('* Use the "-K" option to point to a different (non-password-protected)\n')
- sys.stderr.write(' private key file.\n')
- sys.stderr.write('* Use the "-P" option to provide the password needed to unlock this private\n')
- sys.stderr.write(' key.\n')
- sys.stderr.write('\n')
- sys.exit(1)
- try:
- t.connect(username=username, pkey=key)
- except paramiko.SSHException:
- t.close()
- sys.stderr.write('\n\nparamiko.Transport.connect FAILED.\n')
- sys.stderr.write('There are several possible reasons why it might fail so quickly:\n\n')
- sys.stderr.write('* The host to connect to (%s) is not a valid SSH server.\n' % hostname)
- sys.stderr.write(' (Use the "-H" option to change the host.)\n')
- sys.stderr.write('* The username to auth as (%s) is invalid.\n' % username)
- sys.stderr.write(' (Use the "-U" option to change the username.)\n')
- sys.stderr.write('* The private key given (%s) is not accepted by the server.\n' % keyfile)
- sys.stderr.write(' (Use the "-K" option to provide a different key file.)\n')
- sys.stderr.write('\n')
- sys.exit(1)
- sftp = paramiko.SFTP.from_transport(t)
-
- @staticmethod
- def init_loopback():
- global sftp, tc
-
- socks = LoopSocket()
- sockc = LoopSocket()
- sockc.link(socks)
- tc = paramiko.Transport(sockc)
- ts = paramiko.Transport(socks)
-
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
- ts.add_server_key(host_key)
- event = threading.Event()
- server = StubServer()
- ts.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer)
- ts.start_server(event, server)
- tc.connect(username='slowdive', password='pygmalion')
- event.wait(1.0)
-
- sftp = paramiko.SFTP.from_transport(tc)
-
- @staticmethod
- def set_big_file_test(onoff):
- global g_big_file_test
- g_big_file_test = onoff
-
- def setUp(self):
- global FOLDER
- for i in range(1000):
- FOLDER = FOLDER[:-3] + '%03d' % i
- try:
- sftp.mkdir(FOLDER)
- break
- except (IOError, OSError):
- pass
-
- def tearDown(self):
- #sftp.chdir()
- sftp.rmdir(FOLDER)
-
- def test_1_file(self):
+@slow
+class TestSFTP(object):
+ def test_1_file(self, sftp):
"""
verify that we can create a file.
"""
- f = sftp.open(FOLDER + '/test', 'w')
+ f = sftp.open(sftp.FOLDER + '/test', 'w')
try:
- self.assertEqual(f.stat().st_size, 0)
+ assert f.stat().st_size == 0
finally:
f.close()
- sftp.remove(FOLDER + '/test')
+ sftp.remove(sftp.FOLDER + '/test')
- def test_2_close(self):
+ def test_2_close(self, sftp):
"""
- verify that closing the sftp session doesn't do anything bad, and that
- a new one can be opened.
+ Verify that SFTP session close() causes a socket error on next action.
"""
- global sftp
sftp.close()
- try:
- sftp.open(FOLDER + '/test2', 'w')
- self.fail('expected exception')
- except:
- pass
- sftp = paramiko.SFTP.from_transport(tc)
+ with pytest.raises(socket.error, match='Socket is closed'):
+ sftp.open(sftp.FOLDER + '/test2', 'w')
- def test_2_sftp_can_be_used_as_context_manager(self):
+ def test_2_sftp_can_be_used_as_context_manager(self, sftp):
"""
verify that the sftp session is closed when exiting the context manager
"""
- global sftp
with sftp:
pass
- try:
- sftp.open(FOLDER + '/test2', 'w')
- self.fail('expected exception')
- except (EOFError, socket.error):
- pass
- finally:
- sftp = paramiko.SFTP.from_transport(tc)
+ with pytest.raises(socket.error, match='Socket is closed'):
+ sftp.open(sftp.FOLDER + '/test2', 'w')
- def test_3_write(self):
+ def test_3_write(self, sftp):
"""
verify that a file can be created and written, and the size is correct.
"""
try:
- with sftp.open(FOLDER + '/duck.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f:
f.write(ARTICLE)
- self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483)
+ assert sftp.stat(sftp.FOLDER + '/duck.txt').st_size == 1483
finally:
- sftp.remove(FOLDER + '/duck.txt')
+ sftp.remove(sftp.FOLDER + '/duck.txt')
- def test_3_sftp_file_can_be_used_as_context_manager(self):
+ def test_3_sftp_file_can_be_used_as_context_manager(self, sftp):
"""
verify that an opened file can be used as a context manager
"""
try:
- with sftp.open(FOLDER + '/duck.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f:
f.write(ARTICLE)
- self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483)
+ assert sftp.stat(sftp.FOLDER + '/duck.txt').st_size == 1483
finally:
- sftp.remove(FOLDER + '/duck.txt')
+ sftp.remove(sftp.FOLDER + '/duck.txt')
- def test_4_append(self):
+ def test_4_append(self, sftp):
"""
verify that a file can be opened for append, and tell() still works.
"""
try:
- with sftp.open(FOLDER + '/append.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/append.txt', 'w') as f:
f.write('first line\nsecond line\n')
- self.assertEqual(f.tell(), 23)
+ assert f.tell() == 23
- with sftp.open(FOLDER + '/append.txt', 'a+') as f:
+ with sftp.open(sftp.FOLDER + '/append.txt', 'a+') as f:
f.write('third line!!!\n')
- self.assertEqual(f.tell(), 37)
- self.assertEqual(f.stat().st_size, 37)
+ assert f.tell() == 37
+ assert f.stat().st_size == 37
f.seek(-26, f.SEEK_CUR)
- self.assertEqual(f.readline(), 'second line\n')
+ assert f.readline() == 'second line\n'
finally:
- sftp.remove(FOLDER + '/append.txt')
+ sftp.remove(sftp.FOLDER + '/append.txt')
- def test_5_rename(self):
+ def test_5_rename(self, sftp):
"""
verify that renaming a file works.
"""
try:
- with sftp.open(FOLDER + '/first.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/first.txt', 'w') as f:
f.write('content!\n')
- sftp.rename(FOLDER + '/first.txt', FOLDER + '/second.txt')
- try:
- sftp.open(FOLDER + '/first.txt', 'r')
- self.assertTrue(False, 'no exception on reading nonexistent file')
- except IOError:
- pass
- with sftp.open(FOLDER + '/second.txt', 'r') as f:
+ sftp.rename(sftp.FOLDER + '/first.txt', sftp.FOLDER + '/second.txt')
+ with pytest.raises(IOError, match='No such file'):
+ sftp.open(sftp.FOLDER + '/first.txt', 'r')
+ with sftp.open(sftp.FOLDER + '/second.txt', 'r') as f:
f.seek(-6, f.SEEK_END)
- self.assertEqual(u(f.read(4)), 'tent')
+ assert u(f.read(4)) == 'tent'
finally:
+ # TODO: this is gross, make some sort of 'remove if possible' / 'rm
+ # -f' a-like, jeez
try:
- sftp.remove(FOLDER + '/first.txt')
+ sftp.remove(sftp.FOLDER + '/first.txt')
except:
pass
try:
- sftp.remove(FOLDER + '/second.txt')
+ sftp.remove(sftp.FOLDER + '/second.txt')
except:
pass
- def test_5a_posix_rename(self):
+ def test_5a_posix_rename(self, sftp):
"""Test posix-rename@openssh.com protocol extension."""
try:
# first check that the normal rename works as specified
- with sftp.open(FOLDER + '/a', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/a', 'w') as f:
f.write('one')
- sftp.rename(FOLDER + '/a', FOLDER + '/b')
- with sftp.open(FOLDER + '/a', 'w') as f:
+ sftp.rename(sftp.FOLDER + '/a', sftp.FOLDER + '/b')
+ with sftp.open(sftp.FOLDER + '/a', 'w') as f:
f.write('two')
- try:
- sftp.rename(FOLDER + '/a', FOLDER + '/b')
- self.assertTrue(False, 'no exception when rename-ing onto existing file')
- except (OSError, IOError):
- pass
+ with pytest.raises(IOError): # actual message seems generic
+ sftp.rename(sftp.FOLDER + '/a', sftp.FOLDER + '/b')
# now check with the posix_rename
- sftp.posix_rename(FOLDER + '/a', FOLDER + '/b')
- with sftp.open(FOLDER + '/b', 'r') as f:
+ sftp.posix_rename(sftp.FOLDER + '/a', sftp.FOLDER + '/b')
+ with sftp.open(sftp.FOLDER + '/b', 'r') as f:
data = u(f.read())
- self.assertEqual('two', data, "Contents of renamed file not the same as original file")
+ err = "Contents of renamed file not the same as original file"
+ assert 'two' == data, err
finally:
try:
- sftp.remove(FOLDER + '/a')
+ sftp.remove(sftp.FOLDER + '/a')
except:
pass
try:
- sftp.remove(FOLDER + '/b')
+ sftp.remove(sftp.FOLDER + '/b')
except:
pass
- def test_6_folder(self):
+ def test_6_folder(self, sftp):
"""
create a temporary folder, verify that we can create a file in it, then
remove the folder and verify that we can't create a file in it anymore.
"""
- sftp.mkdir(FOLDER + '/subfolder')
- sftp.open(FOLDER + '/subfolder/test', 'w').close()
- sftp.remove(FOLDER + '/subfolder/test')
- sftp.rmdir(FOLDER + '/subfolder')
- try:
- sftp.open(FOLDER + '/subfolder/test')
- # shouldn't be able to create that file
- self.assertTrue(False, 'no exception at dummy file creation')
- except IOError:
- pass
+ sftp.mkdir(sftp.FOLDER + '/subfolder')
+ sftp.open(sftp.FOLDER + '/subfolder/test', 'w').close()
+ sftp.remove(sftp.FOLDER + '/subfolder/test')
+ sftp.rmdir(sftp.FOLDER + '/subfolder')
+ # shouldn't be able to create that file if dir removed
+ with pytest.raises(IOError, match="No such file"):
+ sftp.open(sftp.FOLDER + '/subfolder/test')
- def test_7_listdir(self):
+ def test_7_listdir(self, sftp):
"""
verify that a folder can be created, a bunch of files can be placed in
it, and those files show up in sftp.listdir.
"""
try:
- sftp.open(FOLDER + '/duck.txt', 'w').close()
- sftp.open(FOLDER + '/fish.txt', 'w').close()
- sftp.open(FOLDER + '/tertiary.py', 'w').close()
-
- x = sftp.listdir(FOLDER)
- self.assertEqual(len(x), 3)
- self.assertTrue('duck.txt' in x)
- self.assertTrue('fish.txt' in x)
- self.assertTrue('tertiary.py' in x)
- self.assertTrue('random' not in x)
+ sftp.open(sftp.FOLDER + '/duck.txt', 'w').close()
+ sftp.open(sftp.FOLDER + '/fish.txt', 'w').close()
+ sftp.open(sftp.FOLDER + '/tertiary.py', 'w').close()
+
+ x = sftp.listdir(sftp.FOLDER)
+ assert len(x) == 3
+ assert 'duck.txt' in x
+ assert 'fish.txt' in x
+ assert 'tertiary.py' in x
+ assert 'random' not in x
finally:
- sftp.remove(FOLDER + '/duck.txt')
- sftp.remove(FOLDER + '/fish.txt')
- sftp.remove(FOLDER + '/tertiary.py')
+ sftp.remove(sftp.FOLDER + '/duck.txt')
+ sftp.remove(sftp.FOLDER + '/fish.txt')
+ sftp.remove(sftp.FOLDER + '/tertiary.py')
- def test_7_5_listdir_iter(self):
+ def test_7_5_listdir_iter(self, sftp):
"""
listdir_iter version of above test
"""
try:
- sftp.open(FOLDER + '/duck.txt', 'w').close()
- sftp.open(FOLDER + '/fish.txt', 'w').close()
- sftp.open(FOLDER + '/tertiary.py', 'w').close()
-
- x = [x.filename for x in sftp.listdir_iter(FOLDER)]
- self.assertEqual(len(x), 3)
- self.assertTrue('duck.txt' in x)
- self.assertTrue('fish.txt' in x)
- self.assertTrue('tertiary.py' in x)
- self.assertTrue('random' not in x)
+ sftp.open(sftp.FOLDER + '/duck.txt', 'w').close()
+ sftp.open(sftp.FOLDER + '/fish.txt', 'w').close()
+ sftp.open(sftp.FOLDER + '/tertiary.py', 'w').close()
+
+ x = [x.filename for x in sftp.listdir_iter(sftp.FOLDER)]
+ assert len(x) == 3
+ assert 'duck.txt' in x
+ assert 'fish.txt' in x
+ assert 'tertiary.py' in x
+ assert 'random' not in x
finally:
- sftp.remove(FOLDER + '/duck.txt')
- sftp.remove(FOLDER + '/fish.txt')
- sftp.remove(FOLDER + '/tertiary.py')
+ sftp.remove(sftp.FOLDER + '/duck.txt')
+ sftp.remove(sftp.FOLDER + '/fish.txt')
+ sftp.remove(sftp.FOLDER + '/tertiary.py')
- def test_8_setstat(self):
+ def test_8_setstat(self, sftp):
"""
verify that the setstat functions (chown, chmod, utime, truncate) work.
"""
try:
- with sftp.open(FOLDER + '/special', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/special', 'w') as f:
f.write('x' * 1024)
- stat = sftp.stat(FOLDER + '/special')
- sftp.chmod(FOLDER + '/special', (stat.st_mode & ~o777) | o600)
- stat = sftp.stat(FOLDER + '/special')
+ stat = sftp.stat(sftp.FOLDER + '/special')
+ sftp.chmod(sftp.FOLDER + '/special', (stat.st_mode & ~o777) | o600)
+ stat = sftp.stat(sftp.FOLDER + '/special')
expected_mode = o600
if sys.platform == 'win32':
# chmod not really functional on windows
@@ -385,35 +286,35 @@ class SFTPTest (unittest.TestCase):
if sys.platform == 'cygwin':
# even worse.
expected_mode = o644
- self.assertEqual(stat.st_mode & o777, expected_mode)
- self.assertEqual(stat.st_size, 1024)
+ assert stat.st_mode & o777 == expected_mode
+ assert stat.st_size == 1024
mtime = stat.st_mtime - 3600
atime = stat.st_atime - 1800
- sftp.utime(FOLDER + '/special', (atime, mtime))
- stat = sftp.stat(FOLDER + '/special')
- self.assertEqual(stat.st_mtime, mtime)
+ sftp.utime(sftp.FOLDER + '/special', (atime, mtime))
+ stat = sftp.stat(sftp.FOLDER + '/special')
+ assert stat.st_mtime == mtime
if sys.platform not in ('win32', 'cygwin'):
- self.assertEqual(stat.st_atime, atime)
+ assert stat.st_atime == atime
# can't really test chown, since we'd have to know a valid uid.
- sftp.truncate(FOLDER + '/special', 512)
- stat = sftp.stat(FOLDER + '/special')
- self.assertEqual(stat.st_size, 512)
+ sftp.truncate(sftp.FOLDER + '/special', 512)
+ stat = sftp.stat(sftp.FOLDER + '/special')
+ assert stat.st_size == 512
finally:
- sftp.remove(FOLDER + '/special')
+ sftp.remove(sftp.FOLDER + '/special')
- def test_9_fsetstat(self):
+ def test_9_fsetstat(self, sftp):
"""
verify that the fsetstat functions (chown, chmod, utime, truncate)
work on open files.
"""
try:
- with sftp.open(FOLDER + '/special', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/special', 'w') as f:
f.write('x' * 1024)
- with sftp.open(FOLDER + '/special', 'r+') as f:
+ with sftp.open(sftp.FOLDER + '/special', 'r+') as f:
stat = f.stat()
f.chmod((stat.st_mode & ~o777) | o600)
stat = f.stat()
@@ -425,26 +326,26 @@ class SFTPTest (unittest.TestCase):
if sys.platform == 'cygwin':
# even worse.
expected_mode = o644
- self.assertEqual(stat.st_mode & o777, expected_mode)
- self.assertEqual(stat.st_size, 1024)
+ assert stat.st_mode & o777 == expected_mode
+ assert stat.st_size == 1024
mtime = stat.st_mtime - 3600
atime = stat.st_atime - 1800
f.utime((atime, mtime))
stat = f.stat()
- self.assertEqual(stat.st_mtime, mtime)
+ assert stat.st_mtime == mtime
if sys.platform not in ('win32', 'cygwin'):
- self.assertEqual(stat.st_atime, atime)
+ assert stat.st_atime == atime
# can't really test chown, since we'd have to know a valid uid.
f.truncate(512)
stat = f.stat()
- self.assertEqual(stat.st_size, 512)
+ assert stat.st_size == 512
finally:
- sftp.remove(FOLDER + '/special')
+ sftp.remove(sftp.FOLDER + '/special')
- def test_A_readline_seek(self):
+ def test_A_readline_seek(self, sftp):
"""
create a text file and write a bunch of text into it. then count the lines
in the file, and seek around to retrieve particular lines. this should
@@ -452,10 +353,10 @@ class SFTPTest (unittest.TestCase):
buffering is reset on 'seek'.
"""
try:
- with sftp.open(FOLDER + '/duck.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f:
f.write(ARTICLE)
- with sftp.open(FOLDER + '/duck.txt', 'r+') as f:
+ with sftp.open(sftp.FOLDER + '/duck.txt', 'r+') as f:
line_number = 0
loc = 0
pos_list = []
@@ -463,35 +364,35 @@ class SFTPTest (unittest.TestCase):
line_number += 1
pos_list.append(loc)
loc = f.tell()
- self.assertTrue(f.seekable())
+ assert f.seekable()
f.seek(pos_list[6], f.SEEK_SET)
- self.assertEqual(f.readline(), 'Nouzilly, France.\n')
+ assert f.readline(), 'Nouzilly == France.\n'
f.seek(pos_list[17], f.SEEK_SET)
- self.assertEqual(f.readline()[:4], 'duck')
+ assert f.readline()[:4] == 'duck'
f.seek(pos_list[10], f.SEEK_SET)
- self.assertEqual(f.readline(), 'duck types were equally resistant to exogenous insulin compared with chicken.\n')
+ assert f.readline() == 'duck types were equally resistant to exogenous insulin compared with chicken.\n'
finally:
- sftp.remove(FOLDER + '/duck.txt')
+ sftp.remove(sftp.FOLDER + '/duck.txt')
- def test_B_write_seek(self):
+ def test_B_write_seek(self, sftp):
"""
create a text file, seek back and change part of it, and verify that the
changes worked.
"""
try:
- with sftp.open(FOLDER + '/testing.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/testing.txt', 'w') as f:
f.write('hello kitty.\n')
f.seek(-5, f.SEEK_CUR)
f.write('dd')
- self.assertEqual(sftp.stat(FOLDER + '/testing.txt').st_size, 13)
- with sftp.open(FOLDER + '/testing.txt', 'r') as f:
+ assert sftp.stat(sftp.FOLDER + '/testing.txt').st_size == 13
+ with sftp.open(sftp.FOLDER + '/testing.txt', 'r') as f:
data = f.read(20)
- self.assertEqual(data, b'hello kiddy.\n')
+ assert data == b'hello kiddy.\n'
finally:
- sftp.remove(FOLDER + '/testing.txt')
+ sftp.remove(sftp.FOLDER + '/testing.txt')
- def test_C_symlink(self):
+ def test_C_symlink(self, sftp):
"""
create a symlink and then check that lstat doesn't follow it.
"""
@@ -500,97 +401,85 @@ class SFTPTest (unittest.TestCase):
return
try:
- with sftp.open(FOLDER + '/original.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/original.txt', 'w') as f:
f.write('original\n')
- sftp.symlink('original.txt', FOLDER + '/link.txt')
- self.assertEqual(sftp.readlink(FOLDER + '/link.txt'), 'original.txt')
+ sftp.symlink('original.txt', sftp.FOLDER + '/link.txt')
+ assert sftp.readlink(sftp.FOLDER + '/link.txt') == 'original.txt'
- with sftp.open(FOLDER + '/link.txt', 'r') as f:
- self.assertEqual(f.readlines(), ['original\n'])
+ with sftp.open(sftp.FOLDER + '/link.txt', 'r') as f:
+ assert f.readlines() == ['original\n']
cwd = sftp.normalize('.')
if cwd[-1] == '/':
cwd = cwd[:-1]
- abs_path = cwd + '/' + FOLDER + '/original.txt'
- sftp.symlink(abs_path, FOLDER + '/link2.txt')
- self.assertEqual(abs_path, sftp.readlink(FOLDER + '/link2.txt'))
+ abs_path = cwd + '/' + sftp.FOLDER + '/original.txt'
+ sftp.symlink(abs_path, sftp.FOLDER + '/link2.txt')
+ assert abs_path == sftp.readlink(sftp.FOLDER + '/link2.txt')
- self.assertEqual(sftp.lstat(FOLDER + '/link.txt').st_size, 12)
- self.assertEqual(sftp.stat(FOLDER + '/link.txt').st_size, 9)
+ assert sftp.lstat(sftp.FOLDER + '/link.txt').st_size == 12
+ assert sftp.stat(sftp.FOLDER + '/link.txt').st_size == 9
# the sftp server may be hiding extra path members from us, so the
# length may be longer than we expect:
- self.assertTrue(sftp.lstat(FOLDER + '/link2.txt').st_size >= len(abs_path))
- self.assertEqual(sftp.stat(FOLDER + '/link2.txt').st_size, 9)
- self.assertEqual(sftp.stat(FOLDER + '/original.txt').st_size, 9)
+ assert sftp.lstat(sftp.FOLDER + '/link2.txt').st_size >= len(abs_path)
+ assert sftp.stat(sftp.FOLDER + '/link2.txt').st_size == 9
+ assert sftp.stat(sftp.FOLDER + '/original.txt').st_size == 9
finally:
try:
- sftp.remove(FOLDER + '/link.txt')
+ sftp.remove(sftp.FOLDER + '/link.txt')
except:
pass
try:
- sftp.remove(FOLDER + '/link2.txt')
+ sftp.remove(sftp.FOLDER + '/link2.txt')
except:
pass
try:
- sftp.remove(FOLDER + '/original.txt')
+ sftp.remove(sftp.FOLDER + '/original.txt')
except:
pass
- def test_D_flush_seek(self):
+ def test_D_flush_seek(self, sftp):
"""
verify that buffered writes are automatically flushed on seek.
"""
try:
- with sftp.open(FOLDER + '/happy.txt', 'w', 1) as f:
+ with sftp.open(sftp.FOLDER + '/happy.txt', 'w', 1) as f:
f.write('full line.\n')
f.write('partial')
f.seek(9, f.SEEK_SET)
f.write('?\n')
- with sftp.open(FOLDER + '/happy.txt', 'r') as f:
- self.assertEqual(f.readline(), u('full line?\n'))
- self.assertEqual(f.read(7), b'partial')
+ with sftp.open(sftp.FOLDER + '/happy.txt', 'r') as f:
+ assert f.readline() == u('full line?\n')
+ assert f.read(7) == b'partial'
finally:
try:
- sftp.remove(FOLDER + '/happy.txt')
+ sftp.remove(sftp.FOLDER + '/happy.txt')
except:
pass
- def test_E_realpath(self):
+ def test_E_realpath(self, sftp):
"""
test that realpath is returning something non-empty and not an
error.
"""
pwd = sftp.normalize('.')
- self.assertTrue(len(pwd) > 0)
- f = sftp.normalize('./' + FOLDER)
- self.assertTrue(len(f) > 0)
- self.assertEqual(os.path.join(pwd, FOLDER), f)
+ assert len(pwd) > 0
+ f = sftp.normalize('./' + sftp.FOLDER)
+ assert len(f) > 0
+ assert os.path.join(pwd, sftp.FOLDER) == f
- def test_F_mkdir(self):
+ def test_F_mkdir(self, sftp):
"""
verify that mkdir/rmdir work.
"""
- try:
- sftp.mkdir(FOLDER + '/subfolder')
- except:
- self.assertTrue(False, 'exception creating subfolder')
- try:
- sftp.mkdir(FOLDER + '/subfolder')
- self.assertTrue(False, 'no exception overwriting subfolder')
- except IOError:
- pass
- try:
- sftp.rmdir(FOLDER + '/subfolder')
- except:
- self.assertTrue(False, 'exception removing subfolder')
- try:
- sftp.rmdir(FOLDER + '/subfolder')
- self.assertTrue(False, 'no exception removing nonexistent subfolder')
- except IOError:
- pass
+ sftp.mkdir(sftp.FOLDER + '/subfolder')
+ with pytest.raises(IOError): # generic msg only
+ sftp.mkdir(sftp.FOLDER + '/subfolder')
+ sftp.rmdir(sftp.FOLDER + '/subfolder')
+ with pytest.raises(IOError, match="No such file"):
+ sftp.rmdir(sftp.FOLDER + '/subfolder')
- def test_G_chdir(self):
+ def test_G_chdir(self, sftp):
"""
verify that chdir/getcwd work.
"""
@@ -598,35 +487,35 @@ class SFTPTest (unittest.TestCase):
if root[-1] != '/':
root += '/'
try:
- sftp.mkdir(FOLDER + '/alpha')
- sftp.chdir(FOLDER + '/alpha')
+ sftp.mkdir(sftp.FOLDER + '/alpha')
+ sftp.chdir(sftp.FOLDER + '/alpha')
sftp.mkdir('beta')
- self.assertEqual(root + FOLDER + '/alpha', sftp.getcwd())
- self.assertEqual(['beta'], sftp.listdir('.'))
+ assert root + sftp.FOLDER + '/alpha' == sftp.getcwd()
+ assert ['beta'] == sftp.listdir('.')
sftp.chdir('beta')
with sftp.open('fish', 'w') as f:
f.write('hello\n')
sftp.chdir('..')
- self.assertEqual(['fish'], sftp.listdir('beta'))
+ assert ['fish'] == sftp.listdir('beta')
sftp.chdir('..')
- self.assertEqual(['fish'], sftp.listdir('alpha/beta'))
+ assert ['fish'] == sftp.listdir('alpha/beta')
finally:
sftp.chdir(root)
try:
- sftp.unlink(FOLDER + '/alpha/beta/fish')
+ sftp.unlink(sftp.FOLDER + '/alpha/beta/fish')
except:
pass
try:
- sftp.rmdir(FOLDER + '/alpha/beta')
+ sftp.rmdir(sftp.FOLDER + '/alpha/beta')
except:
pass
try:
- sftp.rmdir(FOLDER + '/alpha')
+ sftp.rmdir(sftp.FOLDER + '/alpha')
except:
pass
- def test_H_get_put(self):
+ def test_H_get_put(self, sftp):
"""
verify that get/put work.
"""
@@ -641,103 +530,102 @@ class SFTPTest (unittest.TestCase):
def progress_callback(x, y):
saved_progress.append((x, y))
- sftp.put(localname, FOLDER + '/bunny.txt', progress_callback)
+ sftp.put(localname, sftp.FOLDER + '/bunny.txt', progress_callback)
- with sftp.open(FOLDER + '/bunny.txt', 'rb') as f:
- self.assertEqual(text, f.read(128))
- self.assertEqual([(41, 41)], saved_progress)
+ with sftp.open(sftp.FOLDER + '/bunny.txt', 'rb') as f:
+ assert text == f.read(128)
+ assert [(41, 41)] == saved_progress
os.unlink(localname)
fd, localname = mkstemp()
os.close(fd)
saved_progress = []
- sftp.get(FOLDER + '/bunny.txt', localname, progress_callback)
+ sftp.get(sftp.FOLDER + '/bunny.txt', localname, progress_callback)
with open(localname, 'rb') as f:
- self.assertEqual(text, f.read(128))
- self.assertEqual([(41, 41)], saved_progress)
+ assert text == f.read(128)
+ assert [(41, 41)] == saved_progress
os.unlink(localname)
- sftp.unlink(FOLDER + '/bunny.txt')
+ sftp.unlink(sftp.FOLDER + '/bunny.txt')
- def test_I_check(self):
+ def test_I_check(self, sftp):
"""
verify that file.check() works against our own server.
(it's an sftp extension that we support, and may be the only ones who
support it.)
"""
- with sftp.open(FOLDER + '/kitty.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/kitty.txt', 'w') as f:
f.write('here kitty kitty' * 64)
try:
- with sftp.open(FOLDER + '/kitty.txt', 'r') as f:
+ with sftp.open(sftp.FOLDER + '/kitty.txt', 'r') as f:
sum = f.check('sha1')
- self.assertEqual('91059CFC6615941378D413CB5ADAF4C5EB293402', u(hexlify(sum)).upper())
+ assert '91059CFC6615941378D413CB5ADAF4C5EB293402' == u(hexlify(sum)).upper()
sum = f.check('md5', 0, 512)
- self.assertEqual('93DE4788FCA28D471516963A1FE3856A', u(hexlify(sum)).upper())
+ assert '93DE4788FCA28D471516963A1FE3856A' == u(hexlify(sum)).upper()
sum = f.check('md5', 0, 0, 510)
- self.assertEqual('EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6',
- u(hexlify(sum)).upper())
+ assert u(hexlify(sum)).upper() == 'EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6' # noqa
finally:
- sftp.unlink(FOLDER + '/kitty.txt')
+ sftp.unlink(sftp.FOLDER + '/kitty.txt')
- def test_J_x_flag(self):
+ def test_J_x_flag(self, sftp):
"""
verify that the 'x' flag works when opening a file.
"""
- sftp.open(FOLDER + '/unusual.txt', 'wx').close()
+ sftp.open(sftp.FOLDER + '/unusual.txt', 'wx').close()
try:
try:
- sftp.open(FOLDER + '/unusual.txt', 'wx')
+ sftp.open(sftp.FOLDER + '/unusual.txt', 'wx')
self.fail('expected exception')
except IOError:
pass
finally:
- sftp.unlink(FOLDER + '/unusual.txt')
+ sftp.unlink(sftp.FOLDER + '/unusual.txt')
- def test_K_utf8(self):
+ def test_K_utf8(self, sftp):
"""
verify that unicode strings are encoded into utf8 correctly.
"""
- with sftp.open(FOLDER + '/something', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/something', 'w') as f:
f.write('okay')
try:
- sftp.rename(FOLDER + '/something', FOLDER + '/' + unicode_folder)
- sftp.open(b(FOLDER) + utf8_folder, 'r')
+ sftp.rename(sftp.FOLDER + '/something', sftp.FOLDER + '/' + unicode_folder)
+ sftp.open(b(sftp.FOLDER) + utf8_folder, 'r')
except Exception as e:
self.fail('exception ' + str(e))
- sftp.unlink(b(FOLDER) + utf8_folder)
+ sftp.unlink(b(sftp.FOLDER) + utf8_folder)
- def test_L_utf8_chdir(self):
- sftp.mkdir(FOLDER + '/' + unicode_folder)
+ def test_L_utf8_chdir(self, sftp):
+ sftp.mkdir(sftp.FOLDER + '/' + unicode_folder)
try:
- sftp.chdir(FOLDER + '/' + unicode_folder)
+ sftp.chdir(sftp.FOLDER + '/' + unicode_folder)
with sftp.open('something', 'w') as f:
f.write('okay')
sftp.unlink('something')
finally:
sftp.chdir()
- sftp.rmdir(FOLDER + '/' + unicode_folder)
+ sftp.rmdir(sftp.FOLDER + '/' + unicode_folder)
- def test_M_bad_readv(self):
+ def test_M_bad_readv(self, sftp):
"""
verify that readv at the end of the file doesn't essplode.
"""
- sftp.open(FOLDER + '/zero', 'w').close()
+ sftp.open(sftp.FOLDER + '/zero', 'w').close()
try:
- with sftp.open(FOLDER + '/zero', 'r') as f:
+ with sftp.open(sftp.FOLDER + '/zero', 'r') as f:
f.readv([(0, 12)])
- with sftp.open(FOLDER + '/zero', 'r') as f:
+ with sftp.open(sftp.FOLDER + '/zero', 'r') as f:
file_size = f.stat().st_size
f.prefetch(file_size)
f.read(100)
finally:
- sftp.unlink(FOLDER + '/zero')
+ sftp.unlink(sftp.FOLDER + '/zero')
- def test_N_put_without_confirm(self):
+ def test_N_put_without_confirm(self, sftp):
"""
verify that get/put work without confirmation.
"""
@@ -752,138 +640,132 @@ class SFTPTest (unittest.TestCase):
def progress_callback(x, y):
saved_progress.append((x, y))
- res = sftp.put(localname, FOLDER + '/bunny.txt', progress_callback, False)
+ res = sftp.put(localname, sftp.FOLDER + '/bunny.txt', progress_callback, False)
- self.assertEqual(SFTPAttributes().attr, res.attr)
+ assert SFTPAttributes().attr == res.attr
- with sftp.open(FOLDER + '/bunny.txt', 'r') as f:
- self.assertEqual(text, f.read(128))
- self.assertEqual((41, 41), saved_progress[-1])
+ with sftp.open(sftp.FOLDER + '/bunny.txt', 'r') as f:
+ assert text == f.read(128)
+ assert (41, 41) == saved_progress[-1]
os.unlink(localname)
- sftp.unlink(FOLDER + '/bunny.txt')
+ sftp.unlink(sftp.FOLDER + '/bunny.txt')
- def test_O_getcwd(self):
+ def test_O_getcwd(self, sftp):
"""
verify that chdir/getcwd work.
"""
- self.assertEqual(None, sftp.getcwd())
+ assert sftp.getcwd() == None
root = sftp.normalize('.')
if root[-1] != '/':
root += '/'
try:
- sftp.mkdir(FOLDER + '/alpha')
- sftp.chdir(FOLDER + '/alpha')
- self.assertEqual('/' + FOLDER + '/alpha', sftp.getcwd())
+ sftp.mkdir(sftp.FOLDER + '/alpha')
+ sftp.chdir(sftp.FOLDER + '/alpha')
+ assert sftp.getcwd() == '/' + sftp.FOLDER + '/alpha'
finally:
sftp.chdir(root)
try:
- sftp.rmdir(FOLDER + '/alpha')
+ sftp.rmdir(sftp.FOLDER + '/alpha')
except:
pass
- def XXX_test_M_seek_append(self):
+ def XXX_test_M_seek_append(self, sftp):
"""
verify that seek does't affect writes during append.
does not work except through paramiko. :( openssh fails.
"""
try:
- with sftp.open(FOLDER + '/append.txt', 'a') as f:
+ with sftp.open(sftp.FOLDER + '/append.txt', 'a') as f:
f.write('first line\nsecond line\n')
f.seek(11, f.SEEK_SET)
f.write('third line\n')
- with sftp.open(FOLDER + '/append.txt', 'r') as f:
- self.assertEqual(f.stat().st_size, 34)
- self.assertEqual(f.readline(), 'first line\n')
- self.assertEqual(f.readline(), 'second line\n')
- self.assertEqual(f.readline(), 'third line\n')
+ with sftp.open(sftp.FOLDER + '/append.txt', 'r') as f:
+ assert f.stat().st_size == 34
+ assert f.readline() == 'first line\n'
+ assert f.readline() == 'second line\n'
+ assert f.readline() == 'third line\n'
finally:
- sftp.remove(FOLDER + '/append.txt')
+ sftp.remove(sftp.FOLDER + '/append.txt')
- def test_putfo_empty_file(self):
+ def test_putfo_empty_file(self, sftp):
"""
Send an empty file and confirm it is sent.
"""
- target = FOLDER + '/empty file.txt'
+ target = sftp.FOLDER + '/empty file.txt'
stream = StringIO()
try:
attrs = sftp.putfo(stream, target)
# the returned attributes should not be null
- self.assertNotEqual(attrs, None)
+ assert attrs is not None
finally:
sftp.remove(target)
-
- def test_N_file_with_percent(self):
+ # TODO: this test doesn't actually fail if the regression (removing '%'
+ # expansion to '%%' within sftp.py's def _log()) is removed - stacktraces
+ # appear but they're clearly emitted from subthreads that have no error
+ # handling. No point running it until that is fixed somehow.
+ @pytest.mark.skip("Doesn't prove anything right now")
+ def test_N_file_with_percent(self, sftp):
"""
verify that we can create a file with a '%' in the filename.
( it needs to be properly escaped by _log() )
"""
- self.assertTrue( paramiko.util.get_logger("paramiko").handlers, "This unit test requires logging to be enabled" )
- f = sftp.open(FOLDER + '/test%file', 'w')
+ f = sftp.open(sftp.FOLDER + '/test%file', 'w')
try:
- self.assertEqual(f.stat().st_size, 0)
+ assert f.stat().st_size == 0
finally:
f.close()
- sftp.remove(FOLDER + '/test%file')
-
+ sftp.remove(sftp.FOLDER + '/test%file')
- def test_O_non_utf8_data(self):
+ def test_O_non_utf8_data(self, sftp):
"""Test write() and read() of non utf8 data"""
try:
- with sftp.open('%s/nonutf8data' % FOLDER, 'w') as f:
+ with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'w') as f:
f.write(NON_UTF8_DATA)
- with sftp.open('%s/nonutf8data' % FOLDER, 'r') as f:
+ with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'r') as f:
data = f.read()
- self.assertEqual(data, NON_UTF8_DATA)
- with sftp.open('%s/nonutf8data' % FOLDER, 'wb') as f:
+ assert data == NON_UTF8_DATA
+ with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'wb') as f:
f.write(NON_UTF8_DATA)
- with sftp.open('%s/nonutf8data' % FOLDER, 'rb') as f:
+ with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'rb') as f:
data = f.read()
- self.assertEqual(data, NON_UTF8_DATA)
+ assert data == NON_UTF8_DATA
finally:
- sftp.remove('%s/nonutf8data' % FOLDER)
+ sftp.remove('%s/nonutf8data' % sftp.FOLDER)
- def test_sftp_attributes_empty_str(self):
+ def test_sftp_attributes_empty_str(self, sftp):
sftp_attributes = SFTPAttributes()
- self.assertEqual(str(sftp_attributes), "?--------- 1 0 0 0 (unknown date) ?")
+ assert str(sftp_attributes) == "?--------- 1 0 0 0 (unknown date) ?"
- @skipUnlessBuiltin('buffer')
- def test_write_buffer(self):
+ @needs_builtin('buffer')
+ def test_write_buffer(self, sftp):
"""Test write() using a buffer instance."""
data = 3 * b'A potentially large block of data to chunk up.\n'
try:
- with sftp.open('%s/write_buffer' % FOLDER, 'wb') as f:
+ with sftp.open('%s/write_buffer' % sftp.FOLDER, 'wb') as f:
for offset in range(0, len(data), 8):
f.write(buffer(data, offset, 8))
- with sftp.open('%s/write_buffer' % FOLDER, 'rb') as f:
- self.assertEqual(f.read(), data)
+ with sftp.open('%s/write_buffer' % sftp.FOLDER, 'rb') as f:
+ assert f.read() == data
finally:
- sftp.remove('%s/write_buffer' % FOLDER)
+ sftp.remove('%s/write_buffer' % sftp.FOLDER)
- @skipUnlessBuiltin('memoryview')
- def test_write_memoryview(self):
+ @needs_builtin('memoryview')
+ def test_write_memoryview(self, sftp):
"""Test write() using a memoryview instance."""
data = 3 * b'A potentially large block of data to chunk up.\n'
try:
- with sftp.open('%s/write_memoryview' % FOLDER, 'wb') as f:
+ with sftp.open('%s/write_memoryview' % sftp.FOLDER, 'wb') as f:
view = memoryview(data)
for offset in range(0, len(data), 8):
f.write(view[offset:offset+8])
- with sftp.open('%s/write_memoryview' % FOLDER, 'rb') as f:
- self.assertEqual(f.read(), data)
+ with sftp.open('%s/write_memoryview' % sftp.FOLDER, 'rb') as f:
+ assert f.read() == data
finally:
- sftp.remove('%s/write_memoryview' % FOLDER)
-
-
-if __name__ == '__main__':
- SFTPTest.init_loopback()
- # logging is required by test_N_file_with_percent
- paramiko.util.log_to_file('test_sftp.log')
- from unittest import main
- main()
+ sftp.remove('%s/write_memoryview' % sftp.FOLDER)
diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py
index cfad5682..a659098d 100644
--- a/tests/test_sftp_big.py
+++ b/tests/test_sftp_big.py
@@ -31,94 +31,75 @@ import time
import unittest
from paramiko.common import o660
-from tests.test_sftp import get_sftp
-FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000')
+from .util import slow
-class BigSFTPTest (unittest.TestCase):
-
- def setUp(self):
- global FOLDER
- sftp = get_sftp()
- for i in range(1000):
- FOLDER = FOLDER[:-3] + '%03d' % i
- try:
- sftp.mkdir(FOLDER)
- break
- except (IOError, OSError):
- pass
-
- def tearDown(self):
- sftp = get_sftp()
- sftp.rmdir(FOLDER)
-
- def test_1_lots_of_files(self):
+@slow
+class TestBigSFTP(object):
+ def test_1_lots_of_files(self, sftp):
"""
create a bunch of files over the same session.
"""
- sftp = get_sftp()
numfiles = 100
try:
for i in range(numfiles):
- with sftp.open('%s/file%d.txt' % (FOLDER, i), 'w', 1) as f:
+ with sftp.open('%s/file%d.txt' % (sftp.FOLDER, i), 'w', 1) as f:
f.write('this is file #%d.\n' % i)
- sftp.chmod('%s/file%d.txt' % (FOLDER, i), o660)
+ sftp.chmod('%s/file%d.txt' % (sftp.FOLDER, i), o660)
# now make sure every file is there, by creating a list of filenmes
# and reading them in random order.
numlist = list(range(numfiles))
while len(numlist) > 0:
r = numlist[random.randint(0, len(numlist) - 1)]
- with sftp.open('%s/file%d.txt' % (FOLDER, r)) as f:
- self.assertEqual(f.readline(), 'this is file #%d.\n' % r)
+ with sftp.open('%s/file%d.txt' % (sftp.FOLDER, r)) as f:
+ assert f.readline() == 'this is file #%d.\n' % r
numlist.remove(r)
finally:
for i in range(numfiles):
try:
- sftp.remove('%s/file%d.txt' % (FOLDER, i))
+ sftp.remove('%s/file%d.txt' % (sftp.FOLDER, i))
except:
pass
- def test_2_big_file(self):
+ def test_2_big_file(self, sftp):
"""
write a 1MB file with no buffering.
"""
- sftp = get_sftp()
kblob = (1024 * b'x')
start = time.time()
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'w') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'w') as f:
for n in range(1024):
f.write(kblob)
if n % 128 == 0:
sys.stderr.write('.')
sys.stderr.write(' ')
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
end = time.time()
sys.stderr.write('%ds ' % round(end - start))
start = time.time()
- with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'r') as f:
for n in range(1024):
data = f.read(1024)
- self.assertEqual(data, kblob)
+ assert data == kblob
end = time.time()
sys.stderr.write('%ds ' % round(end - start))
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
- def test_3_big_file_pipelined(self):
+ def test_3_big_file_pipelined(self, sftp):
"""
write a 1MB file, with no linefeeds, using pipelining.
"""
- sftp = get_sftp()
kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
start = time.time()
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -126,12 +107,12 @@ class BigSFTPTest (unittest.TestCase):
sys.stderr.write('.')
sys.stderr.write(' ')
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
end = time.time()
sys.stderr.write('%ds ' % round(end - start))
start = time.time()
- with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f:
file_size = f.stat().st_size
f.prefetch(file_size)
@@ -145,19 +126,18 @@ class BigSFTPTest (unittest.TestCase):
chunk = size - n
data = f.read(chunk)
offset = n % 1024
- self.assertEqual(data, k2blob[offset:offset + chunk])
+ assert data == k2blob[offset:offset + chunk]
n += chunk
end = time.time()
sys.stderr.write('%ds ' % round(end - start))
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
- def test_4_prefetch_seek(self):
- sftp = get_sftp()
+ def test_4_prefetch_seek(self, sftp):
kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -165,13 +145,13 @@ class BigSFTPTest (unittest.TestCase):
sys.stderr.write('.')
sys.stderr.write(' ')
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
start = time.time()
k2blob = kblob + kblob
chunk = 793
for i in range(10):
- with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f:
file_size = f.stat().st_size
f.prefetch(file_size)
base_offset = (512 * 1024) + 17 * random.randint(1000, 2000)
@@ -183,18 +163,17 @@ class BigSFTPTest (unittest.TestCase):
f.seek(offset)
data = f.read(chunk)
n_offset = offset % 1024
- self.assertEqual(data, k2blob[n_offset:n_offset + chunk])
+ assert data == k2blob[n_offset:n_offset + chunk]
offset += chunk
end = time.time()
sys.stderr.write('%ds ' % round(end - start))
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
- def test_5_readv_seek(self):
- sftp = get_sftp()
+ def test_5_readv_seek(self, sftp):
kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -202,13 +181,13 @@ class BigSFTPTest (unittest.TestCase):
sys.stderr.write('.')
sys.stderr.write(' ')
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
start = time.time()
k2blob = kblob + kblob
chunk = 793
for i in range(10):
- with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f:
base_offset = (512 * 1024) + 17 * random.randint(1000, 2000)
# make a bunch of offsets and put them in random order
offsets = [base_offset + j * chunk for j in range(100)]
@@ -221,21 +200,20 @@ class BigSFTPTest (unittest.TestCase):
for i in range(len(readv_list)):
offset = readv_list[i][0]
n_offset = offset % 1024
- self.assertEqual(next(ret), k2blob[n_offset:n_offset + chunk])
+ assert next(ret) == k2blob[n_offset:n_offset + chunk]
end = time.time()
sys.stderr.write('%ds ' % round(end - start))
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
- def test_6_lots_of_prefetching(self):
+ def test_6_lots_of_prefetching(self, sftp):
"""
prefetch a 1MB file a bunch of times, discarding the file object
without using it, to verify that paramiko doesn't get confused.
"""
- sftp = get_sftp()
kblob = (1024 * b'x')
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'w') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'w') as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -243,32 +221,31 @@ class BigSFTPTest (unittest.TestCase):
sys.stderr.write('.')
sys.stderr.write(' ')
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
for i in range(10):
- with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'r') as f:
file_size = f.stat().st_size
f.prefetch(file_size)
- with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'r') as f:
file_size = f.stat().st_size
f.prefetch(file_size)
for n in range(1024):
data = f.read(1024)
- self.assertEqual(data, kblob)
+ assert data == kblob
if n % 128 == 0:
sys.stderr.write('.')
sys.stderr.write(' ')
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
- def test_7_prefetch_readv(self):
+ def test_7_prefetch_readv(self, sftp):
"""
verify that prefetch and readv don't conflict with each other.
"""
- sftp = get_sftp()
kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -276,13 +253,13 @@ class BigSFTPTest (unittest.TestCase):
sys.stderr.write('.')
sys.stderr.write(' ')
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
- with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f:
file_size = f.stat().st_size
f.prefetch(file_size)
data = f.read(1024)
- self.assertEqual(data, kblob)
+ assert data == kblob
chunk_size = 793
base_offset = 512 * 1024
@@ -290,23 +267,22 @@ class BigSFTPTest (unittest.TestCase):
chunks = [(base_offset + (chunk_size * i), chunk_size) for i in range(20)]
for data in f.readv(chunks):
offset = base_offset % 1024
- self.assertEqual(chunk_size, len(data))
- self.assertEqual(k2blob[offset:offset + chunk_size], data)
+ assert chunk_size == len(data)
+ assert k2blob[offset:offset + chunk_size] == data
base_offset += chunk_size
sys.stderr.write(' ')
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
- def test_8_large_readv(self):
+ def test_8_large_readv(self, sftp):
"""
verify that a very large readv is broken up correctly and still
returned as a single blob.
"""
- sftp = get_sftp()
kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -314,62 +290,53 @@ class BigSFTPTest (unittest.TestCase):
sys.stderr.write('.')
sys.stderr.write(' ')
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
- with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f:
data = list(f.readv([(23 * 1024, 128 * 1024)]))
- self.assertEqual(1, len(data))
+ assert len(data) == 1
data = data[0]
- self.assertEqual(128 * 1024, len(data))
+ assert len(data) == 128 * 1024
sys.stderr.write(' ')
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
- def test_9_big_file_big_buffer(self):
+ def test_9_big_file_big_buffer(self, sftp):
"""
write a 1MB file, with no linefeeds, and a big buffer.
"""
- sftp = get_sftp()
mblob = (1024 * 1024 * 'x')
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'w', 128 * 1024) as f:
f.write(mblob)
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
- def test_A_big_file_renegotiate(self):
+ def test_A_big_file_renegotiate(self, sftp):
"""
write a 1MB file, forcing key renegotiation in the middle.
"""
- sftp = get_sftp()
t = sftp.sock.get_transport()
t.packetizer.REKEY_BYTES = 512 * 1024
k32blob = (32 * 1024 * 'x')
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'w', 128 * 1024) as f:
for i in range(32):
f.write(k32blob)
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
- self.assertNotEqual(t.H, t.session_id)
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
+ assert t.H != t.session_id
# try to read it too.
- with sftp.open('%s/hongry.txt' % FOLDER, 'r', 128 * 1024) as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'r', 128 * 1024) as f:
file_size = f.stat().st_size
f.prefetch(file_size)
total = 0
while total < 1024 * 1024:
total += len(f.read(32 * 1024))
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
t.packetizer.REKEY_BYTES = pow(2, 30)
-
-
-if __name__ == '__main__':
- from tests.test_sftp import SFTPTest
- SFTPTest.init_loopback()
- from unittest import main
- main()
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py
index d8d05d2b..f0645e0e 100644
--- a/tests/test_ssh_gss.py
+++ b/tests/test_ssh_gss.py
@@ -29,17 +29,20 @@ import unittest
import paramiko
-from tests.util import test_path
-from tests.test_client import FINGERPRINTS
+from .util import _support, needs_gssapi
+from .test_client import FINGERPRINTS
-class NullServer (paramiko.ServerInterface):
+class NullServer (paramiko.ServerInterface):
def get_allowed_auths(self, username):
return 'gssapi-with-mic,publickey'
- def check_auth_gssapi_with_mic(self, username,
- gss_authenticated=paramiko.AUTH_FAILED,
- cc_file=None):
+ def check_auth_gssapi_with_mic(
+ self,
+ username,
+ gss_authenticated=paramiko.AUTH_FAILED,
+ cc_file=None,
+ ):
if gss_authenticated == paramiko.AUTH_SUCCESSFUL:
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED
@@ -66,18 +69,15 @@ class NullServer (paramiko.ServerInterface):
return True
+@needs_gssapi
class GSSAuthTest(unittest.TestCase):
- @staticmethod
- def init(username, hostname):
- global krb5_principal, targ_name
- krb5_principal = username
- targ_name = hostname
-
def setUp(self):
- self.username = krb5_principal
- self.hostname = socket.getfqdn(targ_name)
+ # TODO: username and targ_name should come from os.environ or whatever
+ # the approved pytest method is for runtime-configuring test data.
+ self.username = "krb5_principal"
+ self.hostname = socket.getfqdn("targ_name")
self.sockl = socket.socket()
- self.sockl.bind((targ_name, 0))
+ self.sockl.bind(("targ_name", 0))
self.sockl.listen(1)
self.addr, self.port = self.sockl.getsockname()
self.event = threading.Event()
@@ -148,6 +148,6 @@ class GSSAuthTest(unittest.TestCase):
Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding
"""
self.hostname = "this_host_does_not_exists_and_causes_a_GSSAPI-exception"
- self._test_connection(key_filename=[test_path('test_rsa.key')],
+ self._test_connection(key_filename=[_support('test_rsa.key')],
allow_agent=False,
look_for_keys=False)
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 99cbc3e0..9474acfc 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -43,9 +43,9 @@ from paramiko.common import (
)
from paramiko.py3compat import bytes
from paramiko.message import Message
-from tests import skipUnlessBuiltin
-from tests.loop import LoopSocket
-from tests.util import test_path
+
+from .util import needs_builtin, _support, slow
+from .loop import LoopSocket
LONG_BANNER = """\
@@ -64,7 +64,7 @@ Maybe.
class NullServer (ServerInterface):
paranoid_did_password = False
paranoid_did_public_key = False
- paranoid_key = DSSKey.from_private_key_file(test_path('test_dss.key'))
+ paranoid_key = DSSKey.from_private_key_file(_support('test_dss.key'))
def get_allowed_auths(self, username):
if username == 'slowdive':
@@ -136,7 +136,7 @@ class TransportTest(unittest.TestCase):
def setup_test_server(
self, client_options=None, server_options=None, connect_kwargs=None,
):
- host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
@@ -200,7 +200,7 @@ class TransportTest(unittest.TestCase):
loopback sockets. this is hardly "simple" but it's simpler than the
later tests. :)
"""
- host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
event = threading.Event()
@@ -225,7 +225,7 @@ class TransportTest(unittest.TestCase):
"""
verify that a long banner doesn't mess up the handshake.
"""
- host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
event = threading.Event()
@@ -257,6 +257,7 @@ class TransportTest(unittest.TestCase):
self.tc.renegotiate_keys()
self.ts.send_ignore(1024)
+ @slow
def test_5_keepalive(self):
"""
verify that the keepalive will be sent.
@@ -820,6 +821,7 @@ class TransportTest(unittest.TestCase):
(2**32, MAX_WINDOW_SIZE)]:
self.assertEqual(self.tc._sanitize_window_size(val), correct)
+ @slow
def test_L_handshake_timeout(self):
"""
verify that we can get a hanshake timeout.
@@ -840,7 +842,7 @@ class TransportTest(unittest.TestCase):
# be fine. Even tho it's a bit squicky.
self.tc.packetizer = SlowPacketizer(self.tc.sock)
# Continue with regular test red tape.
- host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
event = threading.Event()
@@ -892,7 +894,7 @@ class TransportTest(unittest.TestCase):
expected = text.encode("utf-8")
self.assertEqual(sfile.read(len(expected)), expected)
- @skipUnlessBuiltin('buffer')
+ @needs_builtin('buffer')
def test_channel_send_buffer(self):
"""
verify sending buffer instances to a channel
@@ -915,7 +917,7 @@ class TransportTest(unittest.TestCase):
chan.sendall(buffer(data))
self.assertEqual(sfile.read(len(data)), data)
- @skipUnlessBuiltin('memoryview')
+ @needs_builtin('memoryview')
def test_channel_send_memoryview(self):
"""
verify sending memoryview instances to a channel
diff --git a/tests/test_util.py b/tests/test_util.py
index 7880e156..90473f43 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -30,6 +30,7 @@ import paramiko.util
from paramiko.util import lookup_ssh_host_config as host_config, safe_string
from paramiko.py3compat import StringIO, byte_ord, b
+
# Note some lines in this configuration have trailing spaces on purpose
test_config_file = """\
Host *
@@ -366,7 +367,7 @@ IdentityFile something_%l_using_fqdn
def test_get_hostnames(self):
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
- self.assertEqual(config.get_hostnames(), set(['*', '*.example.com', 'spoo.example.com']))
+ self.assertEqual(config.get_hostnames(), {'*', '*.example.com', 'spoo.example.com'})
def test_quoted_host_names(self):
test_config_file = """\
@@ -469,12 +470,12 @@ Host param3 parara
self.assertRaises(Exception, conf._get_hosts, host)
def test_safe_string(self):
- vanilla = b("vanilla")
- has_bytes = b("has \7\3 bytes")
+ vanilla = b"vanilla"
+ has_bytes = b"has \7\3 bytes"
safe_vanilla = safe_string(vanilla)
safe_has_bytes = safe_string(has_bytes)
- expected_bytes = b("has %07%03 bytes")
- err = "{0!r} != {1!r}"
+ expected_bytes = b"has %07%03 bytes"
+ err = "{!r} != {!r}"
msg = err.format(safe_vanilla, vanilla)
assert safe_vanilla == vanilla, msg
msg = err.format(safe_has_bytes, expected_bytes)
diff --git a/tests/util.py b/tests/util.py
index c1b43da8..4ca02374 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -1,6 +1,27 @@
-import os
+from os.path import dirname, realpath, join
-root_path = os.path.dirname(os.path.realpath(__file__))
+import pytest
-def test_path(filename):
- return os.path.join(root_path, filename)
+from paramiko.py3compat import builtins
+
+
+def _support(filename):
+ return join(dirname(realpath(__file__)), filename)
+
+
+# TODO: consider using pytest.importorskip('gssapi') instead? We presumably
+# still need CLI configurability for the Kerberos parameters, though, so can't
+# JUST key off presence of GSSAPI optional dependency...
+# TODO: anyway, s/True/os.environ.get('RUN_GSSAPI', False)/ or something.
+needs_gssapi = pytest.mark.skipif(True, reason="No GSSAPI to test")
+
+
+def needs_builtin(name):
+ """
+ Skip decorated test if builtin name does not exist.
+ """
+ reason = "Test requires a builtin '{}'".format(name)
+ return pytest.mark.skipif(not hasattr(builtins, name), reason=reason)
+
+
+slow = pytest.mark.slow