diff options
61 files changed, 1008 insertions, 1154 deletions
@@ -3,9 +3,9 @@ build/ dist/ .tox/ paramiko.egg-info/ -test.log docs/ demos/*.log !sites/docs _build .coverage +.cache diff --git a/.travis.yml b/.travis.yml index 0e46ec84..b37239ea 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,9 +4,7 @@ cache: directories: - $HOME/.cache/pip python: - - "2.6" - "2.7" - - "3.3" - "3.4" - "3.5" - "3.6" @@ -15,11 +13,12 @@ install: # Self-install for setup.py-driven deps - pip install -e . # Dev (doc/test running) requirements + # TODO: use pipenv + whatever contexty-type stuff it has - pip install codecov # For codecov specifically - pip install -r dev-requirements.txt script: - # Main tests, w/ coverage! - - inv test --coverage + # All (including slow) tests, w/ coverage! + - inv coverage # Ensure documentation builds, both sites, maxxed nitpicking - inv sites # flake8 is now possible! diff --git a/MANIFEST.in b/MANIFEST.in index e718ea24..1eec2054 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,4 @@ -include LICENSE test.py setup_helper.py +include LICENSE setup_helper.py recursive-include docs * recursive-include tests *.py *.key recursive-include demos *.py *.key user_rsa_key user_rsa_key.pub @@ -22,7 +22,7 @@ What ---- "Paramiko" is a combination of the Esperanto words for "paranoid" and -"friend". It's a module for Python 2.6+/3.3+ that implements the SSH2 protocol +"friend". It's a module for Python 2.7/3.4+ that implements the SSH2 protocol for secure (encrypted and authenticated) connections to remote machines. Unlike SSL (aka TLS), SSH2 protocol does not require hierarchical certificates signed by a powerful central authority. You may know SSH2 as the protocol that @@ -132,6 +132,7 @@ doc/ folder. There are also unit tests here:: - $ python ./test.py + $ pip install -r dev-requirements.txt + $ pytest Which will verify that most of the core components are working correctly. diff --git a/dev-requirements.txt b/dev-requirements.txt index 41fea798..9e09c03e 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,6 +1,3 @@ -# Older junk -tox>=1.4,<1.5 -# For newer tasks like building Sphinx docs. invoke>=0.13,<2.0 invocations>=0.13,<2.0 sphinx>=1.1.3,<1.5 @@ -10,3 +7,4 @@ semantic_version<3.0 wheel==0.24 twine==1.9.1 flake8==2.6.2 +pytest==3.2.1 diff --git a/paramiko/__init__.py b/paramiko/__init__.py index bdc91d51..c4c69a45 100644 --- a/paramiko/__init__.py +++ b/paramiko/__init__.py @@ -19,15 +19,6 @@ # flake8: noqa import sys from paramiko._version import __version__, __version_info__ - -if sys.version_info < (2, 6): - raise RuntimeError('You need Python 2.6+ for this module.') - - -__author__ = "Jeff Forcier <jeff@bitprophet.org>" -__license__ = "GNU Lesser General Public License (LGPL)" - - from paramiko.transport import SecurityOptions, Transport from paramiko.client import ( SSHClient, MissingHostKeyPolicy, AutoAddPolicy, RejectPolicy, @@ -76,6 +67,10 @@ from paramiko.sftp import ( from paramiko.common import io_sleep + +__author__ = "Jeff Forcier <jeff@bitprophet.org>" +__license__ = "GNU Lesser General Public License (LGPL)" + __all__ = [ 'Transport', 'SSHClient', diff --git a/paramiko/auth_handler.py b/paramiko/auth_handler.py index d2dab3d8..a1ce5e3b 100644 --- a/paramiko/auth_handler.py +++ b/paramiko/auth_handler.py @@ -71,6 +71,9 @@ class AuthHandler (object): self.gss_host = None self.gss_deleg_creds = True + def _log(self, *args): + return self.transport._log(*args) + def is_authenticated(self): return self.authenticated @@ -245,7 +248,7 @@ class AuthHandler (object): def _parse_service_accept(self, m): service = m.get_text() if service == 'ssh-userauth': - self.transport._log(DEBUG, 'userauth is OK') + self._log(DEBUG, 'userauth is OK') m = Message() m.add_byte(cMSG_USERAUTH_REQUEST) m.add_string(self.username) @@ -320,7 +323,8 @@ class AuthHandler (object): self.transport.send_message(m) else: raise SSHException( - "Received Package: %s" % MSG_NAMES[ptype]) + "Received Package: {}".format(MSG_NAMES[ptype]) + ) m = Message() m.add_byte(cMSG_USERAUTH_GSSAPI_MIC) # send the MIC to the server @@ -336,17 +340,17 @@ class AuthHandler (object): min_status = m.get_int() err_msg = m.get_string() m.get_string() # Lang tag - discarded - raise SSHException("GSS-API Error:\nMajor Status: %s\n\ - Minor Status: %s\ \nError Message:\ - %s\n") % (str(maj_status), - str(min_status), - err_msg) + raise SSHException("""GSS-API Error: +Major Status: {} +Minor Status: {} +Error Message: {} +""".format(maj_status, min_status, err_msg)) elif ptype == MSG_USERAUTH_FAILURE: self._parse_userauth_failure(m) return else: raise SSHException( - "Received Package: %s" % MSG_NAMES[ptype]) + "Received Package: {}".format(MSG_NAMES[ptype])) elif ( self.auth_method == 'gssapi-keyex' and self.transport.gss_kex_used @@ -359,22 +363,22 @@ class AuthHandler (object): pass else: raise SSHException( - 'Unknown auth method "%s"' % self.auth_method) + 'Unknown auth method "{}"'.format(self.auth_method)) self.transport._send_message(m) else: - self.transport._log( + self._log( DEBUG, - 'Service request "%s" accepted (?)' % service) + 'Service request "{}" accepted (?)'.format(service)) def _send_auth_result(self, username, method, result): # okay, send result m = Message() if result == AUTH_SUCCESSFUL: - self.transport._log(INFO, 'Auth granted (%s).' % method) + self._log(INFO, 'Auth granted ({}).'.format(method)) m.add_byte(cMSG_USERAUTH_SUCCESS) self.authenticated = True else: - self.transport._log(INFO, 'Auth rejected (%s).' % method) + self._log(INFO, 'Auth rejected ({}).'.format(method)) m.add_byte(cMSG_USERAUTH_FAILURE) m.add_string( self.transport.server_object.get_allowed_auths(username)) @@ -417,16 +421,18 @@ class AuthHandler (object): username = m.get_text() service = m.get_text() method = m.get_text() - self.transport._log( + self._log( DEBUG, - 'Auth request (type=%s) service=%s, username=%s' % ( - method, service, username)) + 'Auth request (type={}) service={}, username={}'.format( + method, service, username + ) + ) if service != 'ssh-connection': self._disconnect_service_not_available() return if ((self.auth_username is not None) and (self.auth_username != username)): - self.transport._log( + self._log( WARNING, 'Auth rejected because the client attempted to change username in mid-flight' # noqa ) @@ -451,7 +457,7 @@ class AuthHandler (object): # always treated as failure, since we don't support changing # passwords, but collect the list of valid auth types from # the callback anyway - self.transport._log( + self._log( DEBUG, 'Auth request to change passwords (rejected)') newpassword = m.get_binary() @@ -470,13 +476,13 @@ class AuthHandler (object): try: key = self.transport._key_info[keytype](Message(keyblob)) except SSHException as e: - self.transport._log( + self._log( INFO, - 'Auth rejected: public key: %s' % str(e)) + 'Auth rejected: public key: {}'.format(str(e))) key = None except Exception as e: - msg = 'Auth rejected: unsupported or mangled public key ({0}: {1})' # noqa - self.transport._log(INFO, msg.format(e.__class__.__name__, e)) + msg = 'Auth rejected: unsupported or mangled public key ({}: {})' # noqa + self._log(INFO, msg.format(e.__class__.__name__, e)) key = None if key is None: self._disconnect_no_more_auth() @@ -498,7 +504,7 @@ class AuthHandler (object): sig = Message(m.get_binary()) blob = self._get_session_blob(key, service, username) if not key.verify_ssh_sig(blob, sig): - self.transport._log( + self._log( INFO, 'Auth rejected: invalid signature') result = AUTH_FAILED @@ -519,7 +525,7 @@ class AuthHandler (object): # We can't accept more than one OID, so if the SSH client sends # more than one, disconnect. if mechs > 1: - self.transport._log( + self._log( INFO, 'Disconnect: Received more than one GSS-API OID mechanism') self._disconnect_no_more_auth() @@ -527,7 +533,7 @@ class AuthHandler (object): mech_ok = sshgss.ssh_check_mech(desired_mech) # if we don't support the mechanism, disconnect. if not mech_ok: - self.transport._log( + self._log( INFO, 'Disconnect: Received an invalid GSS-API OID mechanism') self._disconnect_no_more_auth() @@ -569,9 +575,9 @@ class AuthHandler (object): self._send_auth_result(username, method, result) def _parse_userauth_success(self, m): - self.transport._log( + self._log( INFO, - 'Authentication (%s) successful!' % self.auth_method) + 'Authentication ({}) successful!'.format(self.auth_method)) self.authenticated = True self.transport._auth_trigger() if self.auth_event is not None: @@ -581,22 +587,25 @@ class AuthHandler (object): authlist = m.get_list() partial = m.get_boolean() if partial: - self.transport._log(INFO, 'Authentication continues...') - self.transport._log(DEBUG, 'Methods: ' + str(authlist)) + self._log(INFO, 'Authentication continues...') + self._log(DEBUG, 'Methods: ' + str(authlist)) self.transport.saved_exception = PartialAuthentication(authlist) elif self.auth_method not in authlist: - self.transport._log( - DEBUG, - 'Authentication type (%s) not permitted.' % self.auth_method) - self.transport._log( - DEBUG, - 'Allowed methods: ' + str(authlist)) + for msg in ( + 'Authentication type ({}) not permitted.'.format( + self.auth_method + ), + 'Allowed methods: {}'.format(authlist), + ): + self._log(DEBUG, msg) self.transport.saved_exception = BadAuthenticationType( - 'Bad authentication type', authlist) + 'Bad authentication type', authlist + ) else: - self.transport._log( + self._log( INFO, - 'Authentication (%s) failed.' % self.auth_method) + 'Authentication ({}) failed.'.format(self.auth_method) + ) self.authenticated = False self.username = None if self.auth_event is not None: @@ -605,7 +614,7 @@ class AuthHandler (object): def _parse_userauth_banner(self, m): banner = m.get_string() self.banner = banner - self.transport._log(INFO, 'Auth banner: %s' % banner) + self._log(INFO, 'Auth banner: {}'.format(banner)) # who cares. def _parse_userauth_info_request(self, m): @@ -646,9 +655,9 @@ class AuthHandler (object): def _handle_local_gss_failure(self, e): self.transport.saved_exception = e - self.transport._log(DEBUG, "GSSAPI failure: %s" % str(e)) - self.transport._log(INFO, 'Authentication (%s) failed.' % - self.auth_method) + self._log(DEBUG, "GSSAPI failure: {}".format(e)) + self._log(INFO, 'Authentication ({}) failed.'.format( + self.auth_method)) self.authenticated = False self.username = None if self.auth_event is not None: diff --git a/paramiko/ber.py b/paramiko/ber.py index 7725f944..876347e0 100644 --- a/paramiko/ber.py +++ b/paramiko/ber.py @@ -88,8 +88,8 @@ class BER(object): return util.inflate_long(data) else: # 1: boolean (00 false, otherwise true) - raise BERException( - 'Unknown ber encoding type %d (robey is lazy)' % ident) + msg = 'Unknown ber encoding type {:d} (robey is lazy)' + raise BERException(msg.format(ident)) @staticmethod def decode_sequence(data): @@ -125,7 +125,9 @@ class BER(object): elif (type(x) is list) or (type(x) is tuple): self.encode_tlv(0x30, self.encode_sequence(x)) else: - raise BERException('Unknown type for encoding: %s' % repr(type(x))) + raise BERException( + 'Unknown type for encoding: {!r}'.format(type(x)) + ) @staticmethod def encode_sequence(data): diff --git a/paramiko/channel.py b/paramiko/channel.py index c6016a0e..91a8f0df 100644 --- a/paramiko/channel.py +++ b/paramiko/channel.py @@ -135,7 +135,7 @@ class Channel (ClosingContextManager): """ Return a string representation of this object, for debugging. """ - out = '<paramiko.Channel %d' % self.chanid + out = '<paramiko.Channel {}'.format(self.chanid) if self.closed: out += ' (closed)' elif self.active: @@ -143,9 +143,9 @@ class Channel (ClosingContextManager): out += ' (EOF received)' if self.eof_sent: out += ' (EOF sent)' - out += ' (open) window=%d' % self.out_window_size + out += ' (open) window={}'.format(self.out_window_size) if len(self.in_buffer) > 0: - out += ' in-buffer=%d' % (len(self.in_buffer),) + out += ' in-buffer={}'.format(len(self.in_buffer)) out += ' -> ' + repr(self.transport) out += '>' return out @@ -315,7 +315,7 @@ class Channel (ClosingContextManager): try: self.set_environment_variable(name, value) except SSHException as e: - err = "Failed to set environment variable \"{0}\"." + err = "Failed to set environment variable \"{}\"." raise SSHException(err.format(name), e) @open_only @@ -976,7 +976,7 @@ class Channel (ClosingContextManager): # a window update self.in_window_threshold = window_size // 10 self.in_window_sofar = 0 - self._log(DEBUG, 'Max packet in: %d bytes' % max_packet_size) + self._log(DEBUG, 'Max packet in: {} bytes'.format(max_packet_size)) def _set_remote_channel(self, chanid, window_size, max_packet_size): self.remote_chanid = chanid @@ -985,10 +985,11 @@ class Channel (ClosingContextManager): max_packet_size ) self.active = 1 - self._log(DEBUG, 'Max packet out: %d bytes' % self.out_max_packet_size) + self._log(DEBUG, + 'Max packet out: {} bytes'.format(self.out_max_packet_size)) def _request_success(self, m): - self._log(DEBUG, 'Sesch channel %d request ok' % self.chanid) + self._log(DEBUG, 'Sesch channel {} request ok'.format(self.chanid)) self.event_ready = True self.event.set() return @@ -1017,7 +1018,7 @@ class Channel (ClosingContextManager): if code != 1: self._log( ERROR, - 'unknown extended_data type %d; discarding' % code + 'unknown extended_data type {}; discarding'.format(code) ) return if self.combine_stderr: @@ -1030,7 +1031,7 @@ class Channel (ClosingContextManager): self.lock.acquire() try: if self.ultra_debug: - self._log(DEBUG, 'window up %d' % nbytes) + self._log(DEBUG, 'window up {}'.format(nbytes)) self.out_window_size += nbytes self.out_buffer_cv.notifyAll() finally: @@ -1122,7 +1123,7 @@ class Channel (ClosingContextManager): else: ok = server.check_channel_forward_agent_request(self) else: - self._log(DEBUG, 'Unhandled channel request "%s"' % key) + self._log(DEBUG, 'Unhandled channel request "{}"'.format(key)) ok = False if want_reply: m = Message() @@ -1144,7 +1145,7 @@ class Channel (ClosingContextManager): self._pipe.set_forever() finally: self.lock.release() - self._log(DEBUG, 'EOF received (%s)', self._name) + self._log(DEBUG, 'EOF received ({})'.format(self._name)) def _handle_close(self, m): self.lock.acquire() @@ -1216,7 +1217,7 @@ class Channel (ClosingContextManager): m.add_byte(cMSG_CHANNEL_EOF) m.add_int(self.remote_chanid) self.eof_sent = True - self._log(DEBUG, 'EOF sent (%s)', self._name) + self._log(DEBUG, 'EOF sent ({})'.format(self._name)) return m def _close_internal(self): @@ -1250,12 +1251,13 @@ class Channel (ClosingContextManager): if self.closed or self.eof_received or not self.active: return 0 if self.ultra_debug: - self._log(DEBUG, 'addwindow %d' % n) + self._log(DEBUG, 'addwindow {}'.format(n)) self.in_window_sofar += n if self.in_window_sofar <= self.in_window_threshold: return 0 if self.ultra_debug: - self._log(DEBUG, 'addwindow send %d' % self.in_window_sofar) + self._log(DEBUG, + 'addwindow send {}'.format(self.in_window_sofar)) out = self.in_window_sofar self.in_window_sofar = 0 return out @@ -1298,7 +1300,7 @@ class Channel (ClosingContextManager): size = self.out_max_packet_size - 64 self.out_window_size -= size if self.ultra_debug: - self._log(DEBUG, 'window down to %d' % self.out_window_size) + self._log(DEBUG, 'window down to {}'.format(self.out_window_size)) return size diff --git a/paramiko/client.py b/paramiko/client.py index 75d295ea..5f0e9274 100644 --- a/paramiko/client.py +++ b/paramiko/client.py @@ -143,8 +143,9 @@ class SSHClient (ClosingContextManager): with open(filename, 'w') as f: for hostname, keys in self._host_keys.items(): for keytype, key in keys.items(): - f.write('%s %s %s\n' % ( - hostname, keytype, key.get_base64())) + f.write('{} {} {}\n'.format( + hostname, keytype, key.get_base64() + )) def get_host_keys(self): """ @@ -370,7 +371,7 @@ class SSHClient (ClosingContextManager): if port == SSH_PORT: server_hostkey_name = hostname else: - server_hostkey_name = "[%s]:%d" % (hostname, port) + server_hostkey_name = "[{}]:{}".format(hostname, port) our_server_keys = None our_server_keys = self._system_host_keys.get(server_hostkey_name) @@ -544,14 +545,14 @@ class SSHClient (ClosingContextManager): # TODO: change this to 'Loading' instead of 'Trying' sometime; probably # when #387 is released, since this is a critical log message users are # likely testing/filtering for (bah.) - msg = "Trying discovered key {0} in {1}".format( + msg = "Trying discovered key {} in {}".format( hexlify(key.get_fingerprint()), key_path, ) self._log(DEBUG, msg) # Attempt to load cert if it exists. if os.path.isfile(cert_path): key.load_certificate(cert_path) - self._log(DEBUG, "Adding public certificate {0}".format(cert_path)) + self._log(DEBUG, "Adding public certificate {}".format(cert_path)) return key def _auth(self, username, password, pkey, key_filenames, allow_agent, @@ -571,7 +572,7 @@ class SSHClient (ClosingContextManager): saved_exception = None two_factor = False allowed_types = set() - two_factor_types = set(['keyboard-interactive', 'password']) + two_factor_types = {'keyboard-interactive', 'password'} # If GSS-API support and GSS-PI Key Exchange was performed, we attempt # authentication with gssapi-keyex. @@ -598,7 +599,8 @@ class SSHClient (ClosingContextManager): try: self._log( DEBUG, - 'Trying SSH key %s' % hexlify(pkey.get_fingerprint())) + 'Trying SSH key {}'.format(hexlify(pkey.get_fingerprint())) + ) allowed_types = set( self._transport.auth_publickey(username, pkey)) two_factor = (allowed_types & two_factor_types) @@ -629,10 +631,8 @@ class SSHClient (ClosingContextManager): for key in self._agent.get_keys(): try: - self._log( - DEBUG, - 'Trying SSH agent key %s' % hexlify( - key.get_fingerprint())) + id_ = hexlify(key.get_fingerprint()) + self._log(DEBUG, 'Trying SSH agent key {}'.format(id_)) # for 2-factor auth a successfully auth'd key password # will return an allowed 2fac auth method allowed_types = set( @@ -656,7 +656,7 @@ class SSHClient (ClosingContextManager): # ~/ssh/ is for windows for directory in [".ssh", "ssh"]: full_path = os.path.expanduser( - "~/%s/id_%s" % (directory, name) + "~/{}/id_{}".format(directory, name) ) if os.path.isfile(full_path): # TODO: only do this append if below did not run @@ -736,8 +736,9 @@ class AutoAddPolicy (MissingHostKeyPolicy): client._host_keys.add(hostname, key.get_name(), key) if client._host_keys_filename is not None: client.save_host_keys(client._host_keys_filename) - client._log(DEBUG, 'Adding %s host key for %s: %s' % - (key.get_name(), hostname, hexlify(key.get_fingerprint()))) + client._log(DEBUG, 'Adding {} host key for {}: {}'.format( + key.get_name(), hostname, hexlify(key.get_fingerprint()), + )) class RejectPolicy (MissingHostKeyPolicy): @@ -747,9 +748,12 @@ class RejectPolicy (MissingHostKeyPolicy): """ def missing_host_key(self, client, hostname, key): - client._log(DEBUG, 'Rejecting %s host key for %s: %s' % - (key.get_name(), hostname, hexlify(key.get_fingerprint()))) - raise SSHException('Server %r not found in known_hosts' % hostname) + client._log(DEBUG, 'Rejecting {} host key for {}: {}'.format( + key.get_name(), hostname, hexlify(key.get_fingerprint()), + )) + raise SSHException( + 'Server {!r} not found in known_hosts'.format(hostname) + ) class WarningPolicy (MissingHostKeyPolicy): @@ -758,6 +762,6 @@ class WarningPolicy (MissingHostKeyPolicy): accepting it. This is used by `.SSHClient`. """ def missing_host_key(self, client, hostname, key): - warnings.warn('Unknown %s host key for %s: %s' % - (key.get_name(), hostname, hexlify( - key.get_fingerprint()))) + warnings.warn('Unknown {} host key for {}: {}'.format( + key.get_name(), hostname, hexlify(key.get_fingerprint()), + )) diff --git a/paramiko/config.py b/paramiko/config.py index 073abb36..038d84ea 100644 --- a/paramiko/config.py +++ b/paramiko/config.py @@ -65,7 +65,7 @@ class SSHConfig (object): match = re.match(self.SETTINGS_REGEX, line) if not match: - raise Exception("Unparsable line %s" % line) + raise Exception("Unparsable line {}".format(line)) key = match.group(1).lower() value = match.group(2) @@ -239,7 +239,7 @@ class SSHConfig (object): try: return shlex.split(host) except ValueError: - raise Exception("Unparsable host %s" % host) + raise Exception("Unparsable host {}".format(host)) class LazyFqdn(object): diff --git a/paramiko/ecdsakey.py b/paramiko/ecdsakey.py index 1bb5676f..92e01a75 100644 --- a/paramiko/ecdsakey.py +++ b/paramiko/ecdsakey.py @@ -134,7 +134,7 @@ class ECDSAKey(PKey): ) key_types = self._ECDSA_CURVES.get_key_format_identifier_list() cert_types = [ - '{0}-cert-v01@openssh.com'.format(x) + '{}-cert-v01@openssh.com'.format(x) for x in key_types ] self._check_type_and_load_cert( @@ -144,7 +144,9 @@ class ECDSAKey(PKey): ) curvename = msg.get_text() if curvename != self.ecdsa_curve.nist_name: - raise SSHException("Can't handle curve of type %s" % curvename) + raise SSHException( + "Can't handle curve of type {}".format(curvename) + ) pointinfo = msg.get_binary() try: @@ -249,7 +251,7 @@ class ECDSAKey(PKey): if bits is not None: curve = cls._ECDSA_CURVES.get_by_key_length(bits) if curve is None: - raise ValueError("Unsupported key length: %d" % bits) + raise ValueError("Unsupported key length: {:d}".format(bits)) curve = curve.curve_class() private_key = ec.generate_private_key(curve, backend=default_backend()) diff --git a/paramiko/file.py b/paramiko/file.py index a1bdafbe..df9cdac7 100644 --- a/paramiko/file.py +++ b/paramiko/file.py @@ -338,7 +338,7 @@ class BufferedFile (ClosingContextManager): after rounding up to an internal buffer size) are read. :param int sizehint: desired maximum number of bytes to read. - :returns: `list` of lines read from the file. + :returns: list of lines read from the file. """ lines = [] byte_count = 0 diff --git a/paramiko/hostkeys.py b/paramiko/hostkeys.py index d023b33d..ca185273 100644 --- a/paramiko/hostkeys.py +++ b/paramiko/hostkeys.py @@ -300,7 +300,7 @@ class HostKeys (MutableMapping): salt = decodebytes(b(salt)) assert len(salt) == sha1().digest_size hmac = HMAC(salt, b(hostname), sha1).digest() - hostkey = '|1|%s|%s' % (u(encodebytes(salt)), u(encodebytes(hmac))) + hostkey = '|1|{}|{}'.format(u(encodebytes(salt)), u(encodebytes(hmac))) return hostkey.replace('\n', '') @@ -338,8 +338,8 @@ class HostKeyEntry: fields = line.split(' ') if len(fields) < 3: # Bad number of fields - log.info("Not enough fields found in known_hosts in line %s (%r)" % - (lineno, line)) + msg = "Not enough fields found in known_hosts in line {} ({!r})" + log.info(msg.format(lineno, line)) return None fields = fields[:3] @@ -359,7 +359,7 @@ class HostKeyEntry: elif keytype == 'ssh-ed25519': key = Ed25519Key(data=decodebytes(key)) else: - log.info("Unable to handle key of type %s" % (keytype,)) + log.info("Unable to handle key of type {}".format(keytype)) return None except binascii.Error as e: @@ -374,11 +374,12 @@ class HostKeyEntry: included. """ if self.valid: - return '%s %s %s\n' % ( + return '{} {} {}\n'.format( ','.join(self.hostnames), self.key.get_name(), - self.key.get_base64()) + self.key.get_base64(), + ) return None def __repr__(self): - return '<HostKeyEntry %r: %r>' % (self.hostnames, self.key) + return '<HostKeyEntry {!r}: {!r}>'.format(self.hostnames, self.key) diff --git a/paramiko/kex_ecdh_nist.py b/paramiko/kex_ecdh_nist.py index 702a872d..4e8ff35d 100644 --- a/paramiko/kex_ecdh_nist.py +++ b/paramiko/kex_ecdh_nist.py @@ -45,7 +45,9 @@ class KexNistp256(): return self._parse_kexecdh_init(m) elif not self.transport.server_mode and (ptype == _MSG_KEXECDH_REPLY): return self._parse_kexecdh_reply(m) - raise SSHException('KexECDH asked to handle packet type %d' % ptype) + raise SSHException( + 'KexECDH asked to handle packet type {:d}'.format(ptype) + ) def _generate_key_pair(self): self.P = ec.generate_private_key(self.curve, default_backend()) diff --git a/paramiko/kex_gex.py b/paramiko/kex_gex.py index ba45da18..44030569 100644 --- a/paramiko/kex_gex.py +++ b/paramiko/kex_gex.py @@ -91,8 +91,8 @@ class KexGex (object): return self._parse_kexdh_gex_reply(m) elif ptype == _MSG_KEXDH_GEX_REQUEST_OLD: return self._parse_kexdh_gex_request_old(m) - raise SSHException( - 'KexGex %s asked to handle packet type %d' % self.name, ptype) + msg = "KexGex {} asked to handle packet type {:d}" + raise SSHException(msg.format(self.name, ptype)) # ...internals... @@ -141,8 +141,10 @@ class KexGex (object): 'Can\'t do server-side gex with no modulus pack') self.transport._log( DEBUG, - 'Picking p (%d <= %d <= %d bits)' % ( - minbits, preferredbits, maxbits)) + 'Picking p ({} <= {} <= {} bits)'.format( + minbits, preferredbits, maxbits, + ) + ) self.g, self.p = pack.get_modulus(minbits, preferredbits, maxbits) m = Message() m.add_byte(c_MSG_KEXDH_GEX_GROUP) @@ -166,7 +168,8 @@ class KexGex (object): raise SSHException( 'Can\'t do server-side gex with no modulus pack') self.transport._log( - DEBUG, 'Picking p (~ %d bits)' % (self.preferred_bits,)) + DEBUG, 'Picking p (~ {} bits)'.format(self.preferred_bits) + ) self.g, self.p = pack.get_modulus( self.min_bits, self.preferred_bits, self.max_bits) m = Message() @@ -185,8 +188,8 @@ class KexGex (object): if (bitlen < 1024) or (bitlen > 8192): raise SSHException( 'Server-generated gex p (don\'t ask) is out of range ' - '(%d bits)' % bitlen) - self.transport._log(DEBUG, 'Got server p (%d bits)' % bitlen) + '({} bits)'.format(bitlen)) + self.transport._log(DEBUG, 'Got server p ({} bits)'.format(bitlen)) self._generate_x() # now compute e = g^x mod p self.e = pow(self.g, self.x, self.p) diff --git a/paramiko/kex_group1.py b/paramiko/kex_group1.py index e8f042b1..1bebd375 100644 --- a/paramiko/kex_group1.py +++ b/paramiko/kex_group1.py @@ -73,7 +73,8 @@ class KexGroup1(object): return self._parse_kexdh_init(m) elif not self.transport.server_mode and (ptype == _MSG_KEXDH_REPLY): return self._parse_kexdh_reply(m) - raise SSHException('KexGroup1 asked to handle packet type %d' % ptype) + msg = "KexGroup1 asked to handle packet type {:d}" + raise SSHException(msg.format(ptype)) # ...internals... diff --git a/paramiko/kex_gss.py b/paramiko/kex_gss.py index a2ea9fca..e21620fe 100644 --- a/paramiko/kex_gss.py +++ b/paramiko/kex_gss.py @@ -120,8 +120,8 @@ class KexGSSGroup1(object): return self._parse_kexgss_complete(m) elif ptype == MSG_KEXGSS_ERROR: return self._parse_kexgss_error(m) - raise SSHException('GSS KexGroup1 asked to handle packet type %d' - % ptype) + msg = 'GSS KexGroup1 asked to handle packet type {:d}' + raise SSHException(msg.format(ptype)) # ## internals... @@ -282,10 +282,11 @@ class KexGSSGroup1(object): min_status = m.get_int() err_msg = m.get_string() m.get_string() # we don't care about the language! - raise SSHException("GSS-API Error:\nMajor Status: %s\nMinor Status: %s\ - \nError Message: %s\n") % (str(maj_status), - str(min_status), - err_msg) + raise SSHException("""GSS-API Error: +Major Status: {} +Minor Status: {} +Error Message: {} +""".format(maj_status, min_status, err_msg)) class KexGSSGroup14(KexGSSGroup1): @@ -361,7 +362,8 @@ class KexGSSGex(object): return self._parse_kexgss_complete(m) elif ptype == MSG_KEXGSS_ERROR: return self._parse_kexgss_error(m) - raise SSHException('KexGex asked to handle packet type %d' % ptype) + msg = 'KexGex asked to handle packet type {:d}' + raise SSHException(msg.format(ptype)) # ## internals... @@ -416,8 +418,10 @@ class KexGSSGex(object): 'Can\'t do server-side gex with no modulus pack') self.transport._log( DEBUG, # noqa - 'Picking p (%d <= %d <= %d bits)' % ( - minbits, preferredbits, maxbits)) + 'Picking p ({} <= {} <= {} bits)'.format( + minbits, preferredbits, maxbits, + ) + ) self.g, self.p = pack.get_modulus(minbits, preferredbits, maxbits) m = Message() m.add_byte(c_MSG_KEXGSS_GROUP) @@ -439,8 +443,8 @@ class KexGSSGex(object): if (bitlen < 1024) or (bitlen > 8192): raise SSHException( 'Server-generated gex p (don\'t ask) is out of range ' - '(%d bits)' % bitlen) - self.transport._log(DEBUG, 'Got server p (%d bits)' % bitlen) # noqa + '({} bits)'.format(bitlen)) + self.transport._log(DEBUG, 'Got server p ({} bits)'.format(bitlen)) # noqa self._generate_x() # now compute e = g^x mod p self.e = pow(self.g, self.x, self.p) @@ -603,10 +607,11 @@ class KexGSSGex(object): min_status = m.get_int() err_msg = m.get_string() m.get_string() # we don't care about the language (lang_tag)! - raise SSHException("GSS-API Error:\nMajor Status: %s\nMinor Status: %s\ - \nError Message: %s\n") % (str(maj_status), - str(min_status), - err_msg) + raise SSHException("""GSS-API Error: +Major Status: {} +Minor Status: {} +Error Message: {} +""".format(maj_status, min_status, err_msg)) class NullHostKey(object): diff --git a/paramiko/message.py b/paramiko/message.py index f8ed6170..9af841da 100644 --- a/paramiko/message.py +++ b/paramiko/message.py @@ -187,7 +187,7 @@ class Message (object): def get_list(self): """ - Fetch a `list` of `strings <str>` from the stream. + Fetch a list of `strings <str>` from the stream. These are trivially encoded as comma-separated values in a string. """ @@ -281,7 +281,7 @@ class Message (object): a single string of values separated by commas. (Yes, really, that's how SSH2 does it.) - :param list l: list of strings to add + :param l: list of strings to add """ self.add_string(','.join(l)) return self diff --git a/paramiko/packet.py b/paramiko/packet.py index 95a26c6e..2a1e91e2 100644 --- a/paramiko/packet.py +++ b/paramiko/packet.py @@ -368,7 +368,7 @@ class Packetizer (object): if cmd in MSG_NAMES: cmd_name = MSG_NAMES[cmd] else: - cmd_name = '$%x' % cmd + cmd_name = '${:x}'.format(cmd) orig_len = len(data) self.__write_lock.acquire() try: @@ -378,7 +378,8 @@ class Packetizer (object): if self.__dump_packets: self._log( DEBUG, - 'Write packet <%s>, length %d' % (cmd_name, orig_len)) + 'Write packet <{}>, length {}'.format(cmd_name, orig_len) + ) self._log(DEBUG, util.format_binary(packet, 'OUT: ')) if self.__block_engine_out is not None: out = self.__block_engine_out.update(packet) @@ -404,8 +405,10 @@ class Packetizer (object): ) if sent_too_much and not self.__need_rekey: # only ask once for rekeying - self._log(DEBUG, 'Rekeying (hit %d packets, %d bytes sent)' % - (self.__sent_packets, self.__sent_bytes)) + msg = "Rekeying (hit {} packets, {} bytes sent)" + self._log(DEBUG, msg.format( + self.__sent_packets, self.__sent_bytes, + )) self.__received_bytes_overflow = 0 self.__received_packets_overflow = 0 self._trigger_rekey() @@ -456,7 +459,10 @@ class Packetizer (object): if self.__dump_packets: self._log( DEBUG, - 'Got payload (%d bytes, %d padding)' % (packet_size, padding)) + 'Got payload ({} bytes, {} padding)'.format( + packet_size, padding + ) + ) if self.__compress_engine_in is not None: payload = self.__compress_engine_in(payload) @@ -483,8 +489,10 @@ class Packetizer (object): elif (self.__received_packets >= self.REKEY_PACKETS) or \ (self.__received_bytes >= self.REKEY_BYTES): # only ask once for rekeying - self._log(DEBUG, 'Rekeying (hit %d packets, %d bytes received)' % - (self.__received_packets, self.__received_bytes)) + err = "Rekeying (hit {} packets, {} bytes received)" + self._log(DEBUG, err.format( + self.__received_packets, self.__received_bytes, + )) self.__received_bytes_overflow = 0 self.__received_packets_overflow = 0 self._trigger_rekey() @@ -493,11 +501,12 @@ class Packetizer (object): if cmd in MSG_NAMES: cmd_name = MSG_NAMES[cmd] else: - cmd_name = '$%x' % cmd + cmd_name = '${:x}'.format(cmd) if self.__dump_packets: self._log( DEBUG, - 'Read packet <%s>, length %d' % (cmd_name, len(payload))) + 'Read packet <{}>, length {}'.format(cmd_name, len(payload)) + ) return cmd, msg # ...protected... diff --git a/paramiko/pkey.py b/paramiko/pkey.py index 67723be2..808215f8 100644 --- a/paramiko/pkey.py +++ b/paramiko/pkey.py @@ -310,16 +310,18 @@ class PKey(object): # unencryped: done return data # encrypted keyfile: will need a password - if headers['proc-type'] != '4,ENCRYPTED': + proc_type = headers['proc-type'] + if proc_type != '4,ENCRYPTED': raise SSHException( - 'Unknown private key structure "%s"' % headers['proc-type']) + 'Unknown private key structure "{}"'.format(proc_type) + ) try: encryption_type, saltstr = headers['dek-info'].split(',') except: raise SSHException("Can't parse DEK-info in private key file") if encryption_type not in self._CIPHER_TABLE: raise SSHException( - 'Unknown private key cipher "%s"' % encryption_type) + 'Unknown private key cipher "{}"'.format(encryption_type)) # if no password was passed in, # raise an exception pointing out that we need one if password is None: @@ -409,7 +411,7 @@ class PKey(object): # (requires going back into per-type subclasses.) msg.get_string() else: - err = 'Invalid key (class: {0}, data type: {1}' + err = 'Invalid key (class: {}, data type: {}' raise SSHException(err.format(self.__class__.__name__, type_)) def load_certificate(self, value): @@ -439,7 +441,7 @@ class PKey(object): constructor = 'from_string' blob = getattr(PublicBlob, constructor)(value) if not blob.key_type.startswith(self.get_name()): - err = "PublicBlob type {0} incompatible with key type {1}" + err = "PublicBlob type {} incompatible with key type {}" raise ValueError(err.format(blob.key_type, self.get_name())) self.public_blob = blob @@ -490,7 +492,7 @@ class PublicBlob(object): """ fields = string.split(None, 2) if len(fields) < 2: - msg = "Not enough fields for public blob: {0}" + msg = "Not enough fields for public blob: {}" raise ValueError(msg.format(fields)) key_type = fields[0] key_blob = decodebytes(b(fields[1])) @@ -503,7 +505,7 @@ class PublicBlob(object): m = Message(key_blob) blob_type = m.get_text() if blob_type != key_type: - msg = "Invalid PublicBlob contents: key type={0!r}, but blob type={1!r}" # noqa + msg = "Invalid PublicBlob contents: key type={!r}, but blob type={!r}" # noqa raise ValueError(msg.format(key_type, blob_type)) # All good? All good. return cls(type_=key_type, blob=key_blob, comment=comment) @@ -520,9 +522,9 @@ class PublicBlob(object): return cls(type_=type_, blob=message.asbytes()) def __str__(self): - ret = '{0} public key/certificate'.format(self.key_type) + ret = '{} public key/certificate'.format(self.key_type) if self.comment: - ret += "- {0}".format(self.comment) + ret += "- {}".format(self.comment) return ret def __eq__(self, other): diff --git a/paramiko/primes.py b/paramiko/primes.py index 65617914..ca8f9bec 100644 --- a/paramiko/primes.py +++ b/paramiko/primes.py @@ -91,7 +91,7 @@ class ModulusPack (object): bl = util.bit_length(modulus) if (bl != size) and (bl != size + 1): self.discarded.append( - (modulus, 'incorrectly reported bit length %d' % size)) + (modulus, 'incorrectly reported bit length {}'.format(size))) return if bl not in self.pack: self.pack[bl] = [] diff --git a/paramiko/py3compat.py b/paramiko/py3compat.py index 6703ace8..cb9de412 100644 --- a/paramiko/py3compat.py +++ b/paramiko/py3compat.py @@ -46,7 +46,7 @@ if PY2: elif isinstance(s, buffer): # NOQA return s else: - raise TypeError("Expected unicode or bytes, got %r" % s) + raise TypeError("Expected unicode or bytes, got {!r}".format(s)) def u(s, encoding='utf8'): # NOQA @@ -58,7 +58,7 @@ if PY2: elif isinstance(s, buffer): # NOQA return s.decode(encoding) else: - raise TypeError("Expected unicode or bytes, got %r" % s) + raise TypeError("Expected unicode or bytes, got {!r}".format(s)) def b2s(s): @@ -135,7 +135,7 @@ else: elif isinstance(s, str): return s.encode(encoding) else: - raise TypeError("Expected unicode or bytes, got %r" % s) + raise TypeError("Expected unicode or bytes, got {!r}".format(s)) def u(s, encoding='utf8'): """cast bytes or unicode to unicode""" @@ -144,7 +144,7 @@ else: elif isinstance(s, str): return s else: - raise TypeError("Expected unicode or bytes, got %r" % s) + raise TypeError("Expected unicode or bytes, got {!r}".format(s)) def b2s(s): return s.decode() if isinstance(s, bytes) else s diff --git a/paramiko/server.py b/paramiko/server.py index c96126e9..a7117815 100644 --- a/paramiko/server.py +++ b/paramiko/server.py @@ -223,7 +223,7 @@ class ServerInterface (object): The default implementation always returns ``AUTH_FAILED``. - :param list responses: list of `str` responses from the client + :param responses: list of `str` responses from the client :return: ``AUTH_FAILED`` if the authentication fails; ``AUTH_SUCCESSFUL`` if it succeeds; ``AUTH_PARTIALLY_SUCCESSFUL`` if the interactive auth @@ -667,12 +667,13 @@ class SubsystemHandler (threading.Thread): def _run(self): try: self.__transport._log( - DEBUG, 'Starting handler for subsystem %s' % self.__name) + DEBUG, 'Starting handler for subsystem {}'.format(self.__name) + ) self.start_subsystem(self.__name, self.__transport, self.__channel) except Exception as e: self.__transport._log( ERROR, - 'Exception in subsystem handler for "{0}": {1}'.format( + 'Exception in subsystem handler for "{}": {}'.format( self.__name, e ) ) diff --git a/paramiko/sftp_attr.py b/paramiko/sftp_attr.py index 5597948a..ea12b2f6 100644 --- a/paramiko/sftp_attr.py +++ b/paramiko/sftp_attr.py @@ -82,7 +82,7 @@ class SFTPAttributes (object): return attr def __repr__(self): - return '<SFTPAttributes: %s>' % self._debug_str() + return '<SFTPAttributes: {}>'.format(self._debug_str()) # ...internals... @classmethod @@ -146,15 +146,15 @@ class SFTPAttributes (object): def _debug_str(self): out = '[ ' if self.st_size is not None: - out += 'size=%d ' % self.st_size + out += 'size={} '.format(self.st_size) if (self.st_uid is not None) and (self.st_gid is not None): - out += 'uid=%d gid=%d ' % (self.st_uid, self.st_gid) + out += 'uid={} gid={} '.format(self.st_uid, self.st_gid) if self.st_mode is not None: out += 'mode=' + oct(self.st_mode) + ' ' if (self.st_atime is not None) and (self.st_mtime is not None): - out += 'atime=%d mtime=%d ' % (self.st_atime, self.st_mtime) + out += 'atime={} mtime={} '.format(self.st_atime, self.st_mtime) for k, v in self.attr.items(): - out += '"%s"=%r ' % (str(k), v) + out += '"{}"={!r} '.format(str(k), v) out += ']' return out @@ -222,8 +222,12 @@ class SFTPAttributes (object): if size is None: size = 0 + # TODO: not sure this actually worked as expected beforehand, leaving + # it untouched for the time being, re: .format() upgrade, until someone + # has time to doublecheck return '%s 1 %-8d %-8d %8d %-12s %s' % ( - ks, uid, gid, size, datestr, filename) + ks, uid, gid, size, datestr, filename, + ) def asbytes(self): return b(str(self)) diff --git a/paramiko/sftp_client.py b/paramiko/sftp_client.py index 14b8b58a..b344dff3 100644 --- a/paramiko/sftp_client.py +++ b/paramiko/sftp_client.py @@ -105,7 +105,8 @@ class SFTPClient(BaseSFTP, ClosingContextManager): raise SSHException('EOF during negotiation') self._log( INFO, - 'Opened sftp connection (server version %d)' % server_version) + 'Opened sftp connection (server version {})'.format(server_version) + ) @classmethod def from_transport(cls, t, window_size=None, max_packet_size=None): @@ -143,6 +144,8 @@ class SFTPClient(BaseSFTP, ClosingContextManager): for m in msg: self._log(level, m, *args) else: + # NOTE: these bits MUST continue using %-style format junk because + # logging.Logger.log() explicitly requires it. Grump. # escape '%' in msg (they could come from file or directory names) # before logging msg = msg.replace('%', '%%') @@ -200,7 +203,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): .. versionadded:: 1.2 """ path = self._adjust_cwd(path) - self._log(DEBUG, 'listdir(%r)' % path) + self._log(DEBUG, 'listdir({!r})'.format(path)) t, msg = self._request(CMD_OPENDIR, path) if t != CMD_HANDLE: raise SFTPError('Expected handle') @@ -239,7 +242,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): .. versionadded:: 1.15 """ path = self._adjust_cwd(path) - self._log(DEBUG, 'listdir(%r)' % path) + self._log(DEBUG, 'listdir({!r})'.format(path)) t, msg = self._request(CMD_OPENDIR, path) if t != CMD_HANDLE: @@ -322,7 +325,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): :raises: ``IOError`` -- if the file could not be opened. """ filename = self._adjust_cwd(filename) - self._log(DEBUG, 'open(%r, %r)' % (filename, mode)) + self._log(DEBUG, 'open({!r}, {!r})'.format(filename, mode)) imode = 0 if ('r' in mode) or ('+' in mode): imode |= SFTP_FLAG_READ @@ -341,7 +344,8 @@ class SFTPClient(BaseSFTP, ClosingContextManager): handle = msg.get_binary() self._log( DEBUG, - 'open(%r, %r) -> %s' % (filename, mode, u(hexlify(handle)))) + 'open({!r}, {!r}) -> {}'.format(filename, mode, u(hexlify(handle))) + ) return SFTPFile(self, handle, mode, bufsize) # Python continues to vacillate about "open" vs "file"... @@ -357,7 +361,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): :raises: ``IOError`` -- if the path refers to a folder (directory) """ path = self._adjust_cwd(path) - self._log(DEBUG, 'remove(%r)' % path) + self._log(DEBUG, 'remove({!r})'.format(path)) self._request(CMD_REMOVE, path) unlink = remove @@ -382,7 +386,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): """ oldpath = self._adjust_cwd(oldpath) newpath = self._adjust_cwd(newpath) - self._log(DEBUG, 'rename(%r, %r)' % (oldpath, newpath)) + self._log(DEBUG, 'rename({!r}, {!r})'.format(oldpath, newpath)) self._request(CMD_RENAME, oldpath, newpath) def posix_rename(self, oldpath, newpath): @@ -402,7 +406,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): """ oldpath = self._adjust_cwd(oldpath) newpath = self._adjust_cwd(newpath) - self._log(DEBUG, 'posix_rename(%r, %r)' % (oldpath, newpath)) + self._log(DEBUG, 'posix_rename({!r}, {!r})'.format(oldpath, newpath)) self._request( CMD_EXTENDED, "posix-rename@openssh.com", oldpath, newpath ) @@ -417,7 +421,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): :param int mode: permissions (posix-style) for the newly-created folder """ path = self._adjust_cwd(path) - self._log(DEBUG, 'mkdir(%r, %r)' % (path, mode)) + self._log(DEBUG, 'mkdir({!r}, {!r})'.format(path, mode)) attr = SFTPAttributes() attr.st_mode = mode self._request(CMD_MKDIR, path, attr) @@ -429,7 +433,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): :param str path: name of the folder to remove """ path = self._adjust_cwd(path) - self._log(DEBUG, 'rmdir(%r)' % path) + self._log(DEBUG, 'rmdir({!r})'.format(path)) self._request(CMD_RMDIR, path) def stat(self, path): @@ -452,7 +456,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): file """ path = self._adjust_cwd(path) - self._log(DEBUG, 'stat(%r)' % path) + self._log(DEBUG, 'stat({!r})'.format(path)) t, msg = self._request(CMD_STAT, path) if t != CMD_ATTRS: raise SFTPError('Expected attributes') @@ -470,7 +474,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): file """ path = self._adjust_cwd(path) - self._log(DEBUG, 'lstat(%r)' % path) + self._log(DEBUG, 'lstat({!r})'.format(path)) t, msg = self._request(CMD_LSTAT, path) if t != CMD_ATTRS: raise SFTPError('Expected attributes') @@ -484,7 +488,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): :param str dest: path of the newly created symlink """ dest = self._adjust_cwd(dest) - self._log(DEBUG, 'symlink(%r, %r)' % (source, dest)) + self._log(DEBUG, 'symlink({!r}, {!r})'.format(source, dest)) source = bytestring(source) self._request(CMD_SYMLINK, source, dest) @@ -498,7 +502,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): :param int mode: new permissions """ path = self._adjust_cwd(path) - self._log(DEBUG, 'chmod(%r, %r)' % (path, mode)) + self._log(DEBUG, 'chmod({!r}, {!r})'.format(path, mode)) attr = SFTPAttributes() attr.st_mode = mode self._request(CMD_SETSTAT, path, attr) @@ -515,7 +519,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): :param int gid: new group id """ path = self._adjust_cwd(path) - self._log(DEBUG, 'chown(%r, %r, %r)' % (path, uid, gid)) + self._log(DEBUG, 'chown({!r}, {!r}, {!r})'.format(path, uid, gid)) attr = SFTPAttributes() attr.st_uid, attr.st_gid = uid, gid self._request(CMD_SETSTAT, path, attr) @@ -537,7 +541,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): path = self._adjust_cwd(path) if times is None: times = (time.time(), time.time()) - self._log(DEBUG, 'utime(%r, %r)' % (path, times)) + self._log(DEBUG, 'utime({!r}, {!r})'.format(path, times)) attr = SFTPAttributes() attr.st_atime, attr.st_mtime = times self._request(CMD_SETSTAT, path, attr) @@ -552,7 +556,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): :param int size: the new size of the file """ path = self._adjust_cwd(path) - self._log(DEBUG, 'truncate(%r, %r)' % (path, size)) + self._log(DEBUG, 'truncate({!r}, {!r})'.format(path, size)) attr = SFTPAttributes() attr.st_size = size self._request(CMD_SETSTAT, path, attr) @@ -567,7 +571,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): :return: target path, as a `str` """ path = self._adjust_cwd(path) - self._log(DEBUG, 'readlink(%r)' % path) + self._log(DEBUG, 'readlink({!r})'.format(path)) t, msg = self._request(CMD_READLINK, path) if t != CMD_NAME: raise SFTPError('Expected name response') @@ -575,7 +579,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): if count == 0: return None if count != 1: - raise SFTPError('Readlink returned %d results' % count) + raise SFTPError('Readlink returned {} results'.format(count)) return _to_unicode(msg.get_string()) def normalize(self, path): @@ -591,13 +595,13 @@ class SFTPClient(BaseSFTP, ClosingContextManager): :raises: ``IOError`` -- if the path can't be resolved on the server """ path = self._adjust_cwd(path) - self._log(DEBUG, 'normalize(%r)' % path) + self._log(DEBUG, 'normalize({!r})'.format(path)) t, msg = self._request(CMD_REALPATH, path) if t != CMD_NAME: raise SFTPError('Expected name response') count = msg.get_int() if count != 1: - raise SFTPError('Realpath returned %d results' % count) + raise SFTPError('Realpath returned {} results'.format(count)) return msg.get_text() def chdir(self, path=None): @@ -620,8 +624,10 @@ class SFTPClient(BaseSFTP, ClosingContextManager): self._cwd = None return if not stat.S_ISDIR(self.stat(path).st_mode): + code = errno.ENOTDIR raise SFTPError( - errno.ENOTDIR, "%s: %s" % (os.strerror(errno.ENOTDIR), path)) + code, "{}: {}".format(os.strerror(code), path) + ) self._cwd = b(self.normalize(path)) def getcwd(self): @@ -683,7 +689,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): s = self.stat(remotepath) if s.st_size != size: raise IOError( - 'size mismatch in put! %d != %d' % (s.st_size, size)) + 'size mismatch in put! {} != {}'.format(s.st_size, size)) else: s = SFTPAttributes() return s @@ -765,7 +771,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): s = os.stat(localpath) if s.st_size != size: raise IOError( - 'size mismatch in get! %d != %d' % (s.st_size, size)) + 'size mismatch in get! {} != {}'.format(s.st_size, size)) # ...internals... @@ -803,7 +809,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): try: t, data = self._read_packet() except EOFError as e: - raise SSHException('Server connection dropped: %s' % str(e)) + raise SSHException('Server connection dropped: {}'.format(e)) msg = Message(data) num = msg.get_int() self._lock.acquire() @@ -811,7 +817,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): if num not in self._expecting: # might be response for a file that was closed before # responses came back - self._log(DEBUG, 'Unexpected response #%d' % (num,)) + self._log(DEBUG, 'Unexpected response #{}'.format(num)) if waitfor is None: # just doing a single check break diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py index bc34db94..52f2bde8 100644 --- a/paramiko/sftp_file.py +++ b/paramiko/sftp_file.py @@ -83,7 +83,7 @@ class SFTPFile (BufferedFile): # __del__.) if self._closed: return - self.sftp._log(DEBUG, 'close(%s)' % u(hexlify(self.handle))) + self.sftp._log(DEBUG, 'close({})'.format(u(hexlify(self.handle)))) if self.pipelined: self.sftp._finish_responses(self) BufferedFile.close(self) @@ -288,7 +288,8 @@ class SFTPFile (BufferedFile): :param int mode: new permissions """ - self.sftp._log(DEBUG, 'chmod(%s, %r)' % (hexlify(self.handle), mode)) + self.sftp._log(DEBUG, 'chmod({}, {!r})'.format( + hexlify(self.handle), mode)) attr = SFTPAttributes() attr.st_mode = mode self.sftp._request(CMD_FSETSTAT, self.handle, attr) @@ -305,7 +306,7 @@ class SFTPFile (BufferedFile): """ self.sftp._log( DEBUG, - 'chown(%s, %r, %r)' % (hexlify(self.handle), uid, gid)) + 'chown({}, {!r}, {!r})'.format(hexlify(self.handle), uid, gid)) attr = SFTPAttributes() attr.st_uid, attr.st_gid = uid, gid self.sftp._request(CMD_FSETSTAT, self.handle, attr) @@ -325,7 +326,8 @@ class SFTPFile (BufferedFile): """ if times is None: times = (time.time(), time.time()) - self.sftp._log(DEBUG, 'utime(%s, %r)' % (hexlify(self.handle), times)) + self.sftp._log(DEBUG, 'utime({}, {!r})'.format( + hexlify(self.handle), times)) attr = SFTPAttributes() attr.st_atime, attr.st_mtime = times self.sftp._request(CMD_FSETSTAT, self.handle, attr) @@ -340,7 +342,7 @@ class SFTPFile (BufferedFile): """ self.sftp._log( DEBUG, - 'truncate(%s, %r)' % (hexlify(self.handle), size)) + 'truncate({}, {!r})'.format(hexlify(self.handle), size)) attr = SFTPAttributes() attr.st_size = size self.sftp._request(CMD_FSETSTAT, self.handle, attr) @@ -473,7 +475,8 @@ class SFTPFile (BufferedFile): .. versionadded:: 1.5.4 """ - self.sftp._log(DEBUG, 'readv(%s, %r)' % (hexlify(self.handle), chunks)) + self.sftp._log(DEBUG, 'readv({}, {!r})'.format( + hexlify(self.handle), chunks)) read_chunks = [] for offset, size in chunks: diff --git a/paramiko/sftp_server.py b/paramiko/sftp_server.py index f7d1c657..f8c4f727 100644 --- a/paramiko/sftp_server.py +++ b/paramiko/sftp_server.py @@ -100,7 +100,7 @@ class SFTPServer (BaseSFTP, SubsystemHandler): def start_subsystem(self, name, transport, channel): self.sock = channel - self._log(DEBUG, 'Started sftp server on channel %s' % repr(channel)) + self._log(DEBUG, 'Started sftp server on channel {!r}'.format(channel)) self._send_server_version() self.server.session_started() while True: @@ -200,7 +200,7 @@ class SFTPServer (BaseSFTP, SubsystemHandler): item._pack(msg) else: raise Exception( - 'unknown type for {0!r} type {1!r}'.format( + 'unknown type for {!r} type {!r}'.format( item, type(item))) self._send_packet(t, msg) @@ -209,7 +209,7 @@ class SFTPServer (BaseSFTP, SubsystemHandler): # must be error code self._send_status(request_number, handle) return - handle._set_name(b('hx%d' % self.next_handle)) + handle._set_name(b('hx{:d}'.format(self.next_handle))) self.next_handle += 1 if folder: self.folder_table[handle._get_name()] = handle @@ -334,7 +334,7 @@ class SFTPServer (BaseSFTP, SubsystemHandler): return flags def _process(self, t, request_number, msg): - self._log(DEBUG, 'Request: %s' % CMD_NAMES[t]) + self._log(DEBUG, 'Request: {}'.format(CMD_NAMES[t])) if t == CMD_OPEN: path = msg.get_text() flags = self._convert_pflags(msg.get_int()) diff --git a/paramiko/ssh_exception.py b/paramiko/ssh_exception.py index e9ab8d66..2df84b65 100644 --- a/paramiko/ssh_exception.py +++ b/paramiko/ssh_exception.py @@ -63,7 +63,7 @@ class BadAuthenticationType (AuthenticationException): self.args = (explanation, types, ) def __str__(self): - return '{0} (allowed_types={1!r})'.format( + return '{} (allowed_types={!r})'.format( SSHException.__str__(self), self.allowed_types ) @@ -107,7 +107,7 @@ class BadHostKeyException (SSHException): .. versionadded:: 1.6 """ def __init__(self, hostname, got_key, expected_key): - message = 'Host key for server {0} does not match: got {1}, expected {2}' # noqa + message = 'Host key for server {} does not match: got {}, expected {}' # noqa message = message.format( hostname, got_key.get_base64(), expected_key.get_base64()) @@ -128,7 +128,7 @@ class ProxyCommandFailure (SSHException): """ def __init__(self, command, error): SSHException.__init__(self, - '"ProxyCommand (%s)" returned non-zero exit status: %s' % ( + '"ProxyCommand ({})" returned non-zero exit status: {}'.format( command, error ) ) diff --git a/paramiko/ssh_gss.py b/paramiko/ssh_gss.py index b3c3f72b..88dedf7e 100644 --- a/paramiko/ssh_gss.py +++ b/paramiko/ssh_gss.py @@ -284,7 +284,7 @@ class _SSH_GSSAPI(_SSH_GSSAuth): else: token = self._gss_ctxt.step(recv_token) except gssapi.GSSException: - message = "{0} Target: {1}".format( + message = "{} Target: {}".format( sys.exc_info()[1], self._gss_host) raise gssapi.GSSException(message) self._gss_ctxt_status = self._gss_ctxt.established @@ -444,7 +444,7 @@ class _SSH_SSPI(_SSH_GSSAuth): error, token = self._gss_ctxt.authorize(recv_token) token = token[0].Buffer except pywintypes.error as e: - e.strerror += ", Target: {1}".format(e, self._gss_host) + e.strerror += ", Target: {}".format(e, self._gss_host) raise if error == 0: diff --git a/paramiko/transport.py b/paramiko/transport.py index 9df34caf..68519f90 100644 --- a/paramiko/transport.py +++ b/paramiko/transport.py @@ -100,7 +100,7 @@ class Transport(threading.Thread, ClosingContextManager): _DECRYPT = object() _PROTO_ID = '2.0' - _CLIENT_ID = 'paramiko_%s' % paramiko.__version__ + _CLIENT_ID = 'paramiko_{}'.format(paramiko.__version__) # These tuples of algorithm identifiers are in preference order; do not # reorder without reason! @@ -329,7 +329,7 @@ class Transport(threading.Thread, ClosingContextManager): break else: raise SSHException( - 'Unable to connect to %s: %s' % (hostname, reason)) + 'Unable to connect to {}: {}'.format(hostname, reason)) # okay, normal socket-ish flow here... threading.Thread.__init__(self) self.setDaemon(True) @@ -415,17 +415,19 @@ class Transport(threading.Thread, ClosingContextManager): """ Returns a string representation of this object, for debugging. """ - out = '<paramiko.Transport at %s' % hex(long(id(self)) & xffffffff) + id_ = hex(long(id(self)) & xffffffff) + out = '<paramiko.Transport at {}'.format(id_) if not self.active: out += ' (unconnected)' else: if self.local_cipher != '': - out += ' (cipher %s, %d bits)' % ( + out += ' (cipher {}, {:d} bits)'.format( self.local_cipher, self._cipher_info[self.local_cipher]['key-size'] * 8 ) if self.is_authenticated(): - out += ' (active; %d open channel(s))' % len(self._channels) + out += ' (active; {} open channel(s))'.format( + len(self._channels)) elif self.initial_kex_done: out += ' (connected; awaiting auth)' else: @@ -1061,7 +1063,7 @@ class Transport(threading.Thread, ClosingContextManager): m.add_boolean(wait) if data is not None: m.add(*data) - self._log(DEBUG, 'Sending global request "%s"' % kind) + self._log(DEBUG, 'Sending global request "{}"'.format(kind)) self._send_user_message(m) if not wait: return None @@ -1179,14 +1181,15 @@ class Transport(threading.Thread, ClosingContextManager): key.asbytes() != hostkey.asbytes() ): self._log(DEBUG, 'Bad host key from server') - self._log(DEBUG, 'Expected: %s: %s' % ( - hostkey.get_name(), repr(hostkey.asbytes())) - ) - self._log(DEBUG, 'Got : %s: %s' % ( - key.get_name(), repr(key.asbytes())) - ) + self._log(DEBUG, 'Expected: {}: {}'.format( + hostkey.get_name(), repr(hostkey.asbytes()), + )) + self._log(DEBUG, 'Got : {}: {}'.format( + key.get_name(), repr(key.asbytes()), + )) raise SSHException('Bad host key from server') - self._log(DEBUG, 'Host key verified (%s)' % hostkey.get_name()) + self._log(DEBUG, 'Host key verified ({})'.format( + hostkey.get_name())) if (pkey is not None) or (password is not None) or gss_auth or gss_kex: if gss_auth: @@ -1295,7 +1298,7 @@ class Transport(threading.Thread, ClosingContextManager): :param str username: the username to authenticate as :return: - `list` of auth types permissible for the next stage of + list of auth types permissible for the next stage of authentication (normally empty) :raises: @@ -1350,7 +1353,7 @@ class Transport(threading.Thread, ClosingContextManager): ``True`` if an attempt at an automated "interactive" password auth should be made if the server doesn't support normal password auth :return: - `list` of auth types permissible for the next stage of + list of auth types permissible for the next stage of authentication (normally empty) :raises: @@ -1421,7 +1424,7 @@ class Transport(threading.Thread, ClosingContextManager): an event to trigger when the authentication attempt is complete (whether it was successful or not) :return: - `list` of auth types permissible for the next stage of + list of auth types permissible for the next stage of authentication (normally empty) :raises: @@ -1479,7 +1482,7 @@ class Transport(threading.Thread, ClosingContextManager): :param callable handler: a handler for responding to server questions :param str submethods: a string list of desired submethods (optional) :return: - `list` of auth types permissible for the next stage of + list of auth types permissible for the next stage of authentication (normally empty). :raises: `.BadAuthenticationType` -- if public-key authentication isn't @@ -1529,7 +1532,6 @@ class Transport(threading.Thread, ClosingContextManager): :param bool gss_deleg_creds: Delegate credentials or not :return: list of auth types permissible for the next stage of authentication (normally empty) - :rtype: list :raises: `.BadAuthenticationType` -- if gssapi-with-mic isn't allowed by the server (and no event was passed in) :raises: @@ -1553,7 +1555,7 @@ class Transport(threading.Thread, ClosingContextManager): :param str username: The username to authenticate as. :returns: - a `list` of auth types permissible for the next stage of + a list of auth types permissible for the next stage of authentication (normally empty) :raises: `.BadAuthenticationType` -- if GSS-API Key Exchange was not performed (and no event was passed @@ -1746,7 +1748,7 @@ class Transport(threading.Thread, ClosingContextManager): if key is None: raise SSHException('Unknown host key type') if not key.verify_ssh_sig(self.H, Message(sig)): - raise SSHException('Signature verification (%s) failed.' % self.host_key_type) # noqa + raise SSHException('Signature verification ({}) failed.'.format(self.host_key_type)) # noqa self.host_key = key def _compute_key(self, id, nbytes): @@ -1759,8 +1761,8 @@ class Transport(threading.Thread, ClosingContextManager): # Fallback to SHA1 for kex engines that fail to specify a hex # algorithm, or for e.g. transport tests that don't run kexinit. hash_algo = getattr(self.kex_engine, 'hash_algo', None) - hash_select_msg = "kex engine %s specified hash_algo %r" % ( - self.kex_engine.__class__.__name__, hash_algo + hash_select_msg = "kex engine {} specified hash_algo {!r}".format( + self.kex_engine.__class__.__name__, hash_algo, ) if hash_algo is None: hash_algo = sha1 @@ -1844,13 +1846,13 @@ class Transport(threading.Thread, ClosingContextManager): _active_threads.append(self) tid = hex(long(id(self)) & xffffffff) if self.server_mode: - self._log(DEBUG, 'starting thread (server mode): %s' % tid) + self._log(DEBUG, 'starting thread (server mode): {}'.format(tid)) else: - self._log(DEBUG, 'starting thread (client mode): %s' % tid) + self._log(DEBUG, 'starting thread (client mode): {}'.format(tid)) try: try: self.packetizer.write_all(b(self.local_version + '\r\n')) - self._log(DEBUG, 'Local version/idstring: %s' % self.local_version) # noqa + self._log(DEBUG, 'Local version/idstring: {}'.format(self.local_version)) # noqa self._check_banner() # The above is actually very much part of the handshake, but # sometimes the banner can be read but the machine is not @@ -1880,7 +1882,7 @@ class Transport(threading.Thread, ClosingContextManager): continue if len(self._expected_packet) > 0: if ptype not in self._expected_packet: - raise SSHException('Expecting packet from %r, got %d' % (self._expected_packet, ptype)) # noqa + raise SSHException('Expecting packet from {!r}, got {:d}'.format(self._expected_packet, ptype)) # noqa self._expected_packet = tuple() if (ptype >= 30) and (ptype <= 41): self.kex_engine.parse_next(ptype, m) @@ -1894,9 +1896,9 @@ class Transport(threading.Thread, ClosingContextManager): if chan is not None: self._channel_handler_table[ptype](chan, m) elif chanid in self.channels_seen: - self._log(DEBUG, 'Ignoring message for dead channel %d' % chanid) # noqa + self._log(DEBUG, 'Ignoring message for dead channel {:d}'.format(chanid)) # noqa else: - self._log(ERROR, 'Channel request for unknown channel %d' % chanid) # noqa + self._log(ERROR, 'Channel request for unknown channel {:d}'.format(chanid)) # noqa break elif ( self.auth_handler is not None and @@ -1907,7 +1909,8 @@ class Transport(threading.Thread, ClosingContextManager): if len(self._expected_packet) > 0: continue else: - self._log(WARNING, 'Oops, unhandled type %d' % ptype) + err = 'Oops, unhandled type {:d}'.format(ptype) + self._log(WARNING, err) msg = Message() msg.add_byte(cMSG_UNIMPLEMENTED) msg.add_int(m.seqno) @@ -1923,7 +1926,7 @@ class Transport(threading.Thread, ClosingContextManager): except socket.error as e: if type(e.args) is tuple: if e.args: - emsg = '%s (%d)' % (e.args[1], e.args[0]) + emsg = '{} ({:d})'.format(e.args[1], e.args[0]) else: # empty tuple, e.g. socket.timeout emsg = str(e) or repr(e) else: @@ -1965,11 +1968,11 @@ class Transport(threading.Thread, ClosingContextManager): # Log useful, non-duplicative line re: an agreed-upon algorithm. # Old code implied algorithms could be asymmetrical (different for # inbound vs outbound) so we preserve that possibility. - msg = "{0} agreed: ".format(which) + msg = "{} agreed: ".format(which) if local == remote: msg += local else: - msg += "local={0}, remote={1}".format(local, remote) + msg += "local={}, remote={}".format(local, remote) self._log(DEBUG, msg) # protocol stages @@ -2011,7 +2014,7 @@ class Transport(threading.Thread, ClosingContextManager): raise SSHException('Indecipherable protocol version "' + buf + '"') # save this server version string for later self.remote_version = buf - self._log(DEBUG, 'Remote version/idstring: %s' % buf) + self._log(DEBUG, 'Remote version/idstring: {}'.format(buf)) # pull off any attached comment # NOTE: comment used to be stored in a variable and then...never used. # since 2003. ca 877cd974b8182d26fa76d566072917ea67b64e67 @@ -2025,9 +2028,9 @@ class Transport(threading.Thread, ClosingContextManager): version = segs[1] client = segs[2] if version != '1.99' and version != '2.0': - msg = 'Incompatible version ({0} instead of 2.0)' + msg = 'Incompatible version ({} instead of 2.0)' raise SSHException(msg.format(version)) - msg = 'Connected (version {0}, client {1})'.format(version, client) + msg = 'Connected (version {}, client {})'.format(version, client) self._log(INFO, msg) def _send_kex_init(self): @@ -2130,7 +2133,7 @@ class Transport(threading.Thread, ClosingContextManager): if len(agreed_kex) == 0: raise SSHException('Incompatible ssh peer (no acceptable kex algorithm)') # noqa self.kex_engine = self._kex_info[agreed_kex[0]](self) - self._log(DEBUG, "Kex agreed: %s" % agreed_kex[0]) + self._log(DEBUG, "Kex agreed: {}".format(agreed_kex[0])) if self.server_mode: available_server_keys = list(filter( @@ -2223,7 +2226,7 @@ class Transport(threading.Thread, ClosingContextManager): len(agreed_local_compression) == 0 or len(agreed_remote_compression) == 0 ): - msg = 'Incompatible ssh server (no acceptable compression) {0!r} {1!r} {2!r}' # noqa + msg = 'Incompatible ssh server (no acceptable compression) {!r} {!r} {!r}' # noqa raise SSHException(msg.format( agreed_local_compression, agreed_remote_compression, self._preferred_compression, @@ -2366,16 +2369,16 @@ class Transport(threading.Thread, ClosingContextManager): def _parse_disconnect(self, m): code = m.get_int() desc = m.get_text() - self._log(INFO, 'Disconnect (code %d): %s' % (code, desc)) + self._log(INFO, 'Disconnect (code {:d}): {}'.format(code, desc)) def _parse_global_request(self, m): kind = m.get_text() - self._log(DEBUG, 'Received global request "%s"' % kind) + self._log(DEBUG, 'Received global request "{}"'.format(kind)) want_reply = m.get_boolean() if not self.server_mode: self._log( DEBUG, - 'Rejecting "%s" global request from server.' % kind + 'Rejecting "{}" global request from server.'.format(kind) ) ok = False elif kind == 'tcpip-forward': @@ -2429,7 +2432,7 @@ class Transport(threading.Thread, ClosingContextManager): try: chan._set_remote_channel( server_chanid, server_window_size, server_max_packet_size) - self._log(DEBUG, 'Secsh channel %d opened.' % chanid) + self._log(DEBUG, 'Secsh channel {:d} opened.'.format(chanid)) if chanid in self.channel_events: self.channel_events[chanid].set() del self.channel_events[chanid] @@ -2445,8 +2448,9 @@ class Transport(threading.Thread, ClosingContextManager): reason_text = CONNECTION_FAILED_CODE.get(reason, '(unknown code)') self._log( ERROR, - 'Secsh channel %d open FAILED: %s: %s' % ( - chanid, reason_str, reason_text) + 'Secsh channel {:d} open FAILED: {}: {}'.format( + chanid, reason_str, reason_text, + ) ) self.lock.acquire() try: @@ -2481,8 +2485,9 @@ class Transport(threading.Thread, ClosingContextManager): origin_port = m.get_int() self._log( DEBUG, - 'Incoming x11 connection from %s:%d' % ( - origin_addr, origin_port) + 'Incoming x11 connection from {}:{:d}'.format( + origin_addr, origin_port, + ) ) self.lock.acquire() try: @@ -2496,8 +2501,9 @@ class Transport(threading.Thread, ClosingContextManager): origin_port = m.get_int() self._log( DEBUG, - 'Incoming tcp forwarded connection from %s:%d' % ( - origin_addr, origin_port) + 'Incoming tcp forwarded connection from {}:{:d}'.format( + origin_addr, origin_port, + ) ) self.lock.acquire() try: @@ -2507,7 +2513,7 @@ class Transport(threading.Thread, ClosingContextManager): elif not self.server_mode: self._log( DEBUG, - 'Rejecting "%s" channel request from server.' % kind) + 'Rejecting "{}" channel request from server.'.format(kind)) reject = True reason = OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED else: @@ -2533,7 +2539,7 @@ class Transport(threading.Thread, ClosingContextManager): if reason != OPEN_SUCCEEDED: self._log( DEBUG, - 'Rejecting "%s" channel request from client.' % kind) + 'Rejecting "{}" channel request from client.'.format(kind)) reject = True if reject: msg = Message() @@ -2564,7 +2570,9 @@ class Transport(threading.Thread, ClosingContextManager): m.add_int(self.default_window_size) m.add_int(self.default_max_packet_size) self._send_message(m) - self._log(DEBUG, 'Secsh channel %d (%s) opened.', my_chanid, kind) + self._log(DEBUG, + 'Secsh channel {:d} ({}) opened.'.format(my_chanid, kind) + ) if kind == 'auth-agent@openssh.com': self._forward_agent_handler(chan) elif kind == 'x11': @@ -2583,7 +2591,7 @@ class Transport(threading.Thread, ClosingContextManager): m.get_boolean() # always_display msg = m.get_string() m.get_string() # language - self._log(DEBUG, 'Debug msg: {0}'.format(util.safe_string(msg))) + self._log(DEBUG, 'Debug msg: {}'.format(util.safe_string(msg))) def _get_subsystem_handler(self, name): try: @@ -2638,7 +2646,7 @@ class SecurityOptions (object): """ Returns a string representation of this object, for debugging. """ - return '<paramiko.SecurityOptions for %s>' % repr(self._transport) + return '<paramiko.SecurityOptions for {!r}>'.format(self._transport) def _set(self, name, orig, x): if type(x) is list: diff --git a/paramiko/util.py b/paramiko/util.py index de099c0c..2854ef98 100644 --- a/paramiko/util.py +++ b/paramiko/util.py @@ -102,19 +102,22 @@ def format_binary(data, prefix=''): def format_binary_line(data): - left = ' '.join(['%02X' % byte_ord(c) for c in data]) - right = ''.join([('.%c..' % c)[(byte_ord(c) + 63) // 95] for c in data]) - return '%-50s %s' % (left, right) + left = ' '.join(['{:02X}'.format(byte_ord(c)) for c in data]) + right = ''.join([ + '.{:c}..'.format(byte_ord(c))[(byte_ord(c) + 63) // 95] + for c in data + ]) + return '{:50s} {}'.format(left, right) def safe_string(s): - out = b('') + out = b'' for c in s: i = byte_ord(c) if 32 <= i <= 127: out += byte_chr(i) else: - out += b('%%%02X' % i) + out += b('%{:02X}'.format(i)) return out diff --git a/paramiko/win_pageant.py b/paramiko/win_pageant.py index fda3b9c1..661ba575 100644 --- a/paramiko/win_pageant.py +++ b/paramiko/win_pageant.py @@ -44,7 +44,7 @@ win32con_WM_COPYDATA = 74 def _get_pageant_window_object(): - return ctypes.windll.user32.FindWindowA(b('Pageant'), b('Pageant')) + return ctypes.windll.user32.FindWindowA(b'Pageant', b'Pageant') def can_talk_to_agent(): @@ -65,11 +65,8 @@ setup( 'Topic :: Security :: Cryptography', 'Programming Language :: Python', 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.6', 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.2', - 'Programming Language :: Python :: 3.3', 'Programming Language :: Python :: 3.4', 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', diff --git a/sites/shared_conf.py b/sites/shared_conf.py index 99fab315..cf0d77ff 100644 --- a/sites/shared_conf.py +++ b/sites/shared_conf.py @@ -26,13 +26,13 @@ html_sidebars = { # Everything intersphinx's to Python intersphinx_mapping = { - 'python': ('http://docs.python.org/2.6', None), + 'python': ('https://docs.python.org/2.7/', None), } # Regular settings project = 'Paramiko' year = datetime.now().year -copyright = '%d Jeff Forcier' % year +copyright = '{} Jeff Forcier'.format(year) master_doc = 'index' templates_path = ['_templates'] exclude_trees = ['_build'] diff --git a/sites/www/changelog.rst b/sites/www/changelog.rst index 2a54714a..c8b584d7 100644 --- a/sites/www/changelog.rst +++ b/sites/www/changelog.rst @@ -5,7 +5,19 @@ Changelog * :bug:`1108 (1.17+)` Rename a private method keyword argument (which was named ``async``) so that we're compatible with the upcoming Python 3.7 release (where ``async`` is a new keyword.) Thanks to ``@vEpiphyte`` for the report. +* :support:`1100` Updated the test suite & related docs/metadata/config to be + compatible with pytest instead of using the old, custom, crufty + unittest-based ``test.py``. + + This includes marking known-slow tests (mostly the SFTP ones) so they can be + filtered out by ``inv test``'s default behavior; as well as other minor + tweaks to test collection and/or display (for example, GSSAPI tests are + collected, but skipped, instead of not even being collected by default as in + ``test.py``.) * :support:`- backported` Include LICENSE file in wheel archives. +* :support:`1070` Drop Python 2.6 and Python 3.3 support; now only 2.7 and 3.4+ + are supported. If you're unable to upgrade from 2.6 or 3.3, please stick to + the Paramiko 2.3.x (or below) release lines. * :release:`2.3.1 <2017-09-22>` * :bug:`1071` Certificate support broke the no-certificate case for Ed25519 keys (symptom is an ``AttributeError`` about ``public_blob``.) This went diff --git a/sites/www/index.rst b/sites/www/index.rst index f0a5db8a..26961f24 100644 --- a/sites/www/index.rst +++ b/sites/www/index.rst @@ -1,11 +1,11 @@ Welcome to Paramiko! ==================== -Paramiko is a Python (2.6+, 3.3+) implementation of the SSHv2 protocol [#]_, +Paramiko is a Python (2.7, 3.4+) implementation of the SSHv2 protocol [#]_, providing both client and server functionality. While it leverages a Python C -extension for low level cryptography -(`Cryptography <https://cryptography.io>`_), Paramiko itself is a pure Python -interface around SSH networking concepts. +extension for low level cryptography (`Cryptography +<https://cryptography.io>`_), Paramiko itself is a pure Python interface around +SSH networking concepts. This website covers project information for Paramiko such as the changelog, contribution guidelines, development roadmap, news/blog, and so forth. Detailed diff --git a/sites/www/installing.rst b/sites/www/installing.rst index f335a9e7..e6db2dca 100644 --- a/sites/www/installing.rst +++ b/sites/www/installing.rst @@ -19,8 +19,8 @@ via `pip <http://pip-installer.org>`_:: $ pip install paramiko -We currently support **Python 2.6, 2.7, 3.3+, and PyPy**. Users on Python 2.5 -or older (or 3.2 or older) are urged to upgrade. +We currently support **Python 2.7, 3.4+, and PyPy**. Users on Python 2.6 or +older (or 3.3 or older) are urged to upgrade. Paramiko has only one direct hard dependency: the Cryptography library. See :ref:`cryptography`. @@ -8,26 +8,46 @@ from invocations.packaging.release import ns as release_coll, publish from invocations.testing import count_errors -# Until we move to spec-based testing @task -def test(ctx, coverage=False, flags=""): - if "--verbose" not in flags.split(): - flags += " --verbose" - runner = "python" +def test(ctx, verbose=True, coverage=False, include_slow=False, opts=""): + """ + Run unit tests via pytest. + + By default, known-slow parts of the suite are SKIPPED unless + ``--include-slow`` is given. (Note that ``--include-slow`` does not mesh + well with explicit ``--opts="-m=xxx"`` - if ``-m`` is found in ``--opts``, + ``--include-slow`` will be ignored!) + """ + if verbose and '--verbose' not in opts and '-v' not in opts: + opts += " --verbose" + if '-m' not in opts and not include_slow: + opts += " -m 'not slow'" + runner = "pytest" if coverage: - runner = "coverage run --source=paramiko" + # Leverage how pytest can be run as 'python -m pytest', and then how + # coverage can be told to run things in that manner instead of + # expecting a literal .py file. + # TODO: get pytest's coverage plugin working, IIRC it has issues? + runner = "coverage run --source=paramiko -m pytest" # Strip SSH_AUTH_SOCK from parent env to avoid pollution by interactive # users. + # TODO: once pytest coverage plugin works, see if there's a pytest-native + # way to handle the env stuff too, then we can remove these tasks entirely + # in favor of just "run pytest"? env = dict(os.environ) if 'SSH_AUTH_SOCK' in env: del env['SSH_AUTH_SOCK'] - cmd = "{0} test.py {1}".format(runner, flags) + cmd = "{} {}".format(runner, opts) + # NOTE: we have a pytest.ini and tend to use that over PYTEST_ADDOPTS. ctx.run(cmd, pty=True, env=env, replace_env=True) @task -def coverage(ctx): - ctx.run("coverage run --source=paramiko test.py --verbose") +def coverage(ctx, opts=""): + """ + Execute all tests (normal and slow) with coverage enabled. + """ + return test(ctx, coverage=True, include_slow=True, opts=opts) # Until we stop bundling docs w/ releases. Need to discover use cases first. diff --git a/test.py b/test.py deleted file mode 100755 index 7849c149..00000000 --- a/test.py +++ /dev/null @@ -1,196 +0,0 @@ -#!/usr/bin/env python - -# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com> -# -# This file is part of paramiko. -# -# Paramiko is free software; you can redistribute it and/or modify it under the -# terms of the GNU Lesser General Public License as published by the Free -# Software Foundation; either version 2.1 of the License, or (at your option) -# any later version. -# -# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY -# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR -# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. - -""" -do the unit tests! -""" - -# flake8: noqa -import os -import re -import sys -import unittest -from optparse import OptionParser -import paramiko -import threading -from paramiko.py3compat import PY2 - -sys.path.append('tests') - -from tests.test_message import MessageTest -from tests.test_file import BufferedFileTest -from tests.test_buffered_pipe import BufferedPipeTest -from tests.test_util import UtilTest -from tests.test_hostkeys import HostKeysTest -from tests.test_pkey import KeyTest -from tests.test_kex import KexTest -from tests.test_packetizer import PacketizerTest -from tests.test_auth import AuthTest -from tests.test_transport import TransportTest -from tests.test_ssh_exception import NoValidConnectionsErrorTest -from tests.test_client import SSHClientTest -from test_client import SSHClientTest # XXX why shadow the above import? -from test_gssapi import GSSAPITest -from test_ssh_gss import GSSAuthTest -from test_kex_gss import GSSKexTest - -default_host = 'localhost' -default_user = os.environ.get('USER', 'nobody') -default_keyfile = os.path.join(os.environ.get('HOME', '/'), '.ssh/id_rsa') -default_passwd = None - - -def iter_suite_tests(suite): - """Return all tests in a suite, recursing through nested suites""" - for item in suite._tests: - if isinstance(item, unittest.TestCase): - yield item - elif isinstance(item, unittest.TestSuite): - for r in iter_suite_tests(item): - yield r - else: - raise Exception('unknown object %r inside test suite %r' - % (item, suite)) - - -def filter_suite_by_re(suite, pattern): - result = unittest.TestSuite() - filter_re = re.compile(pattern) - for test in iter_suite_tests(suite): - if filter_re.search(test.id()): - result.addTest(test) - return result - - -def main(): - parser = OptionParser('usage: %prog [options]') - parser.add_option('--verbose', action='store_true', dest='verbose', default=False, - help='verbose display (one line per test)') - parser.add_option('--no-pkey', action='store_false', dest='use_pkey', default=True, - help='skip RSA/DSS private key tests (which can take a while)') - parser.add_option('--no-transport', action='store_false', dest='use_transport', default=True, - help='skip transport tests (which can take a while)') - parser.add_option('--no-sftp', action='store_false', dest='use_sftp', default=True, - help='skip SFTP client/server tests, which can be slow') - parser.add_option('--no-big-file', action='store_false', dest='use_big_file', default=True, - help='skip big file SFTP tests, which are slow as molasses') - parser.add_option('--gssapi-test', action='store_true', dest='gssapi_test', default=False, - help='Test the used APIs for GSS-API / SSPI authentication') - parser.add_option('--test-gssauth', action='store_true', dest='test_gssauth', default=False, - help='Test GSS-API / SSPI authentication for SSHv2. To test this, you need kerberos a infrastructure.\ - Note: Paramiko needs access to your krb5.keytab file. Make it readable for Paramiko or\ - copy the used key to another file and set the environment variable KRB5_KTNAME to this file.') - parser.add_option('--test-gssapi-keyex', action='store_true', dest='test_gsskex', default=False, - help='Test GSS-API / SSPI authenticated iffie-Hellman Key Exchange and user\ - authentication. To test this, you need kerberos a infrastructure.\ - Note: Paramiko needs access to your krb5.keytab file. Make it readable for Paramiko or\ - copy the used key to another file and set the environment variable KRB5_KTNAME to this file.') - parser.add_option('-R', action='store_false', dest='use_loopback_sftp', default=True, - help='perform SFTP tests against a remote server (by default, SFTP tests ' + - 'are done through a loopback socket)') - parser.add_option('-H', '--sftp-host', dest='hostname', type='string', default=default_host, - metavar='<host>', - help='[with -R] host for remote sftp tests (default: %s)' % default_host) - parser.add_option('-U', '--sftp-user', dest='username', type='string', default=default_user, - metavar='<username>', - help='[with -R] username for remote sftp tests (default: %s)' % default_user) - parser.add_option('-K', '--sftp-key', dest='keyfile', type='string', default=default_keyfile, - metavar='<keyfile>', - help='[with -R] location of private key for remote sftp tests (default: %s)' % - default_keyfile) - parser.add_option('-P', '--sftp-passwd', dest='password', type='string', default=default_passwd, - metavar='<password>', - help='[with -R] (optional) password to unlock the private key for remote sftp tests') - parser.add_option('--krb5_principal', dest='krb5_principal', type='string', - metavar='<krb5_principal>', - help='The krb5 principal (your username) for GSS-API / SSPI authentication') - parser.add_option('--targ_name', dest='targ_name', type='string', - metavar='<targ_name>', - help='Target name for GSS-API / SSPI authentication.\ - This is the hosts name you are running the test on in the kerberos database.') - parser.add_option('--server_mode', action='store_true', dest='server_mode', default=False, - help='Usage with --gssapi-test. Test the available GSS-API / SSPI server mode to.\ - Note: you need to have access to the kerberos keytab file.') - - options, args = parser.parse_args() - - # setup logging - paramiko.util.log_to_file('test.log') - - if options.use_sftp: - from tests.test_sftp import SFTPTest - if options.use_loopback_sftp: - SFTPTest.init_loopback() - else: - SFTPTest.init(options.hostname, options.username, options.keyfile, options.password) - if not options.use_big_file: - SFTPTest.set_big_file_test(False) - if options.use_big_file: - from tests.test_sftp_big import BigSFTPTest - - suite = unittest.TestSuite() - suite.addTest(unittest.makeSuite(MessageTest)) - suite.addTest(unittest.makeSuite(BufferedFileTest)) - suite.addTest(unittest.makeSuite(BufferedPipeTest)) - suite.addTest(unittest.makeSuite(UtilTest)) - suite.addTest(unittest.makeSuite(HostKeysTest)) - if options.use_pkey: - suite.addTest(unittest.makeSuite(KeyTest)) - suite.addTest(unittest.makeSuite(KexTest)) - suite.addTest(unittest.makeSuite(PacketizerTest)) - if options.use_transport: - suite.addTest(unittest.makeSuite(AuthTest)) - suite.addTest(unittest.makeSuite(TransportTest)) - suite.addTest(unittest.makeSuite(NoValidConnectionsErrorTest)) - suite.addTest(unittest.makeSuite(SSHClientTest)) - if options.use_sftp: - suite.addTest(unittest.makeSuite(SFTPTest)) - if options.use_big_file: - suite.addTest(unittest.makeSuite(BigSFTPTest)) - if options.gssapi_test: - GSSAPITest.init(options.targ_name, options.server_mode) - suite.addTest(unittest.makeSuite(GSSAPITest)) - if options.test_gssauth: - GSSAuthTest.init(options.krb5_principal, options.targ_name) - suite.addTest(unittest.makeSuite(GSSAuthTest)) - if options.test_gsskex: - GSSKexTest.init(options.krb5_principal, options.targ_name) - suite.addTest(unittest.makeSuite(GSSKexTest)) - verbosity = 1 - if options.verbose: - verbosity = 2 - - runner = unittest.TextTestRunner(verbosity=verbosity) - if len(args) > 0: - filter = '|'.join(args) - suite = filter_suite_by_re(suite, filter) - result = runner.run(suite) - # Clean up stale threads from poorly cleaned-up tests. - # TODO: make that not a problem, jeez - for thread in threading.enumerate(): - if thread is not threading.currentThread(): - thread.join(timeout=1) - # Exit correctly - if not result.wasSuccessful(): - sys.exit(1) - - -if __name__ == '__main__': - main() diff --git a/tests/__init__.py b/tests/__init__.py index 8878f14d..be1d2daa 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,36 +1 @@ -# Copyright (C) 2017 Martin Packman <gzlist@googlemail.com> -# -# This file is part of paramiko. -# -# Paramiko is free software; you can redistribute it and/or modify it under the -# terms of the GNU Lesser General Public License as published by the Free -# Software Foundation; either version 2.1 of the License, or (at your option) -# any later version. -# -# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY -# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR -# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. - -"""Base classes and helpers for testing paramiko.""" - -import unittest - -from paramiko.py3compat import ( - builtins, - ) - - -def skipUnlessBuiltin(name): - """Skip decorated test if builtin name does not exist.""" - if getattr(builtins, name, None) is None: - skip = getattr(unittest, "skip", None) - if skip is None: - # Python 2.6 pseudo-skip - return lambda func: None - return skip("No builtin " + repr(name)) - return lambda func: func +# This file's just here so test modules can use explicit-relative imports. diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..d1967a73 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,103 @@ +import logging +import os +import shutil +import threading + +import pytest +from paramiko import RSAKey, SFTPServer, SFTP, Transport + +from .loop import LoopSocket +from .stub_sftp import StubServer, StubSFTPServer +from .util import _support + + +# TODO: not a huge fan of conftest.py files, see if we can move these somewhere +# 'nicer'. + + +# Perform logging by default; pytest will capture and thus hide it normally, +# presenting it on error/failure. (But also allow turning it off when doing +# very pinpoint debugging - e.g. using breakpoints, so you don't want output +# hiding enabled, but also don't want all the logging to gum up the terminal.) +if not os.environ.get('DISABLE_LOGGING', False): + logging.basicConfig( + level=logging.DEBUG, + # Also make sure to set up timestamping for more sanity when debugging. + format="[%(relativeCreated)s]\t%(levelname)s:%(name)s:%(message)s", + datefmt="%H:%M:%S", + ) + + +def make_sftp_folder(): + """ + Ensure expected target temp folder exists on the remote end. + + Will clean it out if it already exists. + """ + # TODO: go back to using the sftp functionality itself for folder setup so + # we can test against live SFTP servers again someday. (Not clear if anyone + # is/was using the old capability for such, though...) + # TODO: something that would play nicer with concurrent testing (but + # probably e.g. using thread ID or UUIDs or something; not the "count up + # until you find one not used!" crap from before...) + # TODO: if we want to lock ourselves even harder into localhost-only + # testing (probably not?) could use tempdir modules for this for improved + # safety. Then again...why would someone have such a folder??? + path = os.environ.get('TEST_FOLDER', 'paramiko-test-target') + # Forcibly nuke this directory locally, since at the moment, the below + # fixtures only ever run with a locally scoped stub test server. + shutil.rmtree(path, ignore_errors=True) + # Then create it anew, again locally, for the same reason. + os.mkdir(path) + return path + + +@pytest.fixture#(scope='session') +def sftp_server(): + """ + Set up an in-memory SFTP server thread. Yields the client Transport/socket. + + The resulting client Transport (along with all the server components) will + be the same object throughout the test session; the `sftp` fixture then + creates new higher level client objects wrapped around the client + Transport, as necessary. + """ + # Sockets & transports + socks = LoopSocket() + sockc = LoopSocket() + sockc.link(socks) + tc = Transport(sockc) + ts = Transport(socks) + # Auth + host_key = RSAKey.from_private_key_file(_support('test_rsa.key')) + ts.add_server_key(host_key) + # Server setup + event = threading.Event() + server = StubServer() + ts.set_subsystem_handler('sftp', SFTPServer, StubSFTPServer) + ts.start_server(event, server) + # Wait (so client has time to connect? Not sure. Old.) + event.wait(1.0) + # Make & yield connection. + tc.connect(username='slowdive', password='pygmalion') + yield tc + # TODO: any need for shutdown? Why didn't old suite do so? Or was that the + # point of the "join all threads from threading module" crap in test.py? + + +@pytest.fixture +def sftp(sftp_server): + """ + Yield an SFTP client connected to the global in-session SFTP server thread. + """ + # Client setup + client = SFTP.from_transport(sftp_server) + # Work in 'remote' folder setup (as it wants to use the client) + # TODO: how cleanest to make this available to tests? Doing it this way is + # marginally less bad than the previous 'global'-using setup, but not by + # much? + client.FOLDER = make_sftp_folder() + # Yield client to caller + yield client + # Clean up - as in make_sftp_folder, we assume local-only exec for now. + shutil.rmtree(client.FOLDER, ignore_errors=True) diff --git a/tests/loop.py b/tests/loop.py index e805ad96..6c432867 100644 --- a/tests/loop.py +++ b/tests/loop.py @@ -16,11 +16,9 @@ # along with Paramiko; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. -""" -... -""" +import socket +import threading -import threading, socket from paramiko.common import asbytes diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py index 0d673091..19545865 100644 --- a/tests/stub_sftp.py +++ b/tests/stub_sftp.py @@ -22,6 +22,7 @@ A stub SFTP server for loopback SFTP testing. import os import sys + from paramiko import ( ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes, SFTPHandle, SFTP_OK, SFTP_FAILURE, AUTH_SUCCESSFUL, OPEN_SUCCEEDED, diff --git a/tests/test_auth.py b/tests/test_auth.py index e78397c6..4eade610 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -31,8 +31,10 @@ from paramiko import ( ) from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL from paramiko.py3compat import u -from tests.loop import LoopSocket -from tests.util import test_path + +from .loop import LoopSocket +from .util import _support, slow + _pwd = u('\u2022') @@ -40,7 +42,7 @@ _pwd = u('\u2022') class NullServer (ServerInterface): paranoid_did_password = False paranoid_did_public_key = False - paranoid_key = DSSKey.from_private_key_file(test_path('test_dss.key')) + paranoid_key = DSSKey.from_private_key_file(_support('test_dss.key')) def get_allowed_auths(self, username): if username == 'slowdive': @@ -118,7 +120,7 @@ class AuthTest (unittest.TestCase): self.sockc.close() def start_server(self): - host_key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = RSAKey.from_private_key_file(_support('test_rsa.key')) self.public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) self.event = threading.Event() @@ -170,7 +172,7 @@ class AuthTest (unittest.TestCase): self.tc.connect(hostkey=self.public_host_key) remain = self.tc.auth_password(username='paranoid', password='paranoid') self.assertEqual(['publickey'], remain) - key = DSSKey.from_private_key_file(test_path('test_dss.key')) + key = DSSKey.from_private_key_file(_support('test_dss.key')) remain = self.tc.auth_publickey(username='paranoid', key=key) self.assertEqual([], remain) self.verify_finished() @@ -238,6 +240,7 @@ class AuthTest (unittest.TestCase): etype, evalue, etb = sys.exc_info() self.assertTrue(issubclass(etype, AuthenticationException)) + @slow def test_9_auth_non_responsive(self): """ verify that authentication times out if server takes to long to diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py index eeb4d0ad..03616c55 100644 --- a/tests/test_buffered_pipe.py +++ b/tests/test_buffered_pipe.py @@ -23,6 +23,7 @@ Some unit tests for BufferedPipe. import threading import time import unittest + from paramiko.buffered_pipe import BufferedPipe, PipeTimeout from paramiko import pipe from paramiko.py3compat import b diff --git a/tests/test_client.py b/tests/test_client.py index 7058f394..0483cf6f 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -23,22 +23,27 @@ Some unit tests for SSHClient. from __future__ import with_statement import gc +import os import platform import socket -from tempfile import mkstemp import threading +import time import unittest -import weakref import warnings -import os -import time -from tests.util import test_path +import weakref +from tempfile import mkstemp import paramiko from paramiko.pkey import PublicBlob from paramiko.common import PY2 from paramiko.ssh_exception import SSHException, AuthenticationException +from .util import _support, slow + + +requires_gss_auth = unittest.skipUnless( + paramiko.GSS_AUTH_AVAILABLE, "GSS auth not available" +) FINGERPRINTS = { 'ssh-dss': b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c', @@ -131,10 +136,10 @@ class SSHClientTest (unittest.TestCase): allowed_keys = FINGERPRINTS.keys() self.socks, addr = self.sockl.accept() self.ts = paramiko.Transport(self.socks) - keypath = test_path('test_rsa.key') + keypath = _support('test_rsa.key') host_key = paramiko.RSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) - keypath = test_path('test_ecdsa_256.key') + keypath = _support('test_ecdsa_256.key') host_key = paramiko.ECDSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) server = NullServer(allowed_keys=allowed_keys, public_blob=public_blob) @@ -154,7 +159,7 @@ class SSHClientTest (unittest.TestCase): run_kwargs[key] = kwargs.pop(key, None) # Server setup threading.Thread(target=self._run, kwargs=run_kwargs).start() - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key')) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) # Client setup @@ -200,22 +205,22 @@ class SSHClientTest (unittest.TestCase): """ verify that SSHClient works with a DSA key. """ - self._test_connection(key_filename=test_path('test_dss.key')) + self._test_connection(key_filename=_support('test_dss.key')) def test_client_rsa(self): """ verify that SSHClient works with an RSA key. """ - self._test_connection(key_filename=test_path('test_rsa.key')) + self._test_connection(key_filename=_support('test_rsa.key')) def test_2_5_client_ecdsa(self): """ verify that SSHClient works with an ECDSA key. """ - self._test_connection(key_filename=test_path('test_ecdsa_256.key')) + self._test_connection(key_filename=_support('test_ecdsa_256.key')) def test_client_ed25519(self): - self._test_connection(key_filename=test_path('test_ed25519.key')) + self._test_connection(key_filename=_support('test_ed25519.key')) def test_3_multiple_key_files(self): """ @@ -238,7 +243,7 @@ class SSHClientTest (unittest.TestCase): try: self._test_connection( key_filename=[ - test_path('test_{0}.key'.format(x)) for x in attempt + _support('test_{}.key'.format(x)) for x in attempt ], allowed_keys=[types_[x] for x in accept], ) @@ -256,7 +261,7 @@ class SSHClientTest (unittest.TestCase): # various platforms trigger different errors here >_< self.assertRaises(SSHException, self._test_connection, - key_filename=[test_path('test_rsa.key')], + key_filename=[_support('test_rsa.key')], allowed_keys=['ecdsa-sha2-nistp256'], ) @@ -266,8 +271,8 @@ class SSHClientTest (unittest.TestCase): # server-side behavior is 100% identical.) # NOTE: only bothered whipping up one cert per overall class/family. for type_ in ('rsa', 'dss', 'ecdsa_256', 'ed25519'): - cert_name = 'test_{0}.key-cert.pub'.format(type_) - cert_path = test_path(os.path.join('cert_support', cert_name)) + cert_name = 'test_{}.key-cert.pub'.format(type_) + cert_path = _support(os.path.join('cert_support', cert_name)) self._test_connection( key_filename=cert_path, public_blob=PublicBlob.from_file(cert_path), @@ -281,12 +286,12 @@ class SSHClientTest (unittest.TestCase): # that a specific cert was found, along with regular authorization # succeeding proving that the overall flow works. for type_ in ('rsa', 'dss', 'ecdsa_256', 'ed25519'): - key_name = 'test_{0}.key'.format(type_) - key_path = test_path(os.path.join('cert_support', key_name)) + key_name = 'test_{}.key'.format(type_) + key_path = _support(os.path.join('cert_support', key_name)) self._test_connection( key_filename=key_path, public_blob=PublicBlob.from_file( - '{0}-cert.pub'.format(key_path) + '{}-cert.pub'.format(key_path) ), ) @@ -302,7 +307,7 @@ class SSHClientTest (unittest.TestCase): """ threading.Thread(target=self._run).start() hostname = '[%s]:%d' % (self.addr, self.port) - key_file = test_path('test_ecdsa_256.key') + key_file = _support('test_ecdsa_256.key') public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file) self.tc = paramiko.SSHClient() @@ -325,7 +330,7 @@ class SSHClientTest (unittest.TestCase): """ warnings.filterwarnings('ignore', 'tempnam.*') - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key')) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) fd, localname = mkstemp() os.close(fd) @@ -405,7 +410,7 @@ class SSHClientTest (unittest.TestCase): """ # Start the thread with a 1 second wait. threading.Thread(target=self._run, kwargs={'delay': 1}).start() - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key')) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = paramiko.SSHClient() @@ -432,12 +437,13 @@ class SSHClientTest (unittest.TestCase): # 'television' as per tests/test_pkey.py). NOTE: must use # key_filename, loading the actual key here with PKey will except # immediately; we're testing the try/except crap within Client. - key_filename=[test_path('test_rsa_password.key')], + key_filename=[_support('test_rsa_password.key')], # Actual password for default 'slowdive' user password='pygmalion', ) self._test_connection(**kwargs) + @slow def test_9_auth_timeout(self): """ verify that the SSHClient has a configurable auth timeout @@ -450,27 +456,25 @@ class SSHClientTest (unittest.TestCase): auth_timeout=0.5, ) + @requires_gss_auth def test_10_auth_trickledown_gsskex(self): """ Failed gssapi-keyex auth doesn't prevent subsequent key auth from succeeding """ - if not paramiko.GSS_AUTH_AVAILABLE: - return # for python 2.6 lacks skipTest kwargs = dict( gss_kex=True, - key_filename=[test_path('test_rsa.key')], + key_filename=[_support('test_rsa.key')], ) self._test_connection(**kwargs) + @requires_gss_auth def test_11_auth_trickledown_gssauth(self): """ Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding """ - if not paramiko.GSS_AUTH_AVAILABLE: - return # for python 2.6 lacks skipTest kwargs = dict( gss_auth=True, - key_filename=[test_path('test_rsa.key')], + key_filename=[_support('test_rsa.key')], ) self._test_connection(**kwargs) @@ -489,14 +493,13 @@ class SSHClientTest (unittest.TestCase): password='pygmalion', **self.connect_kwargs ) + @requires_gss_auth def test_13_reject_policy_gsskex(self): """ verify that SSHClient's RejectPolicy works, even if gssapi-keyex was enabled but not used. """ # Test for a bug present in paramiko versions released before 2017-08-01 - if not paramiko.GSS_AUTH_AVAILABLE: - return # for python 2.6 lacks skipTest threading.Thread(target=self._run).start() self.tc = paramiko.SSHClient() @@ -532,7 +535,7 @@ class SSHClientTest (unittest.TestCase): self.tc = paramiko.SSHClient() self.tc.set_missing_host_key_policy(paramiko.RejectPolicy()) - host_key = ktype.from_private_key_file(test_path(kfile)) + host_key = ktype.from_private_key_file(_support(kfile)) known_hosts = self.tc.get_host_keys() known_hosts.add(hostname, host_key.get_name(), host_key) @@ -556,10 +559,7 @@ class SSHClientTest (unittest.TestCase): def test_host_key_negotiation_4(self): self._client_host_key_good(paramiko.RSAKey, 'test_rsa.key') - def test_update_environment(self): - """ - Verify that environment variables can be set by the client. - """ + def _setup_for_env(self): threading.Thread(target=self._run).start() self.tc = paramiko.SSHClient() @@ -571,6 +571,11 @@ class SSHClientTest (unittest.TestCase): self.assertTrue(self.event.isSet()) self.assertTrue(self.ts.is_active()) + def test_update_environment(self): + """ + Verify that environment variables can be set by the client. + """ + self._setup_for_env() target_env = {b'A': b'B', b'C': b'd'} self.tc.exec_command('yes', environment=target_env) @@ -578,19 +583,20 @@ class SSHClientTest (unittest.TestCase): self.assertEqual(target_env, getattr(schan, 'env', {})) schan.close() - # Cannot use assertRaises in context manager mode as it is not supported - # in Python 2.6. - try: + @unittest.skip("Clients normally fail silently, thus so do we, for now") + def test_env_update_failures(self): + self._setup_for_env() + with self.assertRaises(SSHException) as manager: # Verify that a rejection by the server can be detected self.tc.exec_command('yes', environment={b'INVALID_ENV': b''}) - except SSHException as e: - self.assertTrue('INVALID_ENV' in str(e), - 'Expected variable name in error message') - self.assertTrue(isinstance(e.args[1], SSHException), - 'Expected original SSHException in exception') - else: - self.assertFalse(False, 'SSHException was not thrown.') - + self.assertTrue( + 'INVALID_ENV' in str(manager.exception), + 'Expected variable name in error message' + ) + self.assertTrue( + isinstance(manager.exception.args[1], SSHException), + 'Expected original SSHException in exception' + ) def test_missing_key_policy_accepts_classes_or_instances(self): """ diff --git a/tests/test_file.py b/tests/test_file.py index b33ecd51..3d2c94e6 100755..100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -27,7 +27,7 @@ from paramiko.common import linefeed_byte, crlf, cr_byte from paramiko.file import BufferedFile from paramiko.py3compat import BytesIO -from tests import skipUnlessBuiltin +from .util import needs_builtin class LoopbackFile (BufferedFile): @@ -198,13 +198,13 @@ class BufferedFileTest (unittest.TestCase): f.write(text) self.assertEqual(f.read(), text.encode("utf-8")) - @skipUnlessBuiltin('memoryview') + @needs_builtin('memoryview') def test_write_bytearray(self): with LoopbackFile('rb+') as f: f.write(bytearray(12)) self.assertEqual(f.read(), 12 * b"\0") - @skipUnlessBuiltin('buffer') + @needs_builtin('buffer') def test_write_buffer(self): data = 3 * b"pretend giant block of data\n" offsets = range(0, len(data), 8) @@ -213,7 +213,7 @@ class BufferedFileTest (unittest.TestCase): f.write(buffer(data, offset, 8)) self.assertEqual(f.read(), data) - @skipUnlessBuiltin('memoryview') + @needs_builtin('memoryview') def test_write_memoryview(self): data = 3 * b"pretend giant block of data\n" offsets = range(0, len(data), 8) diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py index bc220108..d4b632be 100644 --- a/tests/test_gssapi.py +++ b/tests/test_gssapi.py @@ -25,14 +25,17 @@ Test the used APIs for GSS-API / SSPI authentication import unittest import socket +from .util import needs_gssapi + +@needs_gssapi class GSSAPITest(unittest.TestCase): - @staticmethod - def init(hostname=None, srv_mode=False): - global krb5_mech, targ_name, server_mode - krb5_mech = "1.2.840.113554.1.2.2" - targ_name = hostname - server_mode = srv_mode + def setup(): + # TODO: these vars should all come from os.environ or whatever the + # approved pytest method is for runtime-configuring test data. + self.krb5_mech = "1.2.840.113554.1.2.2" + self.targ_name = "hostname" + self.server_mode = False def test_1_pyasn1(self): """ @@ -40,9 +43,9 @@ class GSSAPITest(unittest.TestCase): """ from pyasn1.type.univ import ObjectIdentifier from pyasn1.codec.der import encoder, decoder - oid = encoder.encode(ObjectIdentifier(krb5_mech)) + oid = encoder.encode(ObjectIdentifier(self.krb5_mech)) mech, __ = decoder.decode(oid) - self.assertEquals(krb5_mech, mech.__str__()) + self.assertEquals(self.krb5_mech, mech.__str__()) def test_2_gssapi_sspi(self): """ @@ -61,7 +64,7 @@ class GSSAPITest(unittest.TestCase): mic_msg = b"G'day Mate!" if _API == "MIT": - if server_mode: + if self.server_mode: gss_flags = (gssapi.C_PROT_READY_FLAG, gssapi.C_INTEG_FLAG, gssapi.C_MUTUAL_FLAG, @@ -73,13 +76,13 @@ class GSSAPITest(unittest.TestCase): # Initialize a GSS-API context. ctx = gssapi.Context() ctx.flags = gss_flags - krb5_oid = gssapi.OID.mech_from_string(krb5_mech) - target_name = gssapi.Name("host@" + targ_name, + krb5_oid = gssapi.OID.mech_from_string(self.krb5_mech) + target_name = gssapi.Name("host@" + self.targ_name, gssapi.C_NT_HOSTBASED_SERVICE) gss_ctxt = gssapi.InitContext(peer_name=target_name, mech_type=krb5_oid, req_flags=ctx.flags) - if server_mode: + if self.server_mode: c_token = gss_ctxt.step(c_token) gss_ctxt_status = gss_ctxt.established self.assertEquals(False, gss_ctxt_status) @@ -99,7 +102,7 @@ class GSSAPITest(unittest.TestCase): # Build MIC mic_token = gss_ctxt.get_mic(mic_msg) - if server_mode: + if self.server_mode: # Check MIC status = gss_srv_ctxt.verify_mic(mic_msg, mic_token) self.assertEquals(0, status) @@ -110,11 +113,11 @@ class GSSAPITest(unittest.TestCase): sspicon.ISC_REQ_DELEGATE ) # Initialize a GSS-API context. - target_name = "host/" + socket.getfqdn(targ_name) + target_name = "host/" + socket.getfqdn(self.targ_name) gss_ctxt = sspi.ClientAuth("Kerberos", scflags=gss_flags, targetspn=target_name) - if server_mode: + if self.server_mode: error, token = gss_ctxt.authorize(c_token) c_token = token[0].Buffer self.assertEquals(0, error) diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py index 2c7ceeb9..cd75f8ab 100644 --- a/tests/test_hostkeys.py +++ b/tests/test_hostkeys.py @@ -23,6 +23,7 @@ Some unit tests for HostKeys. from binascii import hexlify import os import unittest + import paramiko from paramiko.py3compat import decodebytes diff --git a/tests/test_kex.py b/tests/test_kex.py index b7f588f7..b5808e7e 100644 --- a/tests/test_kex.py +++ b/tests/test_kex.py @@ -24,14 +24,15 @@ from binascii import hexlify, unhexlify import os import unittest +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric import ec + import paramiko.util from paramiko.kex_group1 import KexGroup1 from paramiko.kex_gex import KexGex, KexGexSHA256 from paramiko import Message from paramiko.common import byte_chr from paramiko.kex_ecdh_nist import KexNistp256 -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives.asymmetric import ec def dummy_urandom(n): diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py index af342a7c..025d1faa 100644 --- a/tests/test_kex_gss.py +++ b/tests/test_kex_gss.py @@ -31,6 +31,8 @@ import unittest import paramiko +from .util import needs_gssapi + class NullServer (paramiko.ServerInterface): @@ -57,6 +59,7 @@ class NullServer (paramiko.ServerInterface): return True +@needs_gssapi class GSSKexTest(unittest.TestCase): @staticmethod def init(username, hostname): diff --git a/tests/test_message.py b/tests/test_message.py index f18cae90..645b0509 100644 --- a/tests/test_message.py +++ b/tests/test_message.py @@ -21,6 +21,7 @@ Some unit tests for ssh protocol message blocks. """ import unittest + from paramiko.message import Message from paramiko.common import byte_chr, zero_byte diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py index 02173292..414b7e38 100644 --- a/tests/test_packetizer.py +++ b/tests/test_packetizer.py @@ -27,11 +27,12 @@ from hashlib import sha1 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes -from tests.loop import LoopSocket - from paramiko import Message, Packetizer, util from paramiko.common import byte_chr, zero_byte +from .loop import LoopSocket + + x55 = byte_chr(0x55) x1f = byte_chr(0x1f) diff --git a/tests/test_pkey.py b/tests/test_pkey.py index a9205a18..c745232b 100644 --- a/tests/test_pkey.py +++ b/tests/test_pkey.py @@ -30,7 +30,8 @@ import base64 from paramiko import RSAKey, DSSKey, ECDSAKey, Ed25519Key, Message, util from paramiko.py3compat import StringIO, byte_chr, b, bytes, PY2 -from tests.util import test_path +from .util import _support + # from openssh's ssh-keygen PUB_RSA = 'ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4c=' @@ -138,7 +139,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(exp, key) def test_2_load_rsa(self): - key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + key = RSAKey.from_private_key_file(_support('test_rsa.key')) self.assertEqual('ssh-rsa', key.get_name()) exp_rsa = b(FINGER_RSA.split()[1].replace(':', '')) my_rsa = hexlify(key.get_fingerprint()) @@ -154,7 +155,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key, key2) def test_3_load_rsa_password(self): - key = RSAKey.from_private_key_file(test_path('test_rsa_password.key'), 'television') + key = RSAKey.from_private_key_file(_support('test_rsa_password.key'), 'television') self.assertEqual('ssh-rsa', key.get_name()) exp_rsa = b(FINGER_RSA.split()[1].replace(':', '')) my_rsa = hexlify(key.get_fingerprint()) @@ -163,7 +164,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(1024, key.get_bits()) def test_4_load_dss(self): - key = DSSKey.from_private_key_file(test_path('test_dss.key')) + key = DSSKey.from_private_key_file(_support('test_dss.key')) self.assertEqual('ssh-dss', key.get_name()) exp_dss = b(FINGER_DSS.split()[1].replace(':', '')) my_dss = hexlify(key.get_fingerprint()) @@ -179,7 +180,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key, key2) def test_5_load_dss_password(self): - key = DSSKey.from_private_key_file(test_path('test_dss_password.key'), 'television') + key = DSSKey.from_private_key_file(_support('test_dss_password.key'), 'television') self.assertEqual('ssh-dss', key.get_name()) exp_dss = b(FINGER_DSS.split()[1].replace(':', '')) my_dss = hexlify(key.get_fingerprint()) @@ -189,7 +190,7 @@ class KeyTest(unittest.TestCase): def test_6_compare_rsa(self): # verify that the private & public keys compare equal - key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + key = RSAKey.from_private_key_file(_support('test_rsa.key')) self.assertEqual(key, key) pub = RSAKey(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -198,7 +199,7 @@ class KeyTest(unittest.TestCase): def test_7_compare_dss(self): # verify that the private & public keys compare equal - key = DSSKey.from_private_key_file(test_path('test_dss.key')) + key = DSSKey.from_private_key_file(_support('test_dss.key')) self.assertEqual(key, key) pub = DSSKey(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -207,7 +208,7 @@ class KeyTest(unittest.TestCase): def test_8_sign_rsa(self): # verify that the rsa private key can sign and verify - key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + key = RSAKey.from_private_key_file(_support('test_rsa.key')) msg = key.sign_ssh_data(b'ice weasels') self.assertTrue(type(msg) is Message) msg.rewind() @@ -220,7 +221,7 @@ class KeyTest(unittest.TestCase): def test_9_sign_dss(self): # verify that the dss private key can sign and verify - key = DSSKey.from_private_key_file(test_path('test_dss.key')) + key = DSSKey.from_private_key_file(_support('test_dss.key')) msg = key.sign_ssh_data(b'ice weasels') self.assertTrue(type(msg) is Message) msg.rewind() @@ -275,7 +276,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key.get_name(), 'ecdsa-sha2-nistp521') def test_10_load_ecdsa_256(self): - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_256.key')) + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_256.key')) self.assertEqual('ecdsa-sha2-nistp256', key.get_name()) exp_ecdsa = b(FINGER_ECDSA_256.split()[1].replace(':', '')) my_ecdsa = hexlify(key.get_fingerprint()) @@ -291,7 +292,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key, key2) def test_11_load_ecdsa_password_256(self): - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password_256.key'), b'television') + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_password_256.key'), b'television') self.assertEqual('ecdsa-sha2-nistp256', key.get_name()) exp_ecdsa = b(FINGER_ECDSA_256.split()[1].replace(':', '')) my_ecdsa = hexlify(key.get_fingerprint()) @@ -301,7 +302,7 @@ class KeyTest(unittest.TestCase): def test_12_compare_ecdsa_256(self): # verify that the private & public keys compare equal - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_256.key')) + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_256.key')) self.assertEqual(key, key) pub = ECDSAKey(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -310,7 +311,7 @@ class KeyTest(unittest.TestCase): def test_13_sign_ecdsa_256(self): # verify that the rsa private key can sign and verify - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_256.key')) + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_256.key')) msg = key.sign_ssh_data(b'ice weasels') self.assertTrue(type(msg) is Message) msg.rewind() @@ -325,7 +326,7 @@ class KeyTest(unittest.TestCase): self.assertTrue(pub.verify_ssh_sig(b'ice weasels', msg)) def test_14_load_ecdsa_384(self): - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_384.key')) + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_384.key')) self.assertEqual('ecdsa-sha2-nistp384', key.get_name()) exp_ecdsa = b(FINGER_ECDSA_384.split()[1].replace(':', '')) my_ecdsa = hexlify(key.get_fingerprint()) @@ -341,7 +342,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key, key2) def test_15_load_ecdsa_password_384(self): - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password_384.key'), b'television') + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_password_384.key'), b'television') self.assertEqual('ecdsa-sha2-nistp384', key.get_name()) exp_ecdsa = b(FINGER_ECDSA_384.split()[1].replace(':', '')) my_ecdsa = hexlify(key.get_fingerprint()) @@ -351,7 +352,7 @@ class KeyTest(unittest.TestCase): def test_16_compare_ecdsa_384(self): # verify that the private & public keys compare equal - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_384.key')) + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_384.key')) self.assertEqual(key, key) pub = ECDSAKey(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -360,7 +361,7 @@ class KeyTest(unittest.TestCase): def test_17_sign_ecdsa_384(self): # verify that the rsa private key can sign and verify - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_384.key')) + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_384.key')) msg = key.sign_ssh_data(b'ice weasels') self.assertTrue(type(msg) is Message) msg.rewind() @@ -375,7 +376,7 @@ class KeyTest(unittest.TestCase): self.assertTrue(pub.verify_ssh_sig(b'ice weasels', msg)) def test_18_load_ecdsa_521(self): - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_521.key')) + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_521.key')) self.assertEqual('ecdsa-sha2-nistp521', key.get_name()) exp_ecdsa = b(FINGER_ECDSA_521.split()[1].replace(':', '')) my_ecdsa = hexlify(key.get_fingerprint()) @@ -394,7 +395,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key, key2) def test_19_load_ecdsa_password_521(self): - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password_521.key'), b'television') + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_password_521.key'), b'television') self.assertEqual('ecdsa-sha2-nistp521', key.get_name()) exp_ecdsa = b(FINGER_ECDSA_521.split()[1].replace(':', '')) my_ecdsa = hexlify(key.get_fingerprint()) @@ -404,7 +405,7 @@ class KeyTest(unittest.TestCase): def test_20_compare_ecdsa_521(self): # verify that the private & public keys compare equal - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_521.key')) + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_521.key')) self.assertEqual(key, key) pub = ECDSAKey(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -413,7 +414,7 @@ class KeyTest(unittest.TestCase): def test_21_sign_ecdsa_521(self): # verify that the rsa private key can sign and verify - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_521.key')) + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_521.key')) msg = key.sign_ssh_data(b'ice weasels') self.assertTrue(type(msg) is Message) msg.rewind() @@ -429,7 +430,7 @@ class KeyTest(unittest.TestCase): def test_salt_size(self): # Read an existing encrypted private key - file_ = test_path('test_rsa_password.key') + file_ = _support('test_rsa_password.key') password = 'television' newfile = file_ + '.new' newpassword = 'radio' @@ -446,20 +447,20 @@ class KeyTest(unittest.TestCase): os.remove(newfile) def test_stringification(self): - key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + key = RSAKey.from_private_key_file(_support('test_rsa.key')) comparable = TEST_KEY_BYTESTR_2 if PY2 else TEST_KEY_BYTESTR_3 self.assertEqual(str(key), comparable) def test_ed25519(self): - key1 = Ed25519Key.from_private_key_file(test_path('test_ed25519.key')) + key1 = Ed25519Key.from_private_key_file(_support('test_ed25519.key')) key2 = Ed25519Key.from_private_key_file( - test_path('test_ed25519_password.key'), b'abc123' + _support('test_ed25519_password.key'), b'abc123' ) self.assertNotEqual(key1.asbytes(), key2.asbytes()) def test_ed25519_compare(self): # verify that the private & public keys compare equal - key = Ed25519Key.from_private_key_file(test_path('test_ed25519.key')) + key = Ed25519Key.from_private_key_file(_support('test_ed25519.key')) self.assertEqual(key, key) pub = Ed25519Key(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -467,14 +468,14 @@ class KeyTest(unittest.TestCase): self.assertEqual(key, pub) def test_ed25519_load_from_file_obj(self): - with open(test_path('test_ed25519.key')) as pkey_fileobj: + with open(_support('test_ed25519.key')) as pkey_fileobj: key = Ed25519Key.from_private_key(pkey_fileobj) self.assertEqual(key, key) self.assertTrue(key.can_sign()) def test_keyfile_is_actually_encrypted(self): # Read an existing encrypted private key - file_ = test_path('test_rsa_password.key') + file_ = _support('test_rsa_password.key') password = 'television' newfile = file_ + '.new' newpassword = 'radio' @@ -492,10 +493,10 @@ class KeyTest(unittest.TestCase): # test_client.py; this and nearby cert tests are more about the gritty # details. # PKey.load_certificate - key_path = test_path(os.path.join('cert_support', 'test_rsa.key')) + key_path = _support(os.path.join('cert_support', 'test_rsa.key')) key = RSAKey.from_private_key_file(key_path) self.assertTrue(key.public_blob is None) - cert_path = test_path( + cert_path = _support( os.path.join('cert_support', 'test_rsa.key-cert.pub') ) key.load_certificate(cert_path) @@ -514,10 +515,10 @@ class KeyTest(unittest.TestCase): self.assertEqual(msg.get_int64(), 1234) # Prevented from loading certificate that doesn't match - key_path = test_path(os.path.join('cert_support', 'test_ed25519.key')) + key_path = _support(os.path.join('cert_support', 'test_ed25519.key')) key1 = Ed25519Key.from_private_key_file(key_path) self.assertRaises( ValueError, key1.load_certificate, - test_path('test_rsa.key-cert.pub'), + _support('test_rsa.key-cert.pub'), ) diff --git a/tests/test_sftp.py b/tests/test_sftp.py index b3c7bf98..09a50453 100755..100644 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -32,16 +32,19 @@ import warnings from binascii import hexlify from tempfile import mkstemp +import pytest + import paramiko +import paramiko.util from paramiko.py3compat import PY2, b, u, StringIO from paramiko.common import o777, o600, o666, o644 -from tests import skipUnlessBuiltin -from tests.stub_sftp import StubServer, StubSFTPServer -from tests.loop import LoopSocket -from tests.util import test_path -import paramiko.util from paramiko.sftp_attr import SFTPAttributes +from .util import needs_builtin +from .stub_sftp import StubServer, StubSFTPServer +from .util import _support, slow + + ARTICLE = ''' Insulin sensitivity and liver insulin receptor structure in ducks from two genera @@ -81,303 +84,201 @@ decreased compared with chicken. # Thus, the following 2-bytes sequence is not valid utf8: "invalid continuation byte" NON_UTF8_DATA = b'\xC3\xC3' -FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000') - -sftp = None -tc = None -g_big_file_test = True -# we need to use eval(compile()) here because Py3.2 doesn't support the 'u' marker for unicode -# this test is the only line in the entire program that has to be treated specially to support Py3.2 -unicode_folder = eval(compile(r"u'\u00fcnic\u00f8de'" if PY2 else r"'\u00fcnic\u00f8de'", 'test_sftp.py', 'eval')) +unicode_folder = u'\u00fcnic\u00f8de' if PY2 else '\u00fcnic\u00f8de' utf8_folder = b'/\xc3\xbcnic\xc3\xb8\x64\x65' -def get_sftp(): - global sftp - return sftp - - -class SFTPTest (unittest.TestCase): - @staticmethod - def init(hostname, username, keyfile, passwd): - global sftp, tc - - t = paramiko.Transport(hostname) - tc = t - try: - key = paramiko.RSAKey.from_private_key_file(keyfile, passwd) - except paramiko.PasswordRequiredException: - sys.stderr.write('\n\nparamiko.RSAKey.from_private_key_file REQUIRES PASSWORD.\n') - sys.stderr.write('You have two options:\n') - sys.stderr.write('* Use the "-K" option to point to a different (non-password-protected)\n') - sys.stderr.write(' private key file.\n') - sys.stderr.write('* Use the "-P" option to provide the password needed to unlock this private\n') - sys.stderr.write(' key.\n') - sys.stderr.write('\n') - sys.exit(1) - try: - t.connect(username=username, pkey=key) - except paramiko.SSHException: - t.close() - sys.stderr.write('\n\nparamiko.Transport.connect FAILED.\n') - sys.stderr.write('There are several possible reasons why it might fail so quickly:\n\n') - sys.stderr.write('* The host to connect to (%s) is not a valid SSH server.\n' % hostname) - sys.stderr.write(' (Use the "-H" option to change the host.)\n') - sys.stderr.write('* The username to auth as (%s) is invalid.\n' % username) - sys.stderr.write(' (Use the "-U" option to change the username.)\n') - sys.stderr.write('* The private key given (%s) is not accepted by the server.\n' % keyfile) - sys.stderr.write(' (Use the "-K" option to provide a different key file.)\n') - sys.stderr.write('\n') - sys.exit(1) - sftp = paramiko.SFTP.from_transport(t) - - @staticmethod - def init_loopback(): - global sftp, tc - - socks = LoopSocket() - sockc = LoopSocket() - sockc.link(socks) - tc = paramiko.Transport(sockc) - ts = paramiko.Transport(socks) - - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) - ts.add_server_key(host_key) - event = threading.Event() - server = StubServer() - ts.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer) - ts.start_server(event, server) - tc.connect(username='slowdive', password='pygmalion') - event.wait(1.0) - - sftp = paramiko.SFTP.from_transport(tc) - - @staticmethod - def set_big_file_test(onoff): - global g_big_file_test - g_big_file_test = onoff - - def setUp(self): - global FOLDER - for i in range(1000): - FOLDER = FOLDER[:-3] + '%03d' % i - try: - sftp.mkdir(FOLDER) - break - except (IOError, OSError): - pass - - def tearDown(self): - #sftp.chdir() - sftp.rmdir(FOLDER) - - def test_1_file(self): +@slow +class TestSFTP(object): + def test_1_file(self, sftp): """ verify that we can create a file. """ - f = sftp.open(FOLDER + '/test', 'w') + f = sftp.open(sftp.FOLDER + '/test', 'w') try: - self.assertEqual(f.stat().st_size, 0) + assert f.stat().st_size == 0 finally: f.close() - sftp.remove(FOLDER + '/test') + sftp.remove(sftp.FOLDER + '/test') - def test_2_close(self): + def test_2_close(self, sftp): """ - verify that closing the sftp session doesn't do anything bad, and that - a new one can be opened. + Verify that SFTP session close() causes a socket error on next action. """ - global sftp sftp.close() - try: - sftp.open(FOLDER + '/test2', 'w') - self.fail('expected exception') - except: - pass - sftp = paramiko.SFTP.from_transport(tc) + with pytest.raises(socket.error, match='Socket is closed'): + sftp.open(sftp.FOLDER + '/test2', 'w') - def test_2_sftp_can_be_used_as_context_manager(self): + def test_2_sftp_can_be_used_as_context_manager(self, sftp): """ verify that the sftp session is closed when exiting the context manager """ - global sftp with sftp: pass - try: - sftp.open(FOLDER + '/test2', 'w') - self.fail('expected exception') - except (EOFError, socket.error): - pass - finally: - sftp = paramiko.SFTP.from_transport(tc) + with pytest.raises(socket.error, match='Socket is closed'): + sftp.open(sftp.FOLDER + '/test2', 'w') - def test_3_write(self): + def test_3_write(self, sftp): """ verify that a file can be created and written, and the size is correct. """ try: - with sftp.open(FOLDER + '/duck.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f: f.write(ARTICLE) - self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483) + assert sftp.stat(sftp.FOLDER + '/duck.txt').st_size == 1483 finally: - sftp.remove(FOLDER + '/duck.txt') + sftp.remove(sftp.FOLDER + '/duck.txt') - def test_3_sftp_file_can_be_used_as_context_manager(self): + def test_3_sftp_file_can_be_used_as_context_manager(self, sftp): """ verify that an opened file can be used as a context manager """ try: - with sftp.open(FOLDER + '/duck.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f: f.write(ARTICLE) - self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483) + assert sftp.stat(sftp.FOLDER + '/duck.txt').st_size == 1483 finally: - sftp.remove(FOLDER + '/duck.txt') + sftp.remove(sftp.FOLDER + '/duck.txt') - def test_4_append(self): + def test_4_append(self, sftp): """ verify that a file can be opened for append, and tell() still works. """ try: - with sftp.open(FOLDER + '/append.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/append.txt', 'w') as f: f.write('first line\nsecond line\n') - self.assertEqual(f.tell(), 23) + assert f.tell() == 23 - with sftp.open(FOLDER + '/append.txt', 'a+') as f: + with sftp.open(sftp.FOLDER + '/append.txt', 'a+') as f: f.write('third line!!!\n') - self.assertEqual(f.tell(), 37) - self.assertEqual(f.stat().st_size, 37) + assert f.tell() == 37 + assert f.stat().st_size == 37 f.seek(-26, f.SEEK_CUR) - self.assertEqual(f.readline(), 'second line\n') + assert f.readline() == 'second line\n' finally: - sftp.remove(FOLDER + '/append.txt') + sftp.remove(sftp.FOLDER + '/append.txt') - def test_5_rename(self): + def test_5_rename(self, sftp): """ verify that renaming a file works. """ try: - with sftp.open(FOLDER + '/first.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/first.txt', 'w') as f: f.write('content!\n') - sftp.rename(FOLDER + '/first.txt', FOLDER + '/second.txt') - try: - sftp.open(FOLDER + '/first.txt', 'r') - self.assertTrue(False, 'no exception on reading nonexistent file') - except IOError: - pass - with sftp.open(FOLDER + '/second.txt', 'r') as f: + sftp.rename(sftp.FOLDER + '/first.txt', sftp.FOLDER + '/second.txt') + with pytest.raises(IOError, match='No such file'): + sftp.open(sftp.FOLDER + '/first.txt', 'r') + with sftp.open(sftp.FOLDER + '/second.txt', 'r') as f: f.seek(-6, f.SEEK_END) - self.assertEqual(u(f.read(4)), 'tent') + assert u(f.read(4)) == 'tent' finally: + # TODO: this is gross, make some sort of 'remove if possible' / 'rm + # -f' a-like, jeez try: - sftp.remove(FOLDER + '/first.txt') + sftp.remove(sftp.FOLDER + '/first.txt') except: pass try: - sftp.remove(FOLDER + '/second.txt') + sftp.remove(sftp.FOLDER + '/second.txt') except: pass - def test_5a_posix_rename(self): + def test_5a_posix_rename(self, sftp): """Test posix-rename@openssh.com protocol extension.""" try: # first check that the normal rename works as specified - with sftp.open(FOLDER + '/a', 'w') as f: + with sftp.open(sftp.FOLDER + '/a', 'w') as f: f.write('one') - sftp.rename(FOLDER + '/a', FOLDER + '/b') - with sftp.open(FOLDER + '/a', 'w') as f: + sftp.rename(sftp.FOLDER + '/a', sftp.FOLDER + '/b') + with sftp.open(sftp.FOLDER + '/a', 'w') as f: f.write('two') - try: - sftp.rename(FOLDER + '/a', FOLDER + '/b') - self.assertTrue(False, 'no exception when rename-ing onto existing file') - except (OSError, IOError): - pass + with pytest.raises(IOError): # actual message seems generic + sftp.rename(sftp.FOLDER + '/a', sftp.FOLDER + '/b') # now check with the posix_rename - sftp.posix_rename(FOLDER + '/a', FOLDER + '/b') - with sftp.open(FOLDER + '/b', 'r') as f: + sftp.posix_rename(sftp.FOLDER + '/a', sftp.FOLDER + '/b') + with sftp.open(sftp.FOLDER + '/b', 'r') as f: data = u(f.read()) - self.assertEqual('two', data, "Contents of renamed file not the same as original file") + err = "Contents of renamed file not the same as original file" + assert 'two' == data, err finally: try: - sftp.remove(FOLDER + '/a') + sftp.remove(sftp.FOLDER + '/a') except: pass try: - sftp.remove(FOLDER + '/b') + sftp.remove(sftp.FOLDER + '/b') except: pass - def test_6_folder(self): + def test_6_folder(self, sftp): """ create a temporary folder, verify that we can create a file in it, then remove the folder and verify that we can't create a file in it anymore. """ - sftp.mkdir(FOLDER + '/subfolder') - sftp.open(FOLDER + '/subfolder/test', 'w').close() - sftp.remove(FOLDER + '/subfolder/test') - sftp.rmdir(FOLDER + '/subfolder') - try: - sftp.open(FOLDER + '/subfolder/test') - # shouldn't be able to create that file - self.assertTrue(False, 'no exception at dummy file creation') - except IOError: - pass + sftp.mkdir(sftp.FOLDER + '/subfolder') + sftp.open(sftp.FOLDER + '/subfolder/test', 'w').close() + sftp.remove(sftp.FOLDER + '/subfolder/test') + sftp.rmdir(sftp.FOLDER + '/subfolder') + # shouldn't be able to create that file if dir removed + with pytest.raises(IOError, match="No such file"): + sftp.open(sftp.FOLDER + '/subfolder/test') - def test_7_listdir(self): + def test_7_listdir(self, sftp): """ verify that a folder can be created, a bunch of files can be placed in it, and those files show up in sftp.listdir. """ try: - sftp.open(FOLDER + '/duck.txt', 'w').close() - sftp.open(FOLDER + '/fish.txt', 'w').close() - sftp.open(FOLDER + '/tertiary.py', 'w').close() - - x = sftp.listdir(FOLDER) - self.assertEqual(len(x), 3) - self.assertTrue('duck.txt' in x) - self.assertTrue('fish.txt' in x) - self.assertTrue('tertiary.py' in x) - self.assertTrue('random' not in x) + sftp.open(sftp.FOLDER + '/duck.txt', 'w').close() + sftp.open(sftp.FOLDER + '/fish.txt', 'w').close() + sftp.open(sftp.FOLDER + '/tertiary.py', 'w').close() + + x = sftp.listdir(sftp.FOLDER) + assert len(x) == 3 + assert 'duck.txt' in x + assert 'fish.txt' in x + assert 'tertiary.py' in x + assert 'random' not in x finally: - sftp.remove(FOLDER + '/duck.txt') - sftp.remove(FOLDER + '/fish.txt') - sftp.remove(FOLDER + '/tertiary.py') + sftp.remove(sftp.FOLDER + '/duck.txt') + sftp.remove(sftp.FOLDER + '/fish.txt') + sftp.remove(sftp.FOLDER + '/tertiary.py') - def test_7_5_listdir_iter(self): + def test_7_5_listdir_iter(self, sftp): """ listdir_iter version of above test """ try: - sftp.open(FOLDER + '/duck.txt', 'w').close() - sftp.open(FOLDER + '/fish.txt', 'w').close() - sftp.open(FOLDER + '/tertiary.py', 'w').close() - - x = [x.filename for x in sftp.listdir_iter(FOLDER)] - self.assertEqual(len(x), 3) - self.assertTrue('duck.txt' in x) - self.assertTrue('fish.txt' in x) - self.assertTrue('tertiary.py' in x) - self.assertTrue('random' not in x) + sftp.open(sftp.FOLDER + '/duck.txt', 'w').close() + sftp.open(sftp.FOLDER + '/fish.txt', 'w').close() + sftp.open(sftp.FOLDER + '/tertiary.py', 'w').close() + + x = [x.filename for x in sftp.listdir_iter(sftp.FOLDER)] + assert len(x) == 3 + assert 'duck.txt' in x + assert 'fish.txt' in x + assert 'tertiary.py' in x + assert 'random' not in x finally: - sftp.remove(FOLDER + '/duck.txt') - sftp.remove(FOLDER + '/fish.txt') - sftp.remove(FOLDER + '/tertiary.py') + sftp.remove(sftp.FOLDER + '/duck.txt') + sftp.remove(sftp.FOLDER + '/fish.txt') + sftp.remove(sftp.FOLDER + '/tertiary.py') - def test_8_setstat(self): + def test_8_setstat(self, sftp): """ verify that the setstat functions (chown, chmod, utime, truncate) work. """ try: - with sftp.open(FOLDER + '/special', 'w') as f: + with sftp.open(sftp.FOLDER + '/special', 'w') as f: f.write('x' * 1024) - stat = sftp.stat(FOLDER + '/special') - sftp.chmod(FOLDER + '/special', (stat.st_mode & ~o777) | o600) - stat = sftp.stat(FOLDER + '/special') + stat = sftp.stat(sftp.FOLDER + '/special') + sftp.chmod(sftp.FOLDER + '/special', (stat.st_mode & ~o777) | o600) + stat = sftp.stat(sftp.FOLDER + '/special') expected_mode = o600 if sys.platform == 'win32': # chmod not really functional on windows @@ -385,35 +286,35 @@ class SFTPTest (unittest.TestCase): if sys.platform == 'cygwin': # even worse. expected_mode = o644 - self.assertEqual(stat.st_mode & o777, expected_mode) - self.assertEqual(stat.st_size, 1024) + assert stat.st_mode & o777 == expected_mode + assert stat.st_size == 1024 mtime = stat.st_mtime - 3600 atime = stat.st_atime - 1800 - sftp.utime(FOLDER + '/special', (atime, mtime)) - stat = sftp.stat(FOLDER + '/special') - self.assertEqual(stat.st_mtime, mtime) + sftp.utime(sftp.FOLDER + '/special', (atime, mtime)) + stat = sftp.stat(sftp.FOLDER + '/special') + assert stat.st_mtime == mtime if sys.platform not in ('win32', 'cygwin'): - self.assertEqual(stat.st_atime, atime) + assert stat.st_atime == atime # can't really test chown, since we'd have to know a valid uid. - sftp.truncate(FOLDER + '/special', 512) - stat = sftp.stat(FOLDER + '/special') - self.assertEqual(stat.st_size, 512) + sftp.truncate(sftp.FOLDER + '/special', 512) + stat = sftp.stat(sftp.FOLDER + '/special') + assert stat.st_size == 512 finally: - sftp.remove(FOLDER + '/special') + sftp.remove(sftp.FOLDER + '/special') - def test_9_fsetstat(self): + def test_9_fsetstat(self, sftp): """ verify that the fsetstat functions (chown, chmod, utime, truncate) work on open files. """ try: - with sftp.open(FOLDER + '/special', 'w') as f: + with sftp.open(sftp.FOLDER + '/special', 'w') as f: f.write('x' * 1024) - with sftp.open(FOLDER + '/special', 'r+') as f: + with sftp.open(sftp.FOLDER + '/special', 'r+') as f: stat = f.stat() f.chmod((stat.st_mode & ~o777) | o600) stat = f.stat() @@ -425,26 +326,26 @@ class SFTPTest (unittest.TestCase): if sys.platform == 'cygwin': # even worse. expected_mode = o644 - self.assertEqual(stat.st_mode & o777, expected_mode) - self.assertEqual(stat.st_size, 1024) + assert stat.st_mode & o777 == expected_mode + assert stat.st_size == 1024 mtime = stat.st_mtime - 3600 atime = stat.st_atime - 1800 f.utime((atime, mtime)) stat = f.stat() - self.assertEqual(stat.st_mtime, mtime) + assert stat.st_mtime == mtime if sys.platform not in ('win32', 'cygwin'): - self.assertEqual(stat.st_atime, atime) + assert stat.st_atime == atime # can't really test chown, since we'd have to know a valid uid. f.truncate(512) stat = f.stat() - self.assertEqual(stat.st_size, 512) + assert stat.st_size == 512 finally: - sftp.remove(FOLDER + '/special') + sftp.remove(sftp.FOLDER + '/special') - def test_A_readline_seek(self): + def test_A_readline_seek(self, sftp): """ create a text file and write a bunch of text into it. then count the lines in the file, and seek around to retrieve particular lines. this should @@ -452,10 +353,10 @@ class SFTPTest (unittest.TestCase): buffering is reset on 'seek'. """ try: - with sftp.open(FOLDER + '/duck.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f: f.write(ARTICLE) - with sftp.open(FOLDER + '/duck.txt', 'r+') as f: + with sftp.open(sftp.FOLDER + '/duck.txt', 'r+') as f: line_number = 0 loc = 0 pos_list = [] @@ -463,35 +364,35 @@ class SFTPTest (unittest.TestCase): line_number += 1 pos_list.append(loc) loc = f.tell() - self.assertTrue(f.seekable()) + assert f.seekable() f.seek(pos_list[6], f.SEEK_SET) - self.assertEqual(f.readline(), 'Nouzilly, France.\n') + assert f.readline(), 'Nouzilly == France.\n' f.seek(pos_list[17], f.SEEK_SET) - self.assertEqual(f.readline()[:4], 'duck') + assert f.readline()[:4] == 'duck' f.seek(pos_list[10], f.SEEK_SET) - self.assertEqual(f.readline(), 'duck types were equally resistant to exogenous insulin compared with chicken.\n') + assert f.readline() == 'duck types were equally resistant to exogenous insulin compared with chicken.\n' finally: - sftp.remove(FOLDER + '/duck.txt') + sftp.remove(sftp.FOLDER + '/duck.txt') - def test_B_write_seek(self): + def test_B_write_seek(self, sftp): """ create a text file, seek back and change part of it, and verify that the changes worked. """ try: - with sftp.open(FOLDER + '/testing.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/testing.txt', 'w') as f: f.write('hello kitty.\n') f.seek(-5, f.SEEK_CUR) f.write('dd') - self.assertEqual(sftp.stat(FOLDER + '/testing.txt').st_size, 13) - with sftp.open(FOLDER + '/testing.txt', 'r') as f: + assert sftp.stat(sftp.FOLDER + '/testing.txt').st_size == 13 + with sftp.open(sftp.FOLDER + '/testing.txt', 'r') as f: data = f.read(20) - self.assertEqual(data, b'hello kiddy.\n') + assert data == b'hello kiddy.\n' finally: - sftp.remove(FOLDER + '/testing.txt') + sftp.remove(sftp.FOLDER + '/testing.txt') - def test_C_symlink(self): + def test_C_symlink(self, sftp): """ create a symlink and then check that lstat doesn't follow it. """ @@ -500,97 +401,85 @@ class SFTPTest (unittest.TestCase): return try: - with sftp.open(FOLDER + '/original.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/original.txt', 'w') as f: f.write('original\n') - sftp.symlink('original.txt', FOLDER + '/link.txt') - self.assertEqual(sftp.readlink(FOLDER + '/link.txt'), 'original.txt') + sftp.symlink('original.txt', sftp.FOLDER + '/link.txt') + assert sftp.readlink(sftp.FOLDER + '/link.txt') == 'original.txt' - with sftp.open(FOLDER + '/link.txt', 'r') as f: - self.assertEqual(f.readlines(), ['original\n']) + with sftp.open(sftp.FOLDER + '/link.txt', 'r') as f: + assert f.readlines() == ['original\n'] cwd = sftp.normalize('.') if cwd[-1] == '/': cwd = cwd[:-1] - abs_path = cwd + '/' + FOLDER + '/original.txt' - sftp.symlink(abs_path, FOLDER + '/link2.txt') - self.assertEqual(abs_path, sftp.readlink(FOLDER + '/link2.txt')) + abs_path = cwd + '/' + sftp.FOLDER + '/original.txt' + sftp.symlink(abs_path, sftp.FOLDER + '/link2.txt') + assert abs_path == sftp.readlink(sftp.FOLDER + '/link2.txt') - self.assertEqual(sftp.lstat(FOLDER + '/link.txt').st_size, 12) - self.assertEqual(sftp.stat(FOLDER + '/link.txt').st_size, 9) + assert sftp.lstat(sftp.FOLDER + '/link.txt').st_size == 12 + assert sftp.stat(sftp.FOLDER + '/link.txt').st_size == 9 # the sftp server may be hiding extra path members from us, so the # length may be longer than we expect: - self.assertTrue(sftp.lstat(FOLDER + '/link2.txt').st_size >= len(abs_path)) - self.assertEqual(sftp.stat(FOLDER + '/link2.txt').st_size, 9) - self.assertEqual(sftp.stat(FOLDER + '/original.txt').st_size, 9) + assert sftp.lstat(sftp.FOLDER + '/link2.txt').st_size >= len(abs_path) + assert sftp.stat(sftp.FOLDER + '/link2.txt').st_size == 9 + assert sftp.stat(sftp.FOLDER + '/original.txt').st_size == 9 finally: try: - sftp.remove(FOLDER + '/link.txt') + sftp.remove(sftp.FOLDER + '/link.txt') except: pass try: - sftp.remove(FOLDER + '/link2.txt') + sftp.remove(sftp.FOLDER + '/link2.txt') except: pass try: - sftp.remove(FOLDER + '/original.txt') + sftp.remove(sftp.FOLDER + '/original.txt') except: pass - def test_D_flush_seek(self): + def test_D_flush_seek(self, sftp): """ verify that buffered writes are automatically flushed on seek. """ try: - with sftp.open(FOLDER + '/happy.txt', 'w', 1) as f: + with sftp.open(sftp.FOLDER + '/happy.txt', 'w', 1) as f: f.write('full line.\n') f.write('partial') f.seek(9, f.SEEK_SET) f.write('?\n') - with sftp.open(FOLDER + '/happy.txt', 'r') as f: - self.assertEqual(f.readline(), u('full line?\n')) - self.assertEqual(f.read(7), b'partial') + with sftp.open(sftp.FOLDER + '/happy.txt', 'r') as f: + assert f.readline() == u('full line?\n') + assert f.read(7) == b'partial' finally: try: - sftp.remove(FOLDER + '/happy.txt') + sftp.remove(sftp.FOLDER + '/happy.txt') except: pass - def test_E_realpath(self): + def test_E_realpath(self, sftp): """ test that realpath is returning something non-empty and not an error. """ pwd = sftp.normalize('.') - self.assertTrue(len(pwd) > 0) - f = sftp.normalize('./' + FOLDER) - self.assertTrue(len(f) > 0) - self.assertEqual(os.path.join(pwd, FOLDER), f) + assert len(pwd) > 0 + f = sftp.normalize('./' + sftp.FOLDER) + assert len(f) > 0 + assert os.path.join(pwd, sftp.FOLDER) == f - def test_F_mkdir(self): + def test_F_mkdir(self, sftp): """ verify that mkdir/rmdir work. """ - try: - sftp.mkdir(FOLDER + '/subfolder') - except: - self.assertTrue(False, 'exception creating subfolder') - try: - sftp.mkdir(FOLDER + '/subfolder') - self.assertTrue(False, 'no exception overwriting subfolder') - except IOError: - pass - try: - sftp.rmdir(FOLDER + '/subfolder') - except: - self.assertTrue(False, 'exception removing subfolder') - try: - sftp.rmdir(FOLDER + '/subfolder') - self.assertTrue(False, 'no exception removing nonexistent subfolder') - except IOError: - pass + sftp.mkdir(sftp.FOLDER + '/subfolder') + with pytest.raises(IOError): # generic msg only + sftp.mkdir(sftp.FOLDER + '/subfolder') + sftp.rmdir(sftp.FOLDER + '/subfolder') + with pytest.raises(IOError, match="No such file"): + sftp.rmdir(sftp.FOLDER + '/subfolder') - def test_G_chdir(self): + def test_G_chdir(self, sftp): """ verify that chdir/getcwd work. """ @@ -598,35 +487,35 @@ class SFTPTest (unittest.TestCase): if root[-1] != '/': root += '/' try: - sftp.mkdir(FOLDER + '/alpha') - sftp.chdir(FOLDER + '/alpha') + sftp.mkdir(sftp.FOLDER + '/alpha') + sftp.chdir(sftp.FOLDER + '/alpha') sftp.mkdir('beta') - self.assertEqual(root + FOLDER + '/alpha', sftp.getcwd()) - self.assertEqual(['beta'], sftp.listdir('.')) + assert root + sftp.FOLDER + '/alpha' == sftp.getcwd() + assert ['beta'] == sftp.listdir('.') sftp.chdir('beta') with sftp.open('fish', 'w') as f: f.write('hello\n') sftp.chdir('..') - self.assertEqual(['fish'], sftp.listdir('beta')) + assert ['fish'] == sftp.listdir('beta') sftp.chdir('..') - self.assertEqual(['fish'], sftp.listdir('alpha/beta')) + assert ['fish'] == sftp.listdir('alpha/beta') finally: sftp.chdir(root) try: - sftp.unlink(FOLDER + '/alpha/beta/fish') + sftp.unlink(sftp.FOLDER + '/alpha/beta/fish') except: pass try: - sftp.rmdir(FOLDER + '/alpha/beta') + sftp.rmdir(sftp.FOLDER + '/alpha/beta') except: pass try: - sftp.rmdir(FOLDER + '/alpha') + sftp.rmdir(sftp.FOLDER + '/alpha') except: pass - def test_H_get_put(self): + def test_H_get_put(self, sftp): """ verify that get/put work. """ @@ -641,103 +530,102 @@ class SFTPTest (unittest.TestCase): def progress_callback(x, y): saved_progress.append((x, y)) - sftp.put(localname, FOLDER + '/bunny.txt', progress_callback) + sftp.put(localname, sftp.FOLDER + '/bunny.txt', progress_callback) - with sftp.open(FOLDER + '/bunny.txt', 'rb') as f: - self.assertEqual(text, f.read(128)) - self.assertEqual([(41, 41)], saved_progress) + with sftp.open(sftp.FOLDER + '/bunny.txt', 'rb') as f: + assert text == f.read(128) + assert [(41, 41)] == saved_progress os.unlink(localname) fd, localname = mkstemp() os.close(fd) saved_progress = [] - sftp.get(FOLDER + '/bunny.txt', localname, progress_callback) + sftp.get(sftp.FOLDER + '/bunny.txt', localname, progress_callback) with open(localname, 'rb') as f: - self.assertEqual(text, f.read(128)) - self.assertEqual([(41, 41)], saved_progress) + assert text == f.read(128) + assert [(41, 41)] == saved_progress os.unlink(localname) - sftp.unlink(FOLDER + '/bunny.txt') + sftp.unlink(sftp.FOLDER + '/bunny.txt') - def test_I_check(self): + def test_I_check(self, sftp): """ verify that file.check() works against our own server. (it's an sftp extension that we support, and may be the only ones who support it.) """ - with sftp.open(FOLDER + '/kitty.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/kitty.txt', 'w') as f: f.write('here kitty kitty' * 64) try: - with sftp.open(FOLDER + '/kitty.txt', 'r') as f: + with sftp.open(sftp.FOLDER + '/kitty.txt', 'r') as f: sum = f.check('sha1') - self.assertEqual('91059CFC6615941378D413CB5ADAF4C5EB293402', u(hexlify(sum)).upper()) + assert '91059CFC6615941378D413CB5ADAF4C5EB293402' == u(hexlify(sum)).upper() sum = f.check('md5', 0, 512) - self.assertEqual('93DE4788FCA28D471516963A1FE3856A', u(hexlify(sum)).upper()) + assert '93DE4788FCA28D471516963A1FE3856A' == u(hexlify(sum)).upper() sum = f.check('md5', 0, 0, 510) - self.assertEqual('EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6', - u(hexlify(sum)).upper()) + assert u(hexlify(sum)).upper() == 'EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6' # noqa finally: - sftp.unlink(FOLDER + '/kitty.txt') + sftp.unlink(sftp.FOLDER + '/kitty.txt') - def test_J_x_flag(self): + def test_J_x_flag(self, sftp): """ verify that the 'x' flag works when opening a file. """ - sftp.open(FOLDER + '/unusual.txt', 'wx').close() + sftp.open(sftp.FOLDER + '/unusual.txt', 'wx').close() try: try: - sftp.open(FOLDER + '/unusual.txt', 'wx') + sftp.open(sftp.FOLDER + '/unusual.txt', 'wx') self.fail('expected exception') except IOError: pass finally: - sftp.unlink(FOLDER + '/unusual.txt') + sftp.unlink(sftp.FOLDER + '/unusual.txt') - def test_K_utf8(self): + def test_K_utf8(self, sftp): """ verify that unicode strings are encoded into utf8 correctly. """ - with sftp.open(FOLDER + '/something', 'w') as f: + with sftp.open(sftp.FOLDER + '/something', 'w') as f: f.write('okay') try: - sftp.rename(FOLDER + '/something', FOLDER + '/' + unicode_folder) - sftp.open(b(FOLDER) + utf8_folder, 'r') + sftp.rename(sftp.FOLDER + '/something', sftp.FOLDER + '/' + unicode_folder) + sftp.open(b(sftp.FOLDER) + utf8_folder, 'r') except Exception as e: self.fail('exception ' + str(e)) - sftp.unlink(b(FOLDER) + utf8_folder) + sftp.unlink(b(sftp.FOLDER) + utf8_folder) - def test_L_utf8_chdir(self): - sftp.mkdir(FOLDER + '/' + unicode_folder) + def test_L_utf8_chdir(self, sftp): + sftp.mkdir(sftp.FOLDER + '/' + unicode_folder) try: - sftp.chdir(FOLDER + '/' + unicode_folder) + sftp.chdir(sftp.FOLDER + '/' + unicode_folder) with sftp.open('something', 'w') as f: f.write('okay') sftp.unlink('something') finally: sftp.chdir() - sftp.rmdir(FOLDER + '/' + unicode_folder) + sftp.rmdir(sftp.FOLDER + '/' + unicode_folder) - def test_M_bad_readv(self): + def test_M_bad_readv(self, sftp): """ verify that readv at the end of the file doesn't essplode. """ - sftp.open(FOLDER + '/zero', 'w').close() + sftp.open(sftp.FOLDER + '/zero', 'w').close() try: - with sftp.open(FOLDER + '/zero', 'r') as f: + with sftp.open(sftp.FOLDER + '/zero', 'r') as f: f.readv([(0, 12)]) - with sftp.open(FOLDER + '/zero', 'r') as f: + with sftp.open(sftp.FOLDER + '/zero', 'r') as f: file_size = f.stat().st_size f.prefetch(file_size) f.read(100) finally: - sftp.unlink(FOLDER + '/zero') + sftp.unlink(sftp.FOLDER + '/zero') - def test_N_put_without_confirm(self): + def test_N_put_without_confirm(self, sftp): """ verify that get/put work without confirmation. """ @@ -752,138 +640,132 @@ class SFTPTest (unittest.TestCase): def progress_callback(x, y): saved_progress.append((x, y)) - res = sftp.put(localname, FOLDER + '/bunny.txt', progress_callback, False) + res = sftp.put(localname, sftp.FOLDER + '/bunny.txt', progress_callback, False) - self.assertEqual(SFTPAttributes().attr, res.attr) + assert SFTPAttributes().attr == res.attr - with sftp.open(FOLDER + '/bunny.txt', 'r') as f: - self.assertEqual(text, f.read(128)) - self.assertEqual((41, 41), saved_progress[-1]) + with sftp.open(sftp.FOLDER + '/bunny.txt', 'r') as f: + assert text == f.read(128) + assert (41, 41) == saved_progress[-1] os.unlink(localname) - sftp.unlink(FOLDER + '/bunny.txt') + sftp.unlink(sftp.FOLDER + '/bunny.txt') - def test_O_getcwd(self): + def test_O_getcwd(self, sftp): """ verify that chdir/getcwd work. """ - self.assertEqual(None, sftp.getcwd()) + assert sftp.getcwd() == None root = sftp.normalize('.') if root[-1] != '/': root += '/' try: - sftp.mkdir(FOLDER + '/alpha') - sftp.chdir(FOLDER + '/alpha') - self.assertEqual('/' + FOLDER + '/alpha', sftp.getcwd()) + sftp.mkdir(sftp.FOLDER + '/alpha') + sftp.chdir(sftp.FOLDER + '/alpha') + assert sftp.getcwd() == '/' + sftp.FOLDER + '/alpha' finally: sftp.chdir(root) try: - sftp.rmdir(FOLDER + '/alpha') + sftp.rmdir(sftp.FOLDER + '/alpha') except: pass - def XXX_test_M_seek_append(self): + def XXX_test_M_seek_append(self, sftp): """ verify that seek does't affect writes during append. does not work except through paramiko. :( openssh fails. """ try: - with sftp.open(FOLDER + '/append.txt', 'a') as f: + with sftp.open(sftp.FOLDER + '/append.txt', 'a') as f: f.write('first line\nsecond line\n') f.seek(11, f.SEEK_SET) f.write('third line\n') - with sftp.open(FOLDER + '/append.txt', 'r') as f: - self.assertEqual(f.stat().st_size, 34) - self.assertEqual(f.readline(), 'first line\n') - self.assertEqual(f.readline(), 'second line\n') - self.assertEqual(f.readline(), 'third line\n') + with sftp.open(sftp.FOLDER + '/append.txt', 'r') as f: + assert f.stat().st_size == 34 + assert f.readline() == 'first line\n' + assert f.readline() == 'second line\n' + assert f.readline() == 'third line\n' finally: - sftp.remove(FOLDER + '/append.txt') + sftp.remove(sftp.FOLDER + '/append.txt') - def test_putfo_empty_file(self): + def test_putfo_empty_file(self, sftp): """ Send an empty file and confirm it is sent. """ - target = FOLDER + '/empty file.txt' + target = sftp.FOLDER + '/empty file.txt' stream = StringIO() try: attrs = sftp.putfo(stream, target) # the returned attributes should not be null - self.assertNotEqual(attrs, None) + assert attrs is not None finally: sftp.remove(target) - - def test_N_file_with_percent(self): + # TODO: this test doesn't actually fail if the regression (removing '%' + # expansion to '%%' within sftp.py's def _log()) is removed - stacktraces + # appear but they're clearly emitted from subthreads that have no error + # handling. No point running it until that is fixed somehow. + @pytest.mark.skip("Doesn't prove anything right now") + def test_N_file_with_percent(self, sftp): """ verify that we can create a file with a '%' in the filename. ( it needs to be properly escaped by _log() ) """ - self.assertTrue( paramiko.util.get_logger("paramiko").handlers, "This unit test requires logging to be enabled" ) - f = sftp.open(FOLDER + '/test%file', 'w') + f = sftp.open(sftp.FOLDER + '/test%file', 'w') try: - self.assertEqual(f.stat().st_size, 0) + assert f.stat().st_size == 0 finally: f.close() - sftp.remove(FOLDER + '/test%file') - + sftp.remove(sftp.FOLDER + '/test%file') - def test_O_non_utf8_data(self): + def test_O_non_utf8_data(self, sftp): """Test write() and read() of non utf8 data""" try: - with sftp.open('%s/nonutf8data' % FOLDER, 'w') as f: + with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'w') as f: f.write(NON_UTF8_DATA) - with sftp.open('%s/nonutf8data' % FOLDER, 'r') as f: + with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'r') as f: data = f.read() - self.assertEqual(data, NON_UTF8_DATA) - with sftp.open('%s/nonutf8data' % FOLDER, 'wb') as f: + assert data == NON_UTF8_DATA + with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'wb') as f: f.write(NON_UTF8_DATA) - with sftp.open('%s/nonutf8data' % FOLDER, 'rb') as f: + with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'rb') as f: data = f.read() - self.assertEqual(data, NON_UTF8_DATA) + assert data == NON_UTF8_DATA finally: - sftp.remove('%s/nonutf8data' % FOLDER) + sftp.remove('%s/nonutf8data' % sftp.FOLDER) - def test_sftp_attributes_empty_str(self): + def test_sftp_attributes_empty_str(self, sftp): sftp_attributes = SFTPAttributes() - self.assertEqual(str(sftp_attributes), "?--------- 1 0 0 0 (unknown date) ?") + assert str(sftp_attributes) == "?--------- 1 0 0 0 (unknown date) ?" - @skipUnlessBuiltin('buffer') - def test_write_buffer(self): + @needs_builtin('buffer') + def test_write_buffer(self, sftp): """Test write() using a buffer instance.""" data = 3 * b'A potentially large block of data to chunk up.\n' try: - with sftp.open('%s/write_buffer' % FOLDER, 'wb') as f: + with sftp.open('%s/write_buffer' % sftp.FOLDER, 'wb') as f: for offset in range(0, len(data), 8): f.write(buffer(data, offset, 8)) - with sftp.open('%s/write_buffer' % FOLDER, 'rb') as f: - self.assertEqual(f.read(), data) + with sftp.open('%s/write_buffer' % sftp.FOLDER, 'rb') as f: + assert f.read() == data finally: - sftp.remove('%s/write_buffer' % FOLDER) + sftp.remove('%s/write_buffer' % sftp.FOLDER) - @skipUnlessBuiltin('memoryview') - def test_write_memoryview(self): + @needs_builtin('memoryview') + def test_write_memoryview(self, sftp): """Test write() using a memoryview instance.""" data = 3 * b'A potentially large block of data to chunk up.\n' try: - with sftp.open('%s/write_memoryview' % FOLDER, 'wb') as f: + with sftp.open('%s/write_memoryview' % sftp.FOLDER, 'wb') as f: view = memoryview(data) for offset in range(0, len(data), 8): f.write(view[offset:offset+8]) - with sftp.open('%s/write_memoryview' % FOLDER, 'rb') as f: - self.assertEqual(f.read(), data) + with sftp.open('%s/write_memoryview' % sftp.FOLDER, 'rb') as f: + assert f.read() == data finally: - sftp.remove('%s/write_memoryview' % FOLDER) - - -if __name__ == '__main__': - SFTPTest.init_loopback() - # logging is required by test_N_file_with_percent - paramiko.util.log_to_file('test_sftp.log') - from unittest import main - main() + sftp.remove('%s/write_memoryview' % sftp.FOLDER) diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py index cfad5682..a659098d 100644 --- a/tests/test_sftp_big.py +++ b/tests/test_sftp_big.py @@ -31,94 +31,75 @@ import time import unittest from paramiko.common import o660 -from tests.test_sftp import get_sftp -FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000') +from .util import slow -class BigSFTPTest (unittest.TestCase): - - def setUp(self): - global FOLDER - sftp = get_sftp() - for i in range(1000): - FOLDER = FOLDER[:-3] + '%03d' % i - try: - sftp.mkdir(FOLDER) - break - except (IOError, OSError): - pass - - def tearDown(self): - sftp = get_sftp() - sftp.rmdir(FOLDER) - - def test_1_lots_of_files(self): +@slow +class TestBigSFTP(object): + def test_1_lots_of_files(self, sftp): """ create a bunch of files over the same session. """ - sftp = get_sftp() numfiles = 100 try: for i in range(numfiles): - with sftp.open('%s/file%d.txt' % (FOLDER, i), 'w', 1) as f: + with sftp.open('%s/file%d.txt' % (sftp.FOLDER, i), 'w', 1) as f: f.write('this is file #%d.\n' % i) - sftp.chmod('%s/file%d.txt' % (FOLDER, i), o660) + sftp.chmod('%s/file%d.txt' % (sftp.FOLDER, i), o660) # now make sure every file is there, by creating a list of filenmes # and reading them in random order. numlist = list(range(numfiles)) while len(numlist) > 0: r = numlist[random.randint(0, len(numlist) - 1)] - with sftp.open('%s/file%d.txt' % (FOLDER, r)) as f: - self.assertEqual(f.readline(), 'this is file #%d.\n' % r) + with sftp.open('%s/file%d.txt' % (sftp.FOLDER, r)) as f: + assert f.readline() == 'this is file #%d.\n' % r numlist.remove(r) finally: for i in range(numfiles): try: - sftp.remove('%s/file%d.txt' % (FOLDER, i)) + sftp.remove('%s/file%d.txt' % (sftp.FOLDER, i)) except: pass - def test_2_big_file(self): + def test_2_big_file(self, sftp): """ write a 1MB file with no buffering. """ - sftp = get_sftp() kblob = (1024 * b'x') start = time.time() try: - with sftp.open('%s/hongry.txt' % FOLDER, 'w') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'w') as f: for n in range(1024): f.write(kblob) if n % 128 == 0: sys.stderr.write('.') sys.stderr.write(' ') - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024 end = time.time() sys.stderr.write('%ds ' % round(end - start)) start = time.time() - with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'r') as f: for n in range(1024): data = f.read(1024) - self.assertEqual(data, kblob) + assert data == kblob end = time.time() sys.stderr.write('%ds ' % round(end - start)) finally: - sftp.remove('%s/hongry.txt' % FOLDER) + sftp.remove('%s/hongry.txt' % sftp.FOLDER) - def test_3_big_file_pipelined(self): + def test_3_big_file_pipelined(self, sftp): """ write a 1MB file, with no linefeeds, using pipelining. """ - sftp = get_sftp() kblob = bytes().join([struct.pack('>H', n) for n in range(512)]) start = time.time() try: - with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f: f.set_pipelined(True) for n in range(1024): f.write(kblob) @@ -126,12 +107,12 @@ class BigSFTPTest (unittest.TestCase): sys.stderr.write('.') sys.stderr.write(' ') - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024 end = time.time() sys.stderr.write('%ds ' % round(end - start)) start = time.time() - with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f: file_size = f.stat().st_size f.prefetch(file_size) @@ -145,19 +126,18 @@ class BigSFTPTest (unittest.TestCase): chunk = size - n data = f.read(chunk) offset = n % 1024 - self.assertEqual(data, k2blob[offset:offset + chunk]) + assert data == k2blob[offset:offset + chunk] n += chunk end = time.time() sys.stderr.write('%ds ' % round(end - start)) finally: - sftp.remove('%s/hongry.txt' % FOLDER) + sftp.remove('%s/hongry.txt' % sftp.FOLDER) - def test_4_prefetch_seek(self): - sftp = get_sftp() + def test_4_prefetch_seek(self, sftp): kblob = bytes().join([struct.pack('>H', n) for n in range(512)]) try: - with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f: f.set_pipelined(True) for n in range(1024): f.write(kblob) @@ -165,13 +145,13 @@ class BigSFTPTest (unittest.TestCase): sys.stderr.write('.') sys.stderr.write(' ') - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024 start = time.time() k2blob = kblob + kblob chunk = 793 for i in range(10): - with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f: file_size = f.stat().st_size f.prefetch(file_size) base_offset = (512 * 1024) + 17 * random.randint(1000, 2000) @@ -183,18 +163,17 @@ class BigSFTPTest (unittest.TestCase): f.seek(offset) data = f.read(chunk) n_offset = offset % 1024 - self.assertEqual(data, k2blob[n_offset:n_offset + chunk]) + assert data == k2blob[n_offset:n_offset + chunk] offset += chunk end = time.time() sys.stderr.write('%ds ' % round(end - start)) finally: - sftp.remove('%s/hongry.txt' % FOLDER) + sftp.remove('%s/hongry.txt' % sftp.FOLDER) - def test_5_readv_seek(self): - sftp = get_sftp() + def test_5_readv_seek(self, sftp): kblob = bytes().join([struct.pack('>H', n) for n in range(512)]) try: - with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f: f.set_pipelined(True) for n in range(1024): f.write(kblob) @@ -202,13 +181,13 @@ class BigSFTPTest (unittest.TestCase): sys.stderr.write('.') sys.stderr.write(' ') - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024 start = time.time() k2blob = kblob + kblob chunk = 793 for i in range(10): - with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f: base_offset = (512 * 1024) + 17 * random.randint(1000, 2000) # make a bunch of offsets and put them in random order offsets = [base_offset + j * chunk for j in range(100)] @@ -221,21 +200,20 @@ class BigSFTPTest (unittest.TestCase): for i in range(len(readv_list)): offset = readv_list[i][0] n_offset = offset % 1024 - self.assertEqual(next(ret), k2blob[n_offset:n_offset + chunk]) + assert next(ret) == k2blob[n_offset:n_offset + chunk] end = time.time() sys.stderr.write('%ds ' % round(end - start)) finally: - sftp.remove('%s/hongry.txt' % FOLDER) + sftp.remove('%s/hongry.txt' % sftp.FOLDER) - def test_6_lots_of_prefetching(self): + def test_6_lots_of_prefetching(self, sftp): """ prefetch a 1MB file a bunch of times, discarding the file object without using it, to verify that paramiko doesn't get confused. """ - sftp = get_sftp() kblob = (1024 * b'x') try: - with sftp.open('%s/hongry.txt' % FOLDER, 'w') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'w') as f: f.set_pipelined(True) for n in range(1024): f.write(kblob) @@ -243,32 +221,31 @@ class BigSFTPTest (unittest.TestCase): sys.stderr.write('.') sys.stderr.write(' ') - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024 for i in range(10): - with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'r') as f: file_size = f.stat().st_size f.prefetch(file_size) - with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'r') as f: file_size = f.stat().st_size f.prefetch(file_size) for n in range(1024): data = f.read(1024) - self.assertEqual(data, kblob) + assert data == kblob if n % 128 == 0: sys.stderr.write('.') sys.stderr.write(' ') finally: - sftp.remove('%s/hongry.txt' % FOLDER) + sftp.remove('%s/hongry.txt' % sftp.FOLDER) - def test_7_prefetch_readv(self): + def test_7_prefetch_readv(self, sftp): """ verify that prefetch and readv don't conflict with each other. """ - sftp = get_sftp() kblob = bytes().join([struct.pack('>H', n) for n in range(512)]) try: - with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f: f.set_pipelined(True) for n in range(1024): f.write(kblob) @@ -276,13 +253,13 @@ class BigSFTPTest (unittest.TestCase): sys.stderr.write('.') sys.stderr.write(' ') - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024 - with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f: file_size = f.stat().st_size f.prefetch(file_size) data = f.read(1024) - self.assertEqual(data, kblob) + assert data == kblob chunk_size = 793 base_offset = 512 * 1024 @@ -290,23 +267,22 @@ class BigSFTPTest (unittest.TestCase): chunks = [(base_offset + (chunk_size * i), chunk_size) for i in range(20)] for data in f.readv(chunks): offset = base_offset % 1024 - self.assertEqual(chunk_size, len(data)) - self.assertEqual(k2blob[offset:offset + chunk_size], data) + assert chunk_size == len(data) + assert k2blob[offset:offset + chunk_size] == data base_offset += chunk_size sys.stderr.write(' ') finally: - sftp.remove('%s/hongry.txt' % FOLDER) + sftp.remove('%s/hongry.txt' % sftp.FOLDER) - def test_8_large_readv(self): + def test_8_large_readv(self, sftp): """ verify that a very large readv is broken up correctly and still returned as a single blob. """ - sftp = get_sftp() kblob = bytes().join([struct.pack('>H', n) for n in range(512)]) try: - with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f: f.set_pipelined(True) for n in range(1024): f.write(kblob) @@ -314,62 +290,53 @@ class BigSFTPTest (unittest.TestCase): sys.stderr.write('.') sys.stderr.write(' ') - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024 - with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f: data = list(f.readv([(23 * 1024, 128 * 1024)])) - self.assertEqual(1, len(data)) + assert len(data) == 1 data = data[0] - self.assertEqual(128 * 1024, len(data)) + assert len(data) == 128 * 1024 sys.stderr.write(' ') finally: - sftp.remove('%s/hongry.txt' % FOLDER) + sftp.remove('%s/hongry.txt' % sftp.FOLDER) - def test_9_big_file_big_buffer(self): + def test_9_big_file_big_buffer(self, sftp): """ write a 1MB file, with no linefeeds, and a big buffer. """ - sftp = get_sftp() mblob = (1024 * 1024 * 'x') try: - with sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'w', 128 * 1024) as f: f.write(mblob) - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024 finally: - sftp.remove('%s/hongry.txt' % FOLDER) + sftp.remove('%s/hongry.txt' % sftp.FOLDER) - def test_A_big_file_renegotiate(self): + def test_A_big_file_renegotiate(self, sftp): """ write a 1MB file, forcing key renegotiation in the middle. """ - sftp = get_sftp() t = sftp.sock.get_transport() t.packetizer.REKEY_BYTES = 512 * 1024 k32blob = (32 * 1024 * 'x') try: - with sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'w', 128 * 1024) as f: for i in range(32): f.write(k32blob) - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) - self.assertNotEqual(t.H, t.session_id) + assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024 + assert t.H != t.session_id # try to read it too. - with sftp.open('%s/hongry.txt' % FOLDER, 'r', 128 * 1024) as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'r', 128 * 1024) as f: file_size = f.stat().st_size f.prefetch(file_size) total = 0 while total < 1024 * 1024: total += len(f.read(32 * 1024)) finally: - sftp.remove('%s/hongry.txt' % FOLDER) + sftp.remove('%s/hongry.txt' % sftp.FOLDER) t.packetizer.REKEY_BYTES = pow(2, 30) - - -if __name__ == '__main__': - from tests.test_sftp import SFTPTest - SFTPTest.init_loopback() - from unittest import main - main() diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py index d8d05d2b..f0645e0e 100644 --- a/tests/test_ssh_gss.py +++ b/tests/test_ssh_gss.py @@ -29,17 +29,20 @@ import unittest import paramiko -from tests.util import test_path -from tests.test_client import FINGERPRINTS +from .util import _support, needs_gssapi +from .test_client import FINGERPRINTS -class NullServer (paramiko.ServerInterface): +class NullServer (paramiko.ServerInterface): def get_allowed_auths(self, username): return 'gssapi-with-mic,publickey' - def check_auth_gssapi_with_mic(self, username, - gss_authenticated=paramiko.AUTH_FAILED, - cc_file=None): + def check_auth_gssapi_with_mic( + self, + username, + gss_authenticated=paramiko.AUTH_FAILED, + cc_file=None, + ): if gss_authenticated == paramiko.AUTH_SUCCESSFUL: return paramiko.AUTH_SUCCESSFUL return paramiko.AUTH_FAILED @@ -66,18 +69,15 @@ class NullServer (paramiko.ServerInterface): return True +@needs_gssapi class GSSAuthTest(unittest.TestCase): - @staticmethod - def init(username, hostname): - global krb5_principal, targ_name - krb5_principal = username - targ_name = hostname - def setUp(self): - self.username = krb5_principal - self.hostname = socket.getfqdn(targ_name) + # TODO: username and targ_name should come from os.environ or whatever + # the approved pytest method is for runtime-configuring test data. + self.username = "krb5_principal" + self.hostname = socket.getfqdn("targ_name") self.sockl = socket.socket() - self.sockl.bind((targ_name, 0)) + self.sockl.bind(("targ_name", 0)) self.sockl.listen(1) self.addr, self.port = self.sockl.getsockname() self.event = threading.Event() @@ -148,6 +148,6 @@ class GSSAuthTest(unittest.TestCase): Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding """ self.hostname = "this_host_does_not_exists_and_causes_a_GSSAPI-exception" - self._test_connection(key_filename=[test_path('test_rsa.key')], + self._test_connection(key_filename=[_support('test_rsa.key')], allow_agent=False, look_for_keys=False) diff --git a/tests/test_transport.py b/tests/test_transport.py index 3e352919..00639d04 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -33,7 +33,7 @@ import unittest from paramiko import ( Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, SSHException, - ChannelException, Packetizer, + ChannelException, Packetizer, ) from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED @@ -43,9 +43,9 @@ from paramiko.common import ( ) from paramiko.py3compat import bytes from paramiko.message import Message -from tests import skipUnlessBuiltin -from tests.loop import LoopSocket -from tests.util import test_path + +from .util import needs_builtin, _support, slow +from .loop import LoopSocket LONG_BANNER = """\ @@ -64,7 +64,7 @@ Maybe. class NullServer (ServerInterface): paranoid_did_password = False paranoid_did_public_key = False - paranoid_key = DSSKey.from_private_key_file(test_path('test_dss.key')) + paranoid_key = DSSKey.from_private_key_file(_support('test_dss.key')) def get_allowed_auths(self, username): if username == 'slowdive': @@ -130,7 +130,7 @@ class TransportTest(unittest.TestCase): self.sockc.close() def setup_test_server(self, client_options=None, server_options=None): - host_key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = RSAKey.from_private_key_file(_support('test_rsa.key')) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) @@ -189,7 +189,7 @@ class TransportTest(unittest.TestCase): loopback sockets. this is hardly "simple" but it's simpler than the later tests. :) """ - host_key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = RSAKey.from_private_key_file(_support('test_rsa.key')) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) event = threading.Event() @@ -214,7 +214,7 @@ class TransportTest(unittest.TestCase): """ verify that a long banner doesn't mess up the handshake. """ - host_key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = RSAKey.from_private_key_file(_support('test_rsa.key')) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) event = threading.Event() @@ -246,6 +246,7 @@ class TransportTest(unittest.TestCase): self.tc.renegotiate_keys() self.ts.send_ignore(1024) + @slow def test_5_keepalive(self): """ verify that the keepalive will be sent. @@ -809,6 +810,7 @@ class TransportTest(unittest.TestCase): (2**32, MAX_WINDOW_SIZE)]: self.assertEqual(self.tc._sanitize_window_size(val), correct) + @slow def test_L_handshake_timeout(self): """ verify that we can get a hanshake timeout. @@ -829,7 +831,7 @@ class TransportTest(unittest.TestCase): # be fine. Even tho it's a bit squicky. self.tc.packetizer = SlowPacketizer(self.tc.sock) # Continue with regular test red tape. - host_key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = RSAKey.from_private_key_file(_support('test_rsa.key')) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) event = threading.Event() @@ -881,7 +883,7 @@ class TransportTest(unittest.TestCase): expected = text.encode("utf-8") self.assertEqual(sfile.read(len(expected)), expected) - @skipUnlessBuiltin('buffer') + @needs_builtin('buffer') def test_channel_send_buffer(self): """ verify sending buffer instances to a channel @@ -904,7 +906,7 @@ class TransportTest(unittest.TestCase): chan.sendall(buffer(data)) self.assertEqual(sfile.read(len(data)), data) - @skipUnlessBuiltin('memoryview') + @needs_builtin('memoryview') def test_channel_send_memoryview(self): """ verify sending memoryview instances to a channel diff --git a/tests/test_util.py b/tests/test_util.py index 7880e156..90473f43 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -30,6 +30,7 @@ import paramiko.util from paramiko.util import lookup_ssh_host_config as host_config, safe_string from paramiko.py3compat import StringIO, byte_ord, b + # Note some lines in this configuration have trailing spaces on purpose test_config_file = """\ Host * @@ -366,7 +367,7 @@ IdentityFile something_%l_using_fqdn def test_get_hostnames(self): f = StringIO(test_config_file) config = paramiko.util.parse_ssh_config(f) - self.assertEqual(config.get_hostnames(), set(['*', '*.example.com', 'spoo.example.com'])) + self.assertEqual(config.get_hostnames(), {'*', '*.example.com', 'spoo.example.com'}) def test_quoted_host_names(self): test_config_file = """\ @@ -469,12 +470,12 @@ Host param3 parara self.assertRaises(Exception, conf._get_hosts, host) def test_safe_string(self): - vanilla = b("vanilla") - has_bytes = b("has \7\3 bytes") + vanilla = b"vanilla" + has_bytes = b"has \7\3 bytes" safe_vanilla = safe_string(vanilla) safe_has_bytes = safe_string(has_bytes) - expected_bytes = b("has %07%03 bytes") - err = "{0!r} != {1!r}" + expected_bytes = b"has %07%03 bytes" + err = "{!r} != {!r}" msg = err.format(safe_vanilla, vanilla) assert safe_vanilla == vanilla, msg msg = err.format(safe_has_bytes, expected_bytes) diff --git a/tests/util.py b/tests/util.py index c1b43da8..4ca02374 100644 --- a/tests/util.py +++ b/tests/util.py @@ -1,6 +1,27 @@ -import os +from os.path import dirname, realpath, join -root_path = os.path.dirname(os.path.realpath(__file__)) +import pytest -def test_path(filename): - return os.path.join(root_path, filename) +from paramiko.py3compat import builtins + + +def _support(filename): + return join(dirname(realpath(__file__)), filename) + + +# TODO: consider using pytest.importorskip('gssapi') instead? We presumably +# still need CLI configurability for the Kerberos parameters, though, so can't +# JUST key off presence of GSSAPI optional dependency... +# TODO: anyway, s/True/os.environ.get('RUN_GSSAPI', False)/ or something. +needs_gssapi = pytest.mark.skipif(True, reason="No GSSAPI to test") + + +def needs_builtin(name): + """ + Skip decorated test if builtin name does not exist. + """ + reason = "Test requires a builtin '{}'".format(name) + return pytest.mark.skipif(not hasattr(builtins, name), reason=reason) + + +slow = pytest.mark.slow |