diff options
58 files changed, 917 insertions, 695 deletions
diff --git a/.travis.yml b/.travis.yml index f8e093c9..324c5f3f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,29 +1,30 @@ +dist: xenial language: python sudo: false cache: directories: - $HOME/.cache/pip python: - - "2.6" - "2.7" - - "3.3" - "3.4" - "3.5" - "3.6" - - "3.7-dev" - - "pypy-5.6.0" + - "3.7" + - "3.8-dev" + - "pypy" + - "pypy3" matrix: allow_failures: - - python: "3.7-dev" + - python: "3.8-dev" # Explicitly test against our oldest supported cryptography.io, in addition # to whatever the latest default is. include: - python: 2.7 env: "OLDEST_CRYPTO=1.5" - - python: 3.6 + - python: 3.7 env: "OLDEST_CRYPTO=1.5" install: - # Ensure modern pip/etc on Python 3.3 workers (not sure WTF, but, eh) + # Ensure modern pip/etc to avoid some issues w/ older worker environs - pip install pip==9.0.1 setuptools==36.6.0 # Grab a specific version of Cryptography if desired. Doing this before other # installations ensures we don't have to do any downgrading/overriding. @@ -31,25 +32,22 @@ install: if [[ -n "$OLDEST_CRYPTO" ]]; then pip install "cryptography==${OLDEST_CRYPTO}" fi + # Self-install for setup.py-driven deps + - pip install -e . # Dev (doc/test running) requirements # TODO: use pipenv + whatever contexty-type stuff it has - pip install codecov # For codecov specifically - pip install -r dev-requirements.txt - # Self-install for setup.py-driven deps - - pip install -e . -script: | - # NOTE: the below hilarity should only exist in 2.0-2.3, 2.4+ should've gone - # back to vague normalcy! - if [[ $TRAVIS_PYTHON_VERSION == '2.6' || $TRAVIS_PYTHON_VERSION == '3.3' ]]; - then - flake8 - coverage run --source=paramiko -m pytest - else - inv travis.blacken - flake8 - inv coverage - inv sites - fi +script: + # Fast syntax check failures for more rapid feedback to submitters + # (Travis-oriented metatask that version checks Python, installs, runs.) + - inv travis.blacken + # I have this in my git pre-push hook, but contributors probably don't + - flake8 + # All (including slow) tests, w/ coverage! + - inv coverage + # Ensure documentation builds, both sites, maxxed nitpicking + - inv sites notifications: irc: channels: "irc.freenode.org#paramiko" @@ -22,7 +22,7 @@ What ---- "Paramiko" is a combination of the Esperanto words for "paranoid" and -"friend". It's a module for Python 2.6+/3.3+ that implements the SSH2 protocol +"friend". It's a module for Python 2.7/3.4+ that implements the SSH2 protocol for secure (encrypted and authenticated) connections to remote machines. Unlike SSL (aka TLS), SSH2 protocol does not require hierarchical certificates signed by a powerful central authority. You may know SSH2 as the protocol that diff --git a/dev-requirements.txt b/dev-requirements.txt index c9fd4965..9a7be339 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -4,9 +4,11 @@ invocations>=1.2.0,<2.0 # NOTE: pytest-relaxed currently only works with pytest >=3, <3.3 pytest==4.6.3 pytest-relaxed==1.1.5 +# pytest-xdist for test dir watching and the inv guard task +pytest-xdist==1.28.0 mock==2.0.0 # Linting! -flake8==2.4.0 +flake8==3.6.0 # Coverage! coverage==3.7.1 codecov==1.6.3 @@ -18,6 +20,3 @@ releases>=1.5,<2.0 semantic_version>=2.4,<2.5 wheel==0.24 twine==1.11.0 -# Test-matrix-oriented pins of production deps; should only exist on 2.0-2.3 -pycparser<2.19 -idna<2.8 diff --git a/paramiko/__init__.py b/paramiko/__init__.py index 5780a9c7..b5b577d3 100644 --- a/paramiko/__init__.py +++ b/paramiko/__init__.py @@ -19,15 +19,6 @@ # flake8: noqa import sys from paramiko._version import __version__, __version_info__ - -if sys.version_info < (2, 6): - raise RuntimeError("You need Python 2.6+ for this module.") - - -__author__ = "Jeff Forcier <jeff@bitprophet.org>" -__license__ = "GNU Lesser General Public License (LGPL)" - - from paramiko.transport import SecurityOptions, Transport from paramiko.client import ( SSHClient, @@ -38,7 +29,7 @@ from paramiko.client import ( ) from paramiko.auth_handler import AuthHandler from paramiko.ssh_gss import GSSAuth, GSS_AUTH_AVAILABLE, GSS_EXCEPTIONS -from paramiko.channel import Channel, ChannelFile +from paramiko.channel import Channel, ChannelFile, ChannelStderrFile from paramiko.ssh_exception import ( SSHException, PasswordRequiredException, @@ -94,42 +85,46 @@ from paramiko.sftp import ( from paramiko.common import io_sleep + +__author__ = "Jeff Forcier <jeff@bitprophet.org>" +__license__ = "GNU Lesser General Public License (LGPL)" + __all__ = [ - "Transport", - "SSHClient", - "MissingHostKeyPolicy", + "Agent", + "AgentKey", + "AuthenticationException", "AutoAddPolicy", - "RejectPolicy", - "WarningPolicy", - "SecurityOptions", - "SubsystemHandler", + "BadAuthenticationType", + "BadHostKeyException", + "BufferedFile", "Channel", - "PKey", - "RSAKey", + "ChannelException", "DSSKey", + "HostKeys", "Message", - "SSHException", - "AuthenticationException", + "MissingHostKeyPolicy", + "PKey", "PasswordRequiredException", - "BadAuthenticationType", - "ChannelException", - "BadHostKeyException", "ProxyCommand", "ProxyCommandFailure", + "RSAKey", + "RejectPolicy", "SFTP", + "SFTPAttributes", + "SFTPClient", + "SFTPError", "SFTPFile", "SFTPHandle", - "SFTPClient", "SFTPServer", - "SFTPError", - "SFTPAttributes", "SFTPServerInterface", - "ServerInterface", - "BufferedFile", - "Agent", - "AgentKey", - "HostKeys", + "SSHClient", "SSHConfig", - "util", + "SSHException", + "SecurityOptions", + "ServerInterface", + "SubsystemHandler", + "Transport", + "WarningPolicy", "io_sleep", + "util", ] diff --git a/paramiko/_version.py b/paramiko/_version.py index fcdc1c31..2e797d40 100644 --- a/paramiko/_version.py +++ b/paramiko/_version.py @@ -1,2 +1,2 @@ -__version_info__ = (2, 3, 3) +__version_info__ = (2, 4, 2) __version__ = ".".join(map(str, __version_info__)) diff --git a/paramiko/auth_handler.py b/paramiko/auth_handler.py index bbfa1712..26605a16 100644 --- a/paramiko/auth_handler.py +++ b/paramiko/auth_handler.py @@ -46,7 +46,6 @@ from paramiko.common import ( MSG_USERAUTH_REQUEST, MSG_USERAUTH_SUCCESS, MSG_USERAUTH_FAILURE, - cMSG_USERAUTH_BANNER, MSG_USERAUTH_BANNER, MSG_USERAUTH_INFO_REQUEST, MSG_USERAUTH_INFO_RESPONSE, @@ -59,6 +58,7 @@ from paramiko.common import ( MSG_USERAUTH_GSSAPI_ERRTOK, MSG_USERAUTH_GSSAPI_MIC, MSG_NAMES, + cMSG_USERAUTH_BANNER, ) from paramiko.message import Message from paramiko.py3compat import bytestring @@ -95,6 +95,9 @@ class AuthHandler(object): self.gss_host = None self.gss_deleg_creds = True + def _log(self, *args): + return self.transport._log(*args) + def is_authenticated(self): return self.authenticated @@ -269,7 +272,7 @@ class AuthHandler(object): def _parse_service_accept(self, m): service = m.get_text() if service == "ssh-userauth": - self.transport._log(DEBUG, "userauth is OK") + self._log(DEBUG, "userauth is OK") m = Message() m.add_byte(cMSG_USERAUTH_REQUEST) m.add_string(self.username) @@ -347,7 +350,7 @@ class AuthHandler(object): self.transport.send_message(m) else: raise SSHException( - "Received Package: %s" % MSG_NAMES[ptype] + "Received Package: {}".format(MSG_NAMES[ptype]) ) m = Message() m.add_byte(cMSG_USERAUTH_GSSAPI_MIC) @@ -378,7 +381,7 @@ Error Message: {} return else: raise SSHException( - "Received Package: %s" % MSG_NAMES[ptype] + "Received Package: {}".format(MSG_NAMES[ptype]) ) elif ( self.auth_method == "gssapi-keyex" @@ -392,23 +395,23 @@ Error Message: {} pass else: raise SSHException( - 'Unknown auth method "%s"' % self.auth_method + 'Unknown auth method "{}"'.format(self.auth_method) ) self.transport._send_message(m) else: - self.transport._log( - DEBUG, 'Service request "%s" accepted (?)' % service + self._log( + DEBUG, 'Service request "{}" accepted (?)'.format(service) ) def _send_auth_result(self, username, method, result): # okay, send result m = Message() if result == AUTH_SUCCESSFUL: - self.transport._log(INFO, "Auth granted (%s)." % method) + self._log(INFO, "Auth granted ({}).".format(method)) m.add_byte(cMSG_USERAUTH_SUCCESS) self.authenticated = True else: - self.transport._log(INFO, "Auth rejected (%s)." % method) + self._log(INFO, "Auth rejected ({}).".format(method)) m.add_byte(cMSG_USERAUTH_FAILURE) m.add_string( self.transport.server_object.get_allowed_auths(username) @@ -452,10 +455,11 @@ Error Message: {} username = m.get_text() service = m.get_text() method = m.get_text() - self.transport._log( + self._log( DEBUG, - "Auth request (type=%s) service=%s, username=%s" - % (method, service, username), + "Auth request (type={}) service={}, username={}".format( + method, service, username + ), ) if service != "ssh-connection": self._disconnect_service_not_available() @@ -463,7 +467,7 @@ Error Message: {} if (self.auth_username is not None) and ( self.auth_username != username ): - self.transport._log( + self._log( WARNING, "Auth rejected because the client attempted to change username in mid-flight", # noqa ) @@ -488,9 +492,7 @@ Error Message: {} # always treated as failure, since we don't support changing # passwords, but collect the list of valid auth types from # the callback anyway - self.transport._log( - DEBUG, "Auth request to change passwords (rejected)" - ) + self._log(DEBUG, "Auth request to change passwords (rejected)") newpassword = m.get_binary() try: newpassword = newpassword.decode("UTF-8", "replace") @@ -508,13 +510,13 @@ Error Message: {} try: key = self.transport._key_info[keytype](Message(keyblob)) except SSHException as e: - self.transport._log( - INFO, "Auth rejected: public key: %s" % str(e) - ) + self._log(INFO, "Auth rejected: public key: {}".format(str(e))) key = None except Exception as e: - msg = "Auth rejected: unsupported or mangled public key ({0}: {1})" # noqa - self.transport._log(INFO, msg.format(e.__class__.__name__, e)) + msg = ( + "Auth rejected: unsupported or mangled public key ({}: {})" + ) # noqa + self._log(INFO, msg.format(e.__class__.__name__, e)) key = None if key is None: self._disconnect_no_more_auth() @@ -537,9 +539,7 @@ Error Message: {} sig = Message(m.get_binary()) blob = self._get_session_blob(key, service, username) if not key.verify_ssh_sig(blob, sig): - self.transport._log( - INFO, "Auth rejected: invalid signature" - ) + self._log(INFO, "Auth rejected: invalid signature") result = AUTH_FAILED elif method == "keyboard-interactive": submethods = m.get_string() @@ -559,7 +559,7 @@ Error Message: {} # We can't accept more than one OID, so if the SSH client sends # more than one, disconnect. if mechs > 1: - self.transport._log( + self._log( INFO, "Disconnect: Received more than one GSS-API OID mechanism", ) @@ -568,7 +568,7 @@ Error Message: {} mech_ok = sshgss.ssh_check_mech(desired_mech) # if we don't support the mechanism, disconnect. if not mech_ok: - self.transport._log( + self._log( INFO, "Disconnect: Received an invalid GSS-API OID mechanism", ) @@ -615,8 +615,8 @@ Error Message: {} self._send_auth_result(username, method, result) def _parse_userauth_success(self, m): - self.transport._log( - INFO, "Authentication (%s) successful!" % self.auth_method + self._log( + INFO, "Authentication ({}) successful!".format(self.auth_method) ) self.authenticated = True self.transport._auth_trigger() @@ -627,21 +627,23 @@ Error Message: {} authlist = m.get_list() partial = m.get_boolean() if partial: - self.transport._log(INFO, "Authentication continues...") - self.transport._log(DEBUG, "Methods: " + str(authlist)) + self._log(INFO, "Authentication continues...") + self._log(DEBUG, "Methods: " + str(authlist)) self.transport.saved_exception = PartialAuthentication(authlist) elif self.auth_method not in authlist: - self.transport._log( - DEBUG, - "Authentication type (%s) not permitted." % self.auth_method, - ) - self.transport._log(DEBUG, "Allowed methods: " + str(authlist)) + for msg in ( + "Authentication type ({}) not permitted.".format( + self.auth_method + ), + "Allowed methods: {}".format(authlist), + ): + self._log(DEBUG, msg) self.transport.saved_exception = BadAuthenticationType( "Bad authentication type", authlist ) else: - self.transport._log( - INFO, "Authentication (%s) failed." % self.auth_method + self._log( + INFO, "Authentication ({}) failed.".format(self.auth_method) ) self.authenticated = False self.username = None @@ -651,7 +653,7 @@ Error Message: {} def _parse_userauth_banner(self, m): banner = m.get_string() self.banner = banner - self.transport._log(INFO, "Auth banner: %s" % banner) + self._log(INFO, "Auth banner: {}".format(banner)) # who cares. def _parse_userauth_info_request(self, m): @@ -695,10 +697,8 @@ Error Message: {} def _handle_local_gss_failure(self, e): self.transport.saved_exception = e - self.transport._log(DEBUG, "GSSAPI failure: %s" % str(e)) - self.transport._log( - INFO, "Authentication (%s) failed." % self.auth_method - ) + self._log(DEBUG, "GSSAPI failure: {}".format(e)) + self._log(INFO, "Authentication ({}) failed.".format(self.auth_method)) self.authenticated = False self.username = None if self.auth_event is not None: diff --git a/paramiko/ber.py b/paramiko/ber.py index 0991b0dd..92d7121e 100644 --- a/paramiko/ber.py +++ b/paramiko/ber.py @@ -89,9 +89,8 @@ class BER(object): return util.inflate_long(data) else: # 1: boolean (00 false, otherwise true) - raise BERException( - "Unknown ber encoding type %d (robey is lazy)" % ident - ) + msg = "Unknown ber encoding type {:d} (robey is lazy)" + raise BERException(msg.format(ident)) @staticmethod def decode_sequence(data): @@ -127,7 +126,9 @@ class BER(object): elif (type(x) is list) or (type(x) is tuple): self.encode_tlv(0x30, self.encode_sequence(x)) else: - raise BERException("Unknown type for encoding: %s" % repr(type(x))) + raise BERException( + "Unknown type for encoding: {!r}".format(type(x)) + ) @staticmethod def encode_sequence(data): diff --git a/paramiko/channel.py b/paramiko/channel.py index 96381f01..f2aaf26a 100644 --- a/paramiko/channel.py +++ b/paramiko/channel.py @@ -145,7 +145,7 @@ class Channel(ClosingContextManager): """ Return a string representation of this object, for debugging. """ - out = "<paramiko.Channel %d" % self.chanid + out = "<paramiko.Channel {}".format(self.chanid) if self.closed: out += " (closed)" elif self.active: @@ -153,9 +153,9 @@ class Channel(ClosingContextManager): out += " (EOF received)" if self.eof_sent: out += " (EOF sent)" - out += " (open) window=%d" % self.out_window_size + out += " (open) window={}".format(self.out_window_size) if len(self.in_buffer) > 0: - out += " in-buffer=%d" % (len(self.in_buffer),) + out += " in-buffer={}".format(len(self.in_buffer)) out += " -> " + repr(self.transport) out += ">" return out @@ -331,7 +331,7 @@ class Channel(ClosingContextManager): try: self.set_environment_variable(name, value) except SSHException as e: - err = 'Failed to set environment variable "{0}".' + err = 'Failed to set environment variable "{}".' raise SSHException(err.format(name), e) @open_only @@ -991,7 +991,7 @@ class Channel(ClosingContextManager): # a window update self.in_window_threshold = window_size // 10 self.in_window_sofar = 0 - self._log(DEBUG, "Max packet in: %d bytes" % max_packet_size) + self._log(DEBUG, "Max packet in: {} bytes".format(max_packet_size)) def _set_remote_channel(self, chanid, window_size, max_packet_size): self.remote_chanid = chanid @@ -1000,10 +1000,12 @@ class Channel(ClosingContextManager): max_packet_size ) self.active = 1 - self._log(DEBUG, "Max packet out: %d bytes" % self.out_max_packet_size) + self._log( + DEBUG, "Max packet out: {} bytes".format(self.out_max_packet_size) + ) def _request_success(self, m): - self._log(DEBUG, "Sesch channel %d request ok" % self.chanid) + self._log(DEBUG, "Sesch channel {} request ok".format(self.chanid)) self.event_ready = True self.event.set() return @@ -1031,7 +1033,7 @@ class Channel(ClosingContextManager): s = m.get_binary() if code != 1: self._log( - ERROR, "unknown extended_data type %d; discarding" % code + ERROR, "unknown extended_data type {}; discarding".format(code) ) return if self.combine_stderr: @@ -1044,7 +1046,7 @@ class Channel(ClosingContextManager): self.lock.acquire() try: if self.ultra_debug: - self._log(DEBUG, "window up %d" % nbytes) + self._log(DEBUG, "window up {}".format(nbytes)) self.out_window_size += nbytes self.out_buffer_cv.notifyAll() finally: @@ -1131,7 +1133,7 @@ class Channel(ClosingContextManager): else: ok = server.check_channel_forward_agent_request(self) else: - self._log(DEBUG, 'Unhandled channel request "%s"' % key) + self._log(DEBUG, 'Unhandled channel request "{}"'.format(key)) ok = False if want_reply: m = Message() @@ -1153,7 +1155,7 @@ class Channel(ClosingContextManager): self._pipe.set_forever() finally: self.lock.release() - self._log(DEBUG, "EOF received (%s)", self._name) + self._log(DEBUG, "EOF received ({})".format(self._name)) def _handle_close(self, m): self.lock.acquire() @@ -1225,7 +1227,7 @@ class Channel(ClosingContextManager): m.add_byte(cMSG_CHANNEL_EOF) m.add_int(self.remote_chanid) self.eof_sent = True - self._log(DEBUG, "EOF sent (%s)", self._name) + self._log(DEBUG, "EOF sent ({})".format(self._name)) return m def _close_internal(self): @@ -1259,12 +1261,14 @@ class Channel(ClosingContextManager): if self.closed or self.eof_received or not self.active: return 0 if self.ultra_debug: - self._log(DEBUG, "addwindow %d" % n) + self._log(DEBUG, "addwindow {}".format(n)) self.in_window_sofar += n if self.in_window_sofar <= self.in_window_threshold: return 0 if self.ultra_debug: - self._log(DEBUG, "addwindow send %d" % self.in_window_sofar) + self._log( + DEBUG, "addwindow send {}".format(self.in_window_sofar) + ) out = self.in_window_sofar self.in_window_sofar = 0 return out @@ -1307,7 +1311,7 @@ class Channel(ClosingContextManager): size = self.out_max_packet_size - 64 self.out_window_size -= size if self.ultra_debug: - self._log(DEBUG, "window down to %d" % self.out_window_size) + self._log(DEBUG, "window down to {}".format(self.out_window_size)) return size @@ -1344,9 +1348,6 @@ class ChannelFile(BufferedFile): class ChannelStderrFile(ChannelFile): - def __init__(self, channel, mode="r", bufsize=-1): - ChannelFile.__init__(self, channel, mode, bufsize) - def _read(self, size): return self.channel.recv_stderr(size) diff --git a/paramiko/client.py b/paramiko/client.py index 49a9a301..2538d582 100644 --- a/paramiko/client.py +++ b/paramiko/client.py @@ -146,7 +146,9 @@ class SSHClient(ClosingContextManager): for hostname, keys in self._host_keys.items(): for keytype, key in keys.items(): f.write( - "%s %s %s\n" % (hostname, keytype, key.get_base64()) + "{} {} {}\n".format( + hostname, keytype, key.get_base64() + ) ) def get_host_keys(self): @@ -233,6 +235,7 @@ class SSHClient(ClosingContextManager): banner_timeout=None, auth_timeout=None, gss_trust_dns=True, + passphrase=None, ): """ Connect to an SSH server and authenticate to it. The server's host key @@ -273,7 +276,10 @@ class SSHClient(ClosingContextManager): the username to authenticate as (defaults to the current local username) :param str password: - a password to use for authentication or for unlocking a private key + Used for password authentication; is also used for private key + decryption if ``passphrase`` is not given. + :param str passphrase: + Used for decrypting private keys. :param .PKey pkey: an optional private key to use for authentication :param str key_filename: the filename, or list of filenames, of optional private key(s) @@ -319,6 +325,8 @@ class SSHClient(ClosingContextManager): ``gss_deleg_creds`` and ``gss_host`` arguments. .. versionchanged:: 2.3 Added the ``gss_trust_dns`` argument. + .. versionchanged:: 2.4 + Added the ``passphrase`` argument. """ if not sock: errors = {} @@ -374,7 +382,7 @@ class SSHClient(ClosingContextManager): if port == SSH_PORT: server_hostkey_name = hostname else: - server_hostkey_name = "[%s]:%d" % (hostname, port) + server_hostkey_name = "[{}]:{}".format(hostname, port) our_server_keys = None our_server_keys = self._system_host_keys.get(server_hostkey_name) @@ -426,6 +434,7 @@ class SSHClient(ClosingContextManager): gss_kex, gss_deleg_creds, t.gss_host, + passphrase, ) def close(self): @@ -563,14 +572,14 @@ class SSHClient(ClosingContextManager): # TODO: change this to 'Loading' instead of 'Trying' sometime; probably # when #387 is released, since this is a critical log message users are # likely testing/filtering for (bah.) - msg = "Trying discovered key {0} in {1}".format( + msg = "Trying discovered key {} in {}".format( hexlify(key.get_fingerprint()), key_path ) self._log(DEBUG, msg) # Attempt to load cert if it exists. if os.path.isfile(cert_path): key.load_certificate(cert_path) - self._log(DEBUG, "Adding public certificate {0}".format(cert_path)) + self._log(DEBUG, "Adding public certificate {}".format(cert_path)) return key def _auth( @@ -585,6 +594,7 @@ class SSHClient(ClosingContextManager): gss_kex, gss_deleg_creds, gss_host, + passphrase, ): """ Try, in order: @@ -595,13 +605,16 @@ class SSHClient(ClosingContextManager): (if allowed). - Plain username/password auth, if a password was given. - (The password might be needed to unlock a private key, or for - two-factor authentication [for which it is required].) + (The password might be needed to unlock a private key [if 'passphrase' + isn't also given], or for two-factor authentication [for which it is + required].) """ saved_exception = None two_factor = False allowed_types = set() - two_factor_types = set(["keyboard-interactive", "password"]) + two_factor_types = {"keyboard-interactive", "password"} + if passphrase is None and password is not None: + passphrase = password # If GSS-API support and GSS-PI Key Exchange was performed, we attempt # authentication with gssapi-keyex. @@ -628,7 +641,9 @@ class SSHClient(ClosingContextManager): try: self._log( DEBUG, - "Trying SSH key %s" % hexlify(pkey.get_fingerprint()), + "Trying SSH key {}".format( + hexlify(pkey.get_fingerprint()) + ), ) allowed_types = set( self._transport.auth_publickey(username, pkey) @@ -644,7 +659,7 @@ class SSHClient(ClosingContextManager): for pkey_class in (RSAKey, DSSKey, ECDSAKey, Ed25519Key): try: key = self._key_from_filepath( - key_filename, pkey_class, password + key_filename, pkey_class, passphrase ) allowed_types = set( self._transport.auth_publickey(username, key) @@ -662,11 +677,8 @@ class SSHClient(ClosingContextManager): for key in self._agent.get_keys(): try: - self._log( - DEBUG, - "Trying SSH agent key %s" - % hexlify(key.get_fingerprint()), - ) + id_ = hexlify(key.get_fingerprint()) + self._log(DEBUG, "Trying SSH agent key {}".format(id_)) # for 2-factor auth a successfully auth'd key password # will return an allowed 2fac auth method allowed_types = set( @@ -691,7 +703,7 @@ class SSHClient(ClosingContextManager): # ~/ssh/ is for windows for directory in [".ssh", "ssh"]: full_path = os.path.expanduser( - "~/%s/id_%s" % (directory, name) + "~/{}/id_{}".format(directory, name) ) if os.path.isfile(full_path): # TODO: only do this append if below did not run @@ -705,7 +717,7 @@ class SSHClient(ClosingContextManager): for pkey_class, filename in keyfiles: try: key = self._key_from_filepath( - filename, pkey_class, password + filename, pkey_class, passphrase ) # for 2-factor auth a successfully auth'd key will result # in ['password'] @@ -774,8 +786,9 @@ class AutoAddPolicy(MissingHostKeyPolicy): client.save_host_keys(client._host_keys_filename) client._log( DEBUG, - "Adding %s host key for %s: %s" - % (key.get_name(), hostname, hexlify(key.get_fingerprint())), + "Adding {} host key for {}: {}".format( + key.get_name(), hostname, hexlify(key.get_fingerprint()) + ), ) @@ -788,10 +801,13 @@ class RejectPolicy(MissingHostKeyPolicy): def missing_host_key(self, client, hostname, key): client._log( DEBUG, - "Rejecting %s host key for %s: %s" - % (key.get_name(), hostname, hexlify(key.get_fingerprint())), + "Rejecting {} host key for {}: {}".format( + key.get_name(), hostname, hexlify(key.get_fingerprint()) + ), + ) + raise SSHException( + "Server {!r} not found in known_hosts".format(hostname) ) - raise SSHException("Server %r not found in known_hosts" % hostname) class WarningPolicy(MissingHostKeyPolicy): @@ -802,6 +818,7 @@ class WarningPolicy(MissingHostKeyPolicy): def missing_host_key(self, client, hostname, key): warnings.warn( - "Unknown %s host key for %s: %s" - % (key.get_name(), hostname, hexlify(key.get_fingerprint())) + "Unknown {} host key for {}: {}".format( + key.get_name(), hostname, hexlify(key.get_fingerprint()) + ) ) diff --git a/paramiko/config.py b/paramiko/config.py index f7941e0d..21c9dab8 100644 --- a/paramiko/config.py +++ b/paramiko/config.py @@ -65,7 +65,7 @@ class SSHConfig(object): match = re.match(self.SETTINGS_REGEX, line) if not match: - raise Exception("Unparsable line %s" % line) + raise Exception("Unparsable line {}".format(line)) key = match.group(1).lower() value = match.group(2) @@ -235,7 +235,7 @@ class SSHConfig(object): try: return shlex.split(host) except ValueError: - raise Exception("Unparsable host %s" % host) + raise Exception("Unparsable host {}".format(host)) class LazyFqdn(object): diff --git a/paramiko/ecdsakey.py b/paramiko/ecdsakey.py index ab9ec15f..b73a969e 100644 --- a/paramiko/ecdsakey.py +++ b/paramiko/ecdsakey.py @@ -147,14 +147,16 @@ class ECDSAKey(PKey): ) key_types = self._ECDSA_CURVES.get_key_format_identifier_list() cert_types = [ - "{0}-cert-v01@openssh.com".format(x) for x in key_types + "{}-cert-v01@openssh.com".format(x) for x in key_types ] self._check_type_and_load_cert( msg=msg, key_type=key_types, cert_type=cert_types ) curvename = msg.get_text() if curvename != self.ecdsa_curve.nist_name: - raise SSHException("Can't handle curve of type %s" % curvename) + raise SSHException( + "Can't handle curve of type {}".format(curvename) + ) pointinfo = msg.get_binary() try: @@ -264,7 +266,7 @@ class ECDSAKey(PKey): if bits is not None: curve = cls._ECDSA_CURVES.get_by_key_length(bits) if curve is None: - raise ValueError("Unsupported key length: %d" % bits) + raise ValueError("Unsupported key length: {:d}".format(bits)) curve = curve.curve_class() private_key = ec.generate_private_key(curve, backend=default_backend()) diff --git a/paramiko/file.py b/paramiko/file.py index a6933e02..9e9f6eb8 100644 --- a/paramiko/file.py +++ b/paramiko/file.py @@ -341,7 +341,7 @@ class BufferedFile(ClosingContextManager): after rounding up to an internal buffer size) are read. :param int sizehint: desired maximum number of bytes to read. - :returns: `list` of lines read from the file. + :returns: list of lines read from the file. """ lines = [] byte_count = 0 diff --git a/paramiko/hostkeys.py b/paramiko/hostkeys.py index 3aac1341..d0660cc8 100644 --- a/paramiko/hostkeys.py +++ b/paramiko/hostkeys.py @@ -306,7 +306,7 @@ class HostKeys(MutableMapping): salt = decodebytes(b(salt)) assert len(salt) == sha1().digest_size hmac = HMAC(salt, b(hostname), sha1).digest() - hostkey = "|1|%s|%s" % (u(encodebytes(salt)), u(encodebytes(hmac))) + hostkey = "|1|{}|{}".format(u(encodebytes(salt)), u(encodebytes(hmac))) return hostkey.replace("\n", "") @@ -344,10 +344,8 @@ class HostKeyEntry: fields = line.split(" ") if len(fields) < 3: # Bad number of fields - log.info( - "Not enough fields found in known_hosts in line %s (%r)" - % (lineno, line) - ) + msg = "Not enough fields found in known_hosts in line {} ({!r})" + log.info(msg.format(lineno, line)) return None fields = fields[:3] @@ -367,7 +365,7 @@ class HostKeyEntry: elif keytype == "ssh-ed25519": key = Ed25519Key(data=decodebytes(key)) else: - log.info("Unable to handle key of type %s" % (keytype,)) + log.info("Unable to handle key of type {}".format(keytype)) return None except binascii.Error as e: @@ -382,7 +380,7 @@ class HostKeyEntry: included. """ if self.valid: - return "%s %s %s\n" % ( + return "{} {} {}\n".format( ",".join(self.hostnames), self.key.get_name(), self.key.get_base64(), @@ -390,4 +388,4 @@ class HostKeyEntry: return None def __repr__(self): - return "<HostKeyEntry %r: %r>" % (self.hostnames, self.key) + return "<HostKeyEntry {!r}: {!r}>".format(self.hostnames, self.key) diff --git a/paramiko/kex_ecdh_nist.py b/paramiko/kex_ecdh_nist.py index 496805ab..1d87442a 100644 --- a/paramiko/kex_ecdh_nist.py +++ b/paramiko/kex_ecdh_nist.py @@ -45,7 +45,9 @@ class KexNistp256: return self._parse_kexecdh_init(m) elif not self.transport.server_mode and (ptype == _MSG_KEXECDH_REPLY): return self._parse_kexecdh_reply(m) - raise SSHException("KexECDH asked to handle packet type %d" % ptype) + raise SSHException( + "KexECDH asked to handle packet type {:d}".format(ptype) + ) def _generate_key_pair(self): self.P = ec.generate_private_key(self.curve, default_backend()) diff --git a/paramiko/kex_gex.py b/paramiko/kex_gex.py index 371db488..fb8f01fd 100644 --- a/paramiko/kex_gex.py +++ b/paramiko/kex_gex.py @@ -101,9 +101,8 @@ class KexGex(object): return self._parse_kexdh_gex_reply(m) elif ptype == _MSG_KEXDH_GEX_REQUEST_OLD: return self._parse_kexdh_gex_request_old(m) - raise SSHException( - "KexGex %s asked to handle packet type %d" % self.name, ptype - ) + msg = "KexGex {} asked to handle packet type {:d}" + raise SSHException(msg.format(self.name, ptype)) # ...internals... @@ -151,8 +150,9 @@ class KexGex(object): raise SSHException("Can't do server-side gex with no modulus pack") self.transport._log( DEBUG, - "Picking p (%d <= %d <= %d bits)" - % (minbits, preferredbits, maxbits), + "Picking p ({} <= {} <= {} bits)".format( + minbits, preferredbits, maxbits + ), ) self.g, self.p = pack.get_modulus(minbits, preferredbits, maxbits) m = Message() @@ -176,7 +176,7 @@ class KexGex(object): if pack is None: raise SSHException("Can't do server-side gex with no modulus pack") self.transport._log( - DEBUG, "Picking p (~ %d bits)" % (self.preferred_bits,) + DEBUG, "Picking p (~ {} bits)".format(self.preferred_bits) ) self.g, self.p = pack.get_modulus( self.min_bits, self.preferred_bits, self.max_bits @@ -197,9 +197,9 @@ class KexGex(object): if (bitlen < 1024) or (bitlen > 8192): raise SSHException( "Server-generated gex p (don't ask) is out of range " - "(%d bits)" % bitlen + "({} bits)".format(bitlen) ) - self.transport._log(DEBUG, "Got server p (%d bits)" % bitlen) + self.transport._log(DEBUG, "Got server p ({} bits)".format(bitlen)) self._generate_x() # now compute e = g^x mod p self.e = pow(self.g, self.x, self.p) diff --git a/paramiko/kex_group1.py b/paramiko/kex_group1.py index e6cbc4c5..904835d7 100644 --- a/paramiko/kex_group1.py +++ b/paramiko/kex_group1.py @@ -73,7 +73,8 @@ class KexGroup1(object): return self._parse_kexdh_init(m) elif not self.transport.server_mode and (ptype == _MSG_KEXDH_REPLY): return self._parse_kexdh_reply(m) - raise SSHException("KexGroup1 asked to handle packet type %d" % ptype) + msg = "KexGroup1 asked to handle packet type {:d}" + raise SSHException(msg.format(ptype)) # ...internals... diff --git a/paramiko/kex_gss.py b/paramiko/kex_gss.py index d76bb2dd..f83a2dc4 100644 --- a/paramiko/kex_gss.py +++ b/paramiko/kex_gss.py @@ -131,9 +131,8 @@ class KexGSSGroup1(object): return self._parse_kexgss_complete(m) elif ptype == MSG_KEXGSS_ERROR: return self._parse_kexgss_error(m) - raise SSHException( - "GSS KexGroup1 asked to handle packet type %d" % ptype - ) + msg = "GSS KexGroup1 asked to handle packet type {:d}" + raise SSHException(msg.format(ptype)) # ## internals... @@ -306,9 +305,14 @@ class KexGSSGroup1(object): err_msg = m.get_string() m.get_string() # we don't care about the language! raise SSHException( - "GSS-API Error:\nMajor Status: %s\nMinor Status: %s\ - \nError Message: %s\n" - ) % (str(maj_status), str(min_status), err_msg) + """GSS-API Error: +Major Status: {} +Minor Status: {} +Error Message: {} +""".format( + maj_status, min_status, err_msg + ) + ) class KexGSSGroup14(KexGSSGroup1): @@ -386,7 +390,8 @@ class KexGSSGex(object): return self._parse_kexgss_complete(m) elif ptype == MSG_KEXGSS_ERROR: return self._parse_kexgss_error(m) - raise SSHException("KexGex asked to handle packet type %d" % ptype) + msg = "KexGex asked to handle packet type {:d}" + raise SSHException(msg.format(ptype)) # ## internals... @@ -440,8 +445,9 @@ class KexGSSGex(object): raise SSHException("Can't do server-side gex with no modulus pack") self.transport._log( DEBUG, # noqa - "Picking p (%d <= %d <= %d bits)" - % (minbits, preferredbits, maxbits), + "Picking p ({} <= {} <= {} bits)".format( + minbits, preferredbits, maxbits + ), ) self.g, self.p = pack.get_modulus(minbits, preferredbits, maxbits) m = Message() @@ -464,9 +470,11 @@ class KexGSSGex(object): if (bitlen < 1024) or (bitlen > 8192): raise SSHException( "Server-generated gex p (don't ask) is out of range " - "(%d bits)" % bitlen + "({} bits)".format(bitlen) ) - self.transport._log(DEBUG, "Got server p (%d bits)" % bitlen) # noqa + self.transport._log( + DEBUG, "Got server p ({} bits)".format(bitlen) + ) # noqa self._generate_x() # now compute e = g^x mod p self.e = pow(self.g, self.x, self.p) @@ -645,9 +653,14 @@ class KexGSSGex(object): err_msg = m.get_string() m.get_string() # we don't care about the language (lang_tag)! raise SSHException( - "GSS-API Error:\nMajor Status: %s\nMinor Status: %s\ - \nError Message: %s\n" - ) % (str(maj_status), str(min_status), err_msg) + """GSS-API Error: +Major Status: {} +Minor Status: {} +Error Message: {} +""".format( + maj_status, min_status, err_msg + ) + ) class NullHostKey(object): diff --git a/paramiko/message.py b/paramiko/message.py index 869ac6c6..dead3508 100644 --- a/paramiko/message.py +++ b/paramiko/message.py @@ -187,7 +187,7 @@ class Message(object): def get_list(self): """ - Fetch a `list` of `strings <str>` from the stream. + Fetch a list of `strings <str>` from the stream. These are trivially encoded as comma-separated values in a string. """ @@ -281,7 +281,7 @@ class Message(object): a single string of values separated by commas. (Yes, really, that's how SSH2 does it.) - :param list l: list of strings to add + :param l: list of strings to add """ self.add_string(",".join(l)) return self diff --git a/paramiko/packet.py b/paramiko/packet.py index b538f3ba..cb46835e 100644 --- a/paramiko/packet.py +++ b/paramiko/packet.py @@ -382,7 +382,7 @@ class Packetizer(object): if cmd in MSG_NAMES: cmd_name = MSG_NAMES[cmd] else: - cmd_name = "$%x" % cmd + cmd_name = "${:x}".format(cmd) orig_len = len(data) self.__write_lock.acquire() try: @@ -392,7 +392,7 @@ class Packetizer(object): if self.__dump_packets: self._log( DEBUG, - "Write packet <%s>, length %d" % (cmd_name, orig_len), + "Write packet <{}>, length {}".format(cmd_name, orig_len), ) self._log(DEBUG, util.format_binary(packet, "OUT: ")) if self.__block_engine_out is not None: @@ -420,10 +420,9 @@ class Packetizer(object): ) if sent_too_much and not self.__need_rekey: # only ask once for rekeying + msg = "Rekeying (hit {} packets, {} bytes sent)" self._log( - DEBUG, - "Rekeying (hit %d packets, %d bytes sent)" - % (self.__sent_packets, self.__sent_bytes), + DEBUG, msg.format(self.__sent_packets, self.__sent_bytes) ) self.__received_bytes_overflow = 0 self.__received_packets_overflow = 0 @@ -476,7 +475,9 @@ class Packetizer(object): if self.__dump_packets: self._log( DEBUG, - "Got payload (%d bytes, %d padding)" % (packet_size, padding), + "Got payload ({} bytes, {} padding)".format( + packet_size, padding + ), ) if self.__compress_engine_in is not None: @@ -508,10 +509,10 @@ class Packetizer(object): self.__received_bytes >= self.REKEY_BYTES ): # only ask once for rekeying + err = "Rekeying (hit {} packets, {} bytes received)" self._log( DEBUG, - "Rekeying (hit %d packets, %d bytes received)" - % (self.__received_packets, self.__received_bytes), + err.format(self.__received_packets, self.__received_bytes), ) self.__received_bytes_overflow = 0 self.__received_packets_overflow = 0 @@ -521,10 +522,11 @@ class Packetizer(object): if cmd in MSG_NAMES: cmd_name = MSG_NAMES[cmd] else: - cmd_name = "$%x" % cmd + cmd_name = "${:x}".format(cmd) if self.__dump_packets: self._log( - DEBUG, "Read packet <%s>, length %d" % (cmd_name, len(payload)) + DEBUG, + "Read packet <{}>, length {}".format(cmd_name, len(payload)), ) return cmd, msg diff --git a/paramiko/pkey.py b/paramiko/pkey.py index 50e989f6..e37f7751 100644 --- a/paramiko/pkey.py +++ b/paramiko/pkey.py @@ -310,9 +310,10 @@ class PKey(object): # unencryped: done return data # encrypted keyfile: will need a password - if headers["proc-type"] != "4,ENCRYPTED": + proc_type = headers["proc-type"] + if proc_type != "4,ENCRYPTED": raise SSHException( - 'Unknown private key structure "%s"' % headers["proc-type"] + 'Unknown private key structure "{}"'.format(proc_type) ) try: encryption_type, saltstr = headers["dek-info"].split(",") @@ -320,7 +321,7 @@ class PKey(object): raise SSHException("Can't parse DEK-info in private key file") if encryption_type not in self._CIPHER_TABLE: raise SSHException( - 'Unknown private key cipher "%s"' % encryption_type + 'Unknown private key cipher "{}"'.format(encryption_type) ) # if no password was passed in, # raise an exception pointing out that we need one @@ -411,7 +412,7 @@ class PKey(object): # (requires going back into per-type subclasses.) msg.get_string() else: - err = "Invalid key (class: {0}, data type: {1}" + err = "Invalid key (class: {}, data type: {}" raise SSHException(err.format(self.__class__.__name__, type_)) def load_certificate(self, value): @@ -441,7 +442,7 @@ class PKey(object): constructor = "from_string" blob = getattr(PublicBlob, constructor)(value) if not blob.key_type.startswith(self.get_name()): - err = "PublicBlob type {0} incompatible with key type {1}" + err = "PublicBlob type {} incompatible with key type {}" raise ValueError(err.format(blob.key_type, self.get_name())) self.public_blob = blob @@ -493,7 +494,7 @@ class PublicBlob(object): """ fields = string.split(None, 2) if len(fields) < 2: - msg = "Not enough fields for public blob: {0}" + msg = "Not enough fields for public blob: {}" raise ValueError(msg.format(fields)) key_type = fields[0] key_blob = decodebytes(b(fields[1])) @@ -506,8 +507,10 @@ class PublicBlob(object): m = Message(key_blob) blob_type = m.get_text() if blob_type != key_type: - msg = "Invalid PublicBlob contents: key type={0!r}, but blob type={1!r}" # noqa - raise ValueError(msg.format(key_type, blob_type)) + deets = "key type={!r}, but blob type={!r}".format( + key_type, blob_type + ) + raise ValueError("Invalid PublicBlob contents: {}".format(deets)) # All good? All good. return cls(type_=key_type, blob=key_blob, comment=comment) @@ -523,9 +526,9 @@ class PublicBlob(object): return cls(type_=type_, blob=message.asbytes()) def __str__(self): - ret = "{0} public key/certificate".format(self.key_type) + ret = "{} public key/certificate".format(self.key_type) if self.comment: - ret += "- {0}".format(self.comment) + ret += "- {}".format(self.comment) return ret def __eq__(self, other): diff --git a/paramiko/primes.py b/paramiko/primes.py index 22e536b8..8dff7683 100644 --- a/paramiko/primes.py +++ b/paramiko/primes.py @@ -61,9 +61,15 @@ class ModulusPack(object): self.discarded = [] def _parse_modulus(self, line): - timestamp, mod_type, tests, tries, size, generator, modulus = ( - line.split() - ) + ( + timestamp, + mod_type, + tests, + tries, + size, + generator, + modulus, + ) = line.split() mod_type = int(mod_type) tests = int(tests) tries = int(tries) @@ -93,7 +99,7 @@ class ModulusPack(object): bl = util.bit_length(modulus) if (bl != size) and (bl != size + 1): self.discarded.append( - (modulus, "incorrectly reported bit length %d" % size) + (modulus, "incorrectly reported bit length {}".format(size)) ) return if bl not in self.pack: diff --git a/paramiko/py3compat.py b/paramiko/py3compat.py index 314c557a..cbe20ca6 100644 --- a/paramiko/py3compat.py +++ b/paramiko/py3compat.py @@ -62,7 +62,7 @@ if PY2: elif isinstance(s, buffer): # NOQA return s else: - raise TypeError("Expected unicode or bytes, got %r" % s) + raise TypeError("Expected unicode or bytes, got {!r}".format(s)) def u(s, encoding="utf8"): # NOQA """cast bytes or unicode to unicode""" @@ -73,7 +73,7 @@ if PY2: elif isinstance(s, buffer): # NOQA return s.decode(encoding) else: - raise TypeError("Expected unicode or bytes, got %r" % s) + raise TypeError("Expected unicode or bytes, got {!r}".format(s)) def b2s(s): return s @@ -148,7 +148,7 @@ else: elif isinstance(s, str): return s.encode(encoding) else: - raise TypeError("Expected unicode or bytes, got %r" % s) + raise TypeError("Expected unicode or bytes, got {!r}".format(s)) def u(s, encoding="utf8"): """cast bytes or unicode to unicode""" @@ -157,7 +157,7 @@ else: elif isinstance(s, str): return s else: - raise TypeError("Expected unicode or bytes, got %r" % s) + raise TypeError("Expected unicode or bytes, got {!r}".format(s)) def b2s(s): return s.decode() if isinstance(s, bytes) else s diff --git a/paramiko/server.py b/paramiko/server.py index 3eb1a170..2fe9cc19 100644 --- a/paramiko/server.py +++ b/paramiko/server.py @@ -226,7 +226,7 @@ class ServerInterface(object): The default implementation always returns ``AUTH_FAILED``. - :param list responses: list of `str` responses from the client + :param responses: list of `str` responses from the client :return: ``AUTH_FAILED`` if the authentication fails; ``AUTH_SUCCESSFUL`` if it succeeds; ``AUTH_PARTIALLY_SUCCESSFUL`` if the interactive auth @@ -678,13 +678,13 @@ class SubsystemHandler(threading.Thread): def _run(self): try: self.__transport._log( - DEBUG, "Starting handler for subsystem %s" % self.__name + DEBUG, "Starting handler for subsystem {}".format(self.__name) ) self.start_subsystem(self.__name, self.__transport, self.__channel) except Exception as e: self.__transport._log( ERROR, - 'Exception in subsystem handler for "{0}": {1}'.format( + 'Exception in subsystem handler for "{}": {}'.format( self.__name, e ), ) diff --git a/paramiko/sftp_attr.py b/paramiko/sftp_attr.py index 5ef8ff14..f16ac746 100644 --- a/paramiko/sftp_attr.py +++ b/paramiko/sftp_attr.py @@ -82,7 +82,7 @@ class SFTPAttributes(object): return attr def __repr__(self): - return "<SFTPAttributes: %s>" % self._debug_str() + return "<SFTPAttributes: {}>".format(self._debug_str()) # ...internals... @classmethod @@ -146,15 +146,15 @@ class SFTPAttributes(object): def _debug_str(self): out = "[ " if self.st_size is not None: - out += "size=%d " % self.st_size + out += "size={} ".format(self.st_size) if (self.st_uid is not None) and (self.st_gid is not None): - out += "uid=%d gid=%d " % (self.st_uid, self.st_gid) + out += "uid={} gid={} ".format(self.st_uid, self.st_gid) if self.st_mode is not None: out += "mode=" + oct(self.st_mode) + " " if (self.st_atime is not None) and (self.st_mtime is not None): - out += "atime=%d mtime=%d " % (self.st_atime, self.st_mtime) + out += "atime={} mtime={} ".format(self.st_atime, self.st_mtime) for k, v in self.attr.items(): - out += '"%s"=%r ' % (str(k), v) + out += '"{}"={!r} '.format(str(k), v) out += "]" return out @@ -227,6 +227,9 @@ class SFTPAttributes(object): if size is None: size = 0 + # TODO: not sure this actually worked as expected beforehand, leaving + # it untouched for the time being, re: .format() upgrade, until someone + # has time to doublecheck return "%s 1 %-8d %-8d %8d %-12s %s" % ( ks, uid, diff --git a/paramiko/sftp_client.py b/paramiko/sftp_client.py index 425aa87d..1a9147fc 100644 --- a/paramiko/sftp_client.py +++ b/paramiko/sftp_client.py @@ -131,7 +131,10 @@ class SFTPClient(BaseSFTP, ClosingContextManager): except EOFError: raise SSHException("EOF during negotiation") self._log( - INFO, "Opened sftp connection (server version %d)" % server_version + INFO, + "Opened sftp connection (server version {})".format( + server_version + ), ) @classmethod @@ -171,6 +174,8 @@ class SFTPClient(BaseSFTP, ClosingContextManager): for m in msg: self._log(level, m, *args) else: + # NOTE: these bits MUST continue using %-style format junk because + # logging.Logger.log() explicitly requires it. Grump. # escape '%' in msg (they could come from file or directory names) # before logging msg = msg.replace("%", "%%") @@ -230,7 +235,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): .. versionadded:: 1.2 """ path = self._adjust_cwd(path) - self._log(DEBUG, "listdir(%r)" % path) + self._log(DEBUG, "listdir({!r})".format(path)) t, msg = self._request(CMD_OPENDIR, path) if t != CMD_HANDLE: raise SFTPError("Expected handle") @@ -269,7 +274,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): .. versionadded:: 1.15 """ path = self._adjust_cwd(path) - self._log(DEBUG, "listdir(%r)" % path) + self._log(DEBUG, "listdir({!r})".format(path)) t, msg = self._request(CMD_OPENDIR, path) if t != CMD_HANDLE: @@ -351,7 +356,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): :raises: ``IOError`` -- if the file could not be opened. """ filename = self._adjust_cwd(filename) - self._log(DEBUG, "open(%r, %r)" % (filename, mode)) + self._log(DEBUG, "open({!r}, {!r})".format(filename, mode)) imode = 0 if ("r" in mode) or ("+" in mode): imode |= SFTP_FLAG_READ @@ -369,7 +374,10 @@ class SFTPClient(BaseSFTP, ClosingContextManager): raise SFTPError("Expected handle") handle = msg.get_binary() self._log( - DEBUG, "open(%r, %r) -> %s" % (filename, mode, u(hexlify(handle))) + DEBUG, + "open({!r}, {!r}) -> {}".format( + filename, mode, u(hexlify(handle)) + ), ) return SFTPFile(self, handle, mode, bufsize) @@ -386,7 +394,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): :raises: ``IOError`` -- if the path refers to a folder (directory) """ path = self._adjust_cwd(path) - self._log(DEBUG, "remove(%r)" % path) + self._log(DEBUG, "remove({!r})".format(path)) self._request(CMD_REMOVE, path) unlink = remove @@ -411,7 +419,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): """ oldpath = self._adjust_cwd(oldpath) newpath = self._adjust_cwd(newpath) - self._log(DEBUG, "rename(%r, %r)" % (oldpath, newpath)) + self._log(DEBUG, "rename({!r}, {!r})".format(oldpath, newpath)) self._request(CMD_RENAME, oldpath, newpath) def posix_rename(self, oldpath, newpath): @@ -431,7 +439,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): """ oldpath = self._adjust_cwd(oldpath) newpath = self._adjust_cwd(newpath) - self._log(DEBUG, "posix_rename(%r, %r)" % (oldpath, newpath)) + self._log(DEBUG, "posix_rename({!r}, {!r})".format(oldpath, newpath)) self._request( CMD_EXTENDED, "posix-rename@openssh.com", oldpath, newpath ) @@ -446,7 +454,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): :param int mode: permissions (posix-style) for the newly-created folder """ path = self._adjust_cwd(path) - self._log(DEBUG, "mkdir(%r, %r)" % (path, mode)) + self._log(DEBUG, "mkdir({!r}, {!r})".format(path, mode)) attr = SFTPAttributes() attr.st_mode = mode self._request(CMD_MKDIR, path, attr) @@ -458,7 +466,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): :param str path: name of the folder to remove """ path = self._adjust_cwd(path) - self._log(DEBUG, "rmdir(%r)" % path) + self._log(DEBUG, "rmdir({!r})".format(path)) self._request(CMD_RMDIR, path) def stat(self, path): @@ -481,7 +489,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): file """ path = self._adjust_cwd(path) - self._log(DEBUG, "stat(%r)" % path) + self._log(DEBUG, "stat({!r})".format(path)) t, msg = self._request(CMD_STAT, path) if t != CMD_ATTRS: raise SFTPError("Expected attributes") @@ -499,7 +507,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): file """ path = self._adjust_cwd(path) - self._log(DEBUG, "lstat(%r)" % path) + self._log(DEBUG, "lstat({!r})".format(path)) t, msg = self._request(CMD_LSTAT, path) if t != CMD_ATTRS: raise SFTPError("Expected attributes") @@ -513,7 +521,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): :param str dest: path of the newly created symlink """ dest = self._adjust_cwd(dest) - self._log(DEBUG, "symlink(%r, %r)" % (source, dest)) + self._log(DEBUG, "symlink({!r}, {!r})".format(source, dest)) source = bytestring(source) self._request(CMD_SYMLINK, source, dest) @@ -527,7 +535,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): :param int mode: new permissions """ path = self._adjust_cwd(path) - self._log(DEBUG, "chmod(%r, %r)" % (path, mode)) + self._log(DEBUG, "chmod({!r}, {!r})".format(path, mode)) attr = SFTPAttributes() attr.st_mode = mode self._request(CMD_SETSTAT, path, attr) @@ -544,7 +552,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): :param int gid: new group id """ path = self._adjust_cwd(path) - self._log(DEBUG, "chown(%r, %r, %r)" % (path, uid, gid)) + self._log(DEBUG, "chown({!r}, {!r}, {!r})".format(path, uid, gid)) attr = SFTPAttributes() attr.st_uid, attr.st_gid = uid, gid self._request(CMD_SETSTAT, path, attr) @@ -566,7 +574,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): path = self._adjust_cwd(path) if times is None: times = (time.time(), time.time()) - self._log(DEBUG, "utime(%r, %r)" % (path, times)) + self._log(DEBUG, "utime({!r}, {!r})".format(path, times)) attr = SFTPAttributes() attr.st_atime, attr.st_mtime = times self._request(CMD_SETSTAT, path, attr) @@ -581,7 +589,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): :param int size: the new size of the file """ path = self._adjust_cwd(path) - self._log(DEBUG, "truncate(%r, %r)" % (path, size)) + self._log(DEBUG, "truncate({!r}, {!r})".format(path, size)) attr = SFTPAttributes() attr.st_size = size self._request(CMD_SETSTAT, path, attr) @@ -596,7 +604,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): :return: target path, as a `str` """ path = self._adjust_cwd(path) - self._log(DEBUG, "readlink(%r)" % path) + self._log(DEBUG, "readlink({!r})".format(path)) t, msg = self._request(CMD_READLINK, path) if t != CMD_NAME: raise SFTPError("Expected name response") @@ -604,7 +612,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): if count == 0: return None if count != 1: - raise SFTPError("Readlink returned %d results" % count) + raise SFTPError("Readlink returned {} results".format(count)) return _to_unicode(msg.get_string()) def normalize(self, path): @@ -620,13 +628,13 @@ class SFTPClient(BaseSFTP, ClosingContextManager): :raises: ``IOError`` -- if the path can't be resolved on the server """ path = self._adjust_cwd(path) - self._log(DEBUG, "normalize(%r)" % path) + self._log(DEBUG, "normalize({!r})".format(path)) t, msg = self._request(CMD_REALPATH, path) if t != CMD_NAME: raise SFTPError("Expected name response") count = msg.get_int() if count != 1: - raise SFTPError("Realpath returned %d results" % count) + raise SFTPError("Realpath returned {} results".format(count)) return msg.get_text() def chdir(self, path=None): @@ -649,9 +657,8 @@ class SFTPClient(BaseSFTP, ClosingContextManager): self._cwd = None return if not stat.S_ISDIR(self.stat(path).st_mode): - raise SFTPError( - errno.ENOTDIR, "%s: %s" % (os.strerror(errno.ENOTDIR), path) - ) + code = errno.ENOTDIR + raise SFTPError(code, "{}: {}".format(os.strerror(code), path)) self._cwd = b(self.normalize(path)) def getcwd(self): @@ -713,7 +720,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): s = self.stat(remotepath) if s.st_size != size: raise IOError( - "size mismatch in put! %d != %d" % (s.st_size, size) + "size mismatch in put! {} != {}".format(s.st_size, size) ) else: s = SFTPAttributes() @@ -796,7 +803,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): s = os.stat(localpath) if s.st_size != size: raise IOError( - "size mismatch in get! %d != %d" % (s.st_size, size) + "size mismatch in get! {} != {}".format(s.st_size, size) ) # ...internals... @@ -835,7 +842,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): try: t, data = self._read_packet() except EOFError as e: - raise SSHException("Server connection dropped: %s" % str(e)) + raise SSHException("Server connection dropped: {}".format(e)) msg = Message(data) num = msg.get_int() self._lock.acquire() @@ -843,7 +850,7 @@ class SFTPClient(BaseSFTP, ClosingContextManager): if num not in self._expecting: # might be response for a file that was closed before # responses came back - self._log(DEBUG, "Unexpected response #%d" % (num,)) + self._log(DEBUG, "Unexpected response #{}".format(num)) if waitfor is None: # just doing a single check break diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py index 08003e43..0104d857 100644 --- a/paramiko/sftp_file.py +++ b/paramiko/sftp_file.py @@ -91,7 +91,7 @@ class SFTPFile(BufferedFile): # __del__.) if self._closed: return - self.sftp._log(DEBUG, "close(%s)" % u(hexlify(self.handle))) + self.sftp._log(DEBUG, "close({})".format(u(hexlify(self.handle)))) if self.pipelined: self.sftp._finish_responses(self) BufferedFile.close(self) @@ -293,7 +293,9 @@ class SFTPFile(BufferedFile): :param int mode: new permissions """ - self.sftp._log(DEBUG, "chmod(%s, %r)" % (hexlify(self.handle), mode)) + self.sftp._log( + DEBUG, "chmod({}, {!r})".format(hexlify(self.handle), mode) + ) attr = SFTPAttributes() attr.st_mode = mode self.sftp._request(CMD_FSETSTAT, self.handle, attr) @@ -309,7 +311,8 @@ class SFTPFile(BufferedFile): :param int gid: new group id """ self.sftp._log( - DEBUG, "chown(%s, %r, %r)" % (hexlify(self.handle), uid, gid) + DEBUG, + "chown({}, {!r}, {!r})".format(hexlify(self.handle), uid, gid), ) attr = SFTPAttributes() attr.st_uid, attr.st_gid = uid, gid @@ -330,7 +333,9 @@ class SFTPFile(BufferedFile): """ if times is None: times = (time.time(), time.time()) - self.sftp._log(DEBUG, "utime(%s, %r)" % (hexlify(self.handle), times)) + self.sftp._log( + DEBUG, "utime({}, {!r})".format(hexlify(self.handle), times) + ) attr = SFTPAttributes() attr.st_atime, attr.st_mtime = times self.sftp._request(CMD_FSETSTAT, self.handle, attr) @@ -344,7 +349,7 @@ class SFTPFile(BufferedFile): :param size: the new size of the file """ self.sftp._log( - DEBUG, "truncate(%s, %r)" % (hexlify(self.handle), size) + DEBUG, "truncate({}, {!r})".format(hexlify(self.handle), size) ) attr = SFTPAttributes() attr.st_size = size @@ -484,7 +489,9 @@ class SFTPFile(BufferedFile): .. versionadded:: 1.5.4 """ - self.sftp._log(DEBUG, "readv(%s, %r)" % (hexlify(self.handle), chunks)) + self.sftp._log( + DEBUG, "readv({}, {!r})".format(hexlify(self.handle), chunks) + ) read_chunks = [] for offset, size in chunks: diff --git a/paramiko/sftp_server.py b/paramiko/sftp_server.py index 5c23ea2b..8265df96 100644 --- a/paramiko/sftp_server.py +++ b/paramiko/sftp_server.py @@ -138,7 +138,7 @@ class SFTPServer(BaseSFTP, SubsystemHandler): def start_subsystem(self, name, transport, channel): self.sock = channel - self._log(DEBUG, "Started sftp server on channel %s" % repr(channel)) + self._log(DEBUG, "Started sftp server on channel {!r}".format(channel)) self._send_server_version() self.server.session_started() while True: @@ -238,9 +238,7 @@ class SFTPServer(BaseSFTP, SubsystemHandler): item._pack(msg) else: raise Exception( - "unknown type for {0!r} type {1!r}".format( - item, type(item) - ) + "unknown type for {!r} type {!r}".format(item, type(item)) ) self._send_packet(t, msg) @@ -249,7 +247,7 @@ class SFTPServer(BaseSFTP, SubsystemHandler): # must be error code self._send_status(request_number, handle) return - handle._set_name(b("hx%d" % self.next_handle)) + handle._set_name(b("hx{:d}".format(self.next_handle))) self.next_handle += 1 if folder: self.folder_table[handle._get_name()] = handle @@ -378,7 +376,7 @@ class SFTPServer(BaseSFTP, SubsystemHandler): return flags def _process(self, t, request_number, msg): - self._log(DEBUG, "Request: %s" % CMD_NAMES[t]) + self._log(DEBUG, "Request: {}".format(CMD_NAMES[t])) if t == CMD_OPEN: path = msg.get_text() flags = self._convert_pflags(msg.get_int()) diff --git a/paramiko/ssh_exception.py b/paramiko/ssh_exception.py index 4865288f..52bb23be 100644 --- a/paramiko/ssh_exception.py +++ b/paramiko/ssh_exception.py @@ -67,7 +67,7 @@ class BadAuthenticationType(AuthenticationException): self.args = (explanation, types) def __str__(self): - return "{0} (allowed_types={1!r})".format( + return "{} (allowed_types={!r})".format( SSHException.__str__(self), self.allowed_types ) @@ -115,7 +115,7 @@ class BadHostKeyException(SSHException): def __init__(self, hostname, got_key, expected_key): message = ( - "Host key for server {0} does not match: got {1}, expected {2}" + "Host key for server {} does not match: got {}, expected {}" ) # noqa message = message.format( hostname, got_key.get_base64(), expected_key.get_base64() @@ -139,8 +139,9 @@ class ProxyCommandFailure(SSHException): def __init__(self, command, error): SSHException.__init__( self, - '"ProxyCommand (%s)" returned non-zero exit status: %s' - % (command, error), + '"ProxyCommand ({})" returned non-zero exit status: {}'.format( + command, error + ), ) self.error = error # for unpickling diff --git a/paramiko/ssh_gss.py b/paramiko/ssh_gss.py index 025d9928..eb8826e0 100644 --- a/paramiko/ssh_gss.py +++ b/paramiko/ssh_gss.py @@ -296,9 +296,7 @@ class _SSH_GSSAPI(_SSH_GSSAuth): else: token = self._gss_ctxt.step(recv_token) except gssapi.GSSException: - message = "{0} Target: {1}".format( - sys.exc_info()[1], self._gss_host - ) + message = "{} Target: {}".format(sys.exc_info()[1], self._gss_host) raise gssapi.GSSException(message) self._gss_ctxt_status = self._gss_ctxt.established return token @@ -461,7 +459,7 @@ class _SSH_SSPI(_SSH_GSSAuth): error, token = self._gss_ctxt.authorize(recv_token) token = token[0].Buffer except pywintypes.error as e: - e.strerror += ", Target: {1}".format(e, self._gss_host) + e.strerror += ", Target: {}".format(e, self._gss_host) raise if error == 0: diff --git a/paramiko/transport.py b/paramiko/transport.py index 22348f87..f72eebaf 100644 --- a/paramiko/transport.py +++ b/paramiko/transport.py @@ -139,7 +139,7 @@ class Transport(threading.Thread, ClosingContextManager): _DECRYPT = object() _PROTO_ID = "2.0" - _CLIENT_ID = "paramiko_%s" % paramiko.__version__ + _CLIENT_ID = "paramiko_{}".format(paramiko.__version__) # These tuples of algorithm identifiers are in preference order; do not # reorder without reason! @@ -369,7 +369,7 @@ class Transport(threading.Thread, ClosingContextManager): break else: raise SSHException( - "Unable to connect to %s: %s" % (hostname, reason) + "Unable to connect to {}: {}".format(hostname, reason) ) # okay, normal socket-ish flow here... threading.Thread.__init__(self) @@ -456,17 +456,20 @@ class Transport(threading.Thread, ClosingContextManager): """ Returns a string representation of this object, for debugging. """ - out = "<paramiko.Transport at %s" % hex(long(id(self)) & xffffffff) + id_ = hex(long(id(self)) & xffffffff) + out = "<paramiko.Transport at {}".format(id_) if not self.active: out += " (unconnected)" else: if self.local_cipher != "": - out += " (cipher %s, %d bits)" % ( + out += " (cipher {}, {:d} bits)".format( self.local_cipher, self._cipher_info[self.local_cipher]["key-size"] * 8, ) if self.is_authenticated(): - out += " (active; %d open channel(s))" % len(self._channels) + out += " (active; {} open channel(s))".format( + len(self._channels) + ) elif self.initial_kex_done: out += " (connected; awaiting auth)" else: @@ -1106,7 +1109,7 @@ class Transport(threading.Thread, ClosingContextManager): m.add_boolean(wait) if data is not None: m.add(*data) - self._log(DEBUG, 'Sending global request "%s"' % kind) + self._log(DEBUG, 'Sending global request "{}"'.format(kind)) self._send_user_message(m) if not wait: return None @@ -1226,15 +1229,20 @@ class Transport(threading.Thread, ClosingContextManager): self._log(DEBUG, "Bad host key from server") self._log( DEBUG, - "Expected: %s: %s" - % (hostkey.get_name(), repr(hostkey.asbytes())), + "Expected: {}: {}".format( + hostkey.get_name(), repr(hostkey.asbytes()) + ), ) self._log( DEBUG, - "Got : %s: %s" % (key.get_name(), repr(key.asbytes())), + "Got : {}: {}".format( + key.get_name(), repr(key.asbytes()) + ), ) raise SSHException("Bad host key from server") - self._log(DEBUG, "Host key verified (%s)" % hostkey.get_name()) + self._log( + DEBUG, "Host key verified ({})".format(hostkey.get_name()) + ) if (pkey is not None) or (password is not None) or gss_auth or gss_kex: if gss_auth: @@ -1345,7 +1353,7 @@ class Transport(threading.Thread, ClosingContextManager): :param str username: the username to authenticate as :return: - `list` of auth types permissible for the next stage of + list of auth types permissible for the next stage of authentication (normally empty) :raises: @@ -1400,7 +1408,7 @@ class Transport(threading.Thread, ClosingContextManager): ``True`` if an attempt at an automated "interactive" password auth should be made if the server doesn't support normal password auth :return: - `list` of auth types permissible for the next stage of + list of auth types permissible for the next stage of authentication (normally empty) :raises: @@ -1473,7 +1481,7 @@ class Transport(threading.Thread, ClosingContextManager): an event to trigger when the authentication attempt is complete (whether it was successful or not) :return: - `list` of auth types permissible for the next stage of + list of auth types permissible for the next stage of authentication (normally empty) :raises: @@ -1531,7 +1539,7 @@ class Transport(threading.Thread, ClosingContextManager): :param callable handler: a handler for responding to server questions :param str submethods: a string list of desired submethods (optional) :return: - `list` of auth types permissible for the next stage of + list of auth types permissible for the next stage of authentication (normally empty). :raises: `.BadAuthenticationType` -- if public-key authentication isn't @@ -1583,7 +1591,6 @@ class Transport(threading.Thread, ClosingContextManager): :param bool gss_deleg_creds: Delegate credentials or not :return: list of auth types permissible for the next stage of authentication (normally empty) - :rtype: list :raises: `.BadAuthenticationType` -- if gssapi-with-mic isn't allowed by the server (and no event was passed in) :raises: @@ -1607,7 +1614,7 @@ class Transport(threading.Thread, ClosingContextManager): :param str username: The username to authenticate as. :returns: - a `list` of auth types permissible for the next stage of + a list of auth types permissible for the next stage of authentication (normally empty) :raises: `.BadAuthenticationType` -- if GSS-API Key Exchange was not performed (and no event was passed @@ -1805,7 +1812,9 @@ class Transport(threading.Thread, ClosingContextManager): raise SSHException("Unknown host key type") if not key.verify_ssh_sig(self.H, Message(sig)): raise SSHException( - "Signature verification (%s) failed." % self.host_key_type + "Signature verification ({}) failed.".format( + self.host_key_type + ) ) # noqa self.host_key = key @@ -1819,9 +1828,8 @@ class Transport(threading.Thread, ClosingContextManager): # Fallback to SHA1 for kex engines that fail to specify a hex # algorithm, or for e.g. transport tests that don't run kexinit. hash_algo = getattr(self.kex_engine, "hash_algo", None) - hash_select_msg = "kex engine %s specified hash_algo %r" % ( - self.kex_engine.__class__.__name__, - hash_algo, + hash_select_msg = "kex engine {} specified hash_algo {!r}".format( + self.kex_engine.__class__.__name__, hash_algo ) if hash_algo is None: hash_algo = sha1 @@ -1945,14 +1953,15 @@ class Transport(threading.Thread, ClosingContextManager): _active_threads.append(self) tid = hex(long(id(self)) & xffffffff) if self.server_mode: - self._log(DEBUG, "starting thread (server mode): %s" % tid) + self._log(DEBUG, "starting thread (server mode): {}".format(tid)) else: - self._log(DEBUG, "starting thread (client mode): %s" % tid) + self._log(DEBUG, "starting thread (client mode): {}".format(tid)) try: try: self.packetizer.write_all(b(self.local_version + "\r\n")) self._log( - DEBUG, "Local version/idstring: %s" % self.local_version + DEBUG, + "Local version/idstring: {}".format(self.local_version), ) # noqa self._check_banner() # The above is actually very much part of the handshake, but @@ -1984,8 +1993,9 @@ class Transport(threading.Thread, ClosingContextManager): if len(self._expected_packet) > 0: if ptype not in self._expected_packet: raise SSHException( - "Expecting packet from %r, got %d" - % (self._expected_packet, ptype) + "Expecting packet from {!r}, got {:d}".format( + self._expected_packet, ptype + ) ) # noqa self._expected_packet = tuple() if (ptype >= 30) and (ptype <= 41): @@ -2006,15 +2016,17 @@ class Transport(threading.Thread, ClosingContextManager): elif chanid in self.channels_seen: self._log( DEBUG, - "Ignoring message for dead channel %d" - % chanid, - ) # noqa + "Ignoring message for dead channel {:d}".format( # noqa + chanid + ), + ) else: self._log( ERROR, - "Channel request for unknown channel %d" - % chanid, - ) # noqa + "Channel request for unknown channel {:d}".format( # noqa + chanid + ), + ) break elif ( self.auth_handler is not None @@ -2030,7 +2042,7 @@ class Transport(threading.Thread, ClosingContextManager): # itself literally MSG_UNIMPLEMENTED, in which case, we # just shut up to avoid causing a useless loop). name = MSG_NAMES[ptype] - warning = "Oops, unhandled type {0} ({1!r})".format( + warning = "Oops, unhandled type {} ({!r})".format( ptype, name ) self._log(WARNING, warning) @@ -2050,7 +2062,7 @@ class Transport(threading.Thread, ClosingContextManager): except socket.error as e: if type(e.args) is tuple: if e.args: - emsg = "%s (%d)" % (e.args[1], e.args[0]) + emsg = "{} ({:d})".format(e.args[1], e.args[0]) else: # empty tuple, e.g. socket.timeout emsg = str(e) or repr(e) else: @@ -2091,11 +2103,11 @@ class Transport(threading.Thread, ClosingContextManager): # Log useful, non-duplicative line re: an agreed-upon algorithm. # Old code implied algorithms could be asymmetrical (different for # inbound vs outbound) so we preserve that possibility. - msg = "{0} agreed: ".format(which) + msg = "{} agreed: ".format(which) if local == remote: msg += local else: - msg += "local={0}, remote={1}".format(local, remote) + msg += "local={}, remote={}".format(local, remote) self._log(DEBUG, msg) # protocol stages @@ -2137,7 +2149,7 @@ class Transport(threading.Thread, ClosingContextManager): raise SSHException('Indecipherable protocol version "' + buf + '"') # save this server version string for later self.remote_version = buf - self._log(DEBUG, "Remote version/idstring: %s" % buf) + self._log(DEBUG, "Remote version/idstring: {}".format(buf)) # pull off any attached comment # NOTE: comment used to be stored in a variable and then...never used. # since 2003. ca 877cd974b8182d26fa76d566072917ea67b64e67 @@ -2151,9 +2163,9 @@ class Transport(threading.Thread, ClosingContextManager): version = segs[1] client = segs[2] if version != "1.99" and version != "2.0": - msg = "Incompatible version ({0} instead of 2.0)" + msg = "Incompatible version ({} instead of 2.0)" raise SSHException(msg.format(version)) - msg = "Connected (version {0}, client {1})".format(version, client) + msg = "Connected (version {}, client {})".format(version, client) self._log(INFO, msg) def _send_kex_init(self): @@ -2270,7 +2282,7 @@ class Transport(threading.Thread, ClosingContextManager): "Incompatible ssh peer (no acceptable kex algorithm)" ) # noqa self.kex_engine = self._kex_info[agreed_kex[0]](self) - self._log(DEBUG, "Kex agreed: %s" % agreed_kex[0]) + self._log(DEBUG, "Kex agreed: {}".format(agreed_kex[0])) if self.server_mode: available_server_keys = list( @@ -2387,7 +2399,8 @@ class Transport(threading.Thread, ClosingContextManager): len(agreed_local_compression) == 0 or len(agreed_remote_compression) == 0 ): - msg = "Incompatible ssh server (no acceptable compression) {0!r} {1!r} {2!r}" # noqa + msg = "Incompatible ssh server (no acceptable compression)" + msg += " {!r} {!r} {!r}" raise SSHException( msg.format( agreed_local_compression, @@ -2529,15 +2542,16 @@ class Transport(threading.Thread, ClosingContextManager): def _parse_disconnect(self, m): code = m.get_int() desc = m.get_text() - self._log(INFO, "Disconnect (code %d): %s" % (code, desc)) + self._log(INFO, "Disconnect (code {:d}): {}".format(code, desc)) def _parse_global_request(self, m): kind = m.get_text() - self._log(DEBUG, 'Received global request "%s"' % kind) + self._log(DEBUG, 'Received global request "{}"'.format(kind)) want_reply = m.get_boolean() if not self.server_mode: self._log( - DEBUG, 'Rejecting "%s" global request from server.' % kind + DEBUG, + 'Rejecting "{}" global request from server.'.format(kind), ) ok = False elif kind == "tcpip-forward": @@ -2592,7 +2606,7 @@ class Transport(threading.Thread, ClosingContextManager): chan._set_remote_channel( server_chanid, server_window_size, server_max_packet_size ) - self._log(DEBUG, "Secsh channel %d opened." % chanid) + self._log(DEBUG, "Secsh channel {:d} opened.".format(chanid)) if chanid in self.channel_events: self.channel_events[chanid].set() del self.channel_events[chanid] @@ -2608,8 +2622,9 @@ class Transport(threading.Thread, ClosingContextManager): reason_text = CONNECTION_FAILED_CODE.get(reason, "(unknown code)") self._log( ERROR, - "Secsh channel %d open FAILED: %s: %s" - % (chanid, reason_str, reason_text), + "Secsh channel {:d} open FAILED: {}: {}".format( + chanid, reason_str, reason_text + ), ) self.lock.acquire() try: @@ -2644,8 +2659,9 @@ class Transport(threading.Thread, ClosingContextManager): origin_port = m.get_int() self._log( DEBUG, - "Incoming x11 connection from %s:%d" - % (origin_addr, origin_port), + "Incoming x11 connection from {}:{:d}".format( + origin_addr, origin_port + ), ) self.lock.acquire() try: @@ -2659,8 +2675,9 @@ class Transport(threading.Thread, ClosingContextManager): origin_port = m.get_int() self._log( DEBUG, - "Incoming tcp forwarded connection from %s:%d" - % (origin_addr, origin_port), + "Incoming tcp forwarded connection from {}:{:d}".format( + origin_addr, origin_port + ), ) self.lock.acquire() try: @@ -2669,7 +2686,8 @@ class Transport(threading.Thread, ClosingContextManager): self.lock.release() elif not self.server_mode: self._log( - DEBUG, 'Rejecting "%s" channel request from server.' % kind + DEBUG, + 'Rejecting "{}" channel request from server.'.format(kind), ) reject = True reason = OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED @@ -2696,7 +2714,8 @@ class Transport(threading.Thread, ClosingContextManager): ) if reason != OPEN_SUCCEEDED: self._log( - DEBUG, 'Rejecting "%s" channel request from client.' % kind + DEBUG, + 'Rejecting "{}" channel request from client.'.format(kind), ) reject = True if reject: @@ -2730,7 +2749,9 @@ class Transport(threading.Thread, ClosingContextManager): m.add_int(self.default_window_size) m.add_int(self.default_max_packet_size) self._send_message(m) - self._log(DEBUG, "Secsh channel %d (%s) opened.", my_chanid, kind) + self._log( + DEBUG, "Secsh channel {:d} ({}) opened.".format(my_chanid, kind) + ) if kind == "auth-agent@openssh.com": self._forward_agent_handler(chan) elif kind == "x11": @@ -2747,7 +2768,7 @@ class Transport(threading.Thread, ClosingContextManager): m.get_boolean() # always_display msg = m.get_string() m.get_string() # language - self._log(DEBUG, "Debug msg: {0}".format(util.safe_string(msg))) + self._log(DEBUG, "Debug msg: {}".format(util.safe_string(msg))) def _get_subsystem_handler(self, name): try: @@ -2803,7 +2824,7 @@ class SecurityOptions(object): """ Returns a string representation of this object, for debugging. """ - return "<paramiko.SecurityOptions for %s>" % repr(self._transport) + return "<paramiko.SecurityOptions for {!r}>".format(self._transport) def _set(self, name, orig, x): if type(x) is list: diff --git a/paramiko/util.py b/paramiko/util.py index 1fb73da4..29c52bfb 100644 --- a/paramiko/util.py +++ b/paramiko/util.py @@ -102,19 +102,21 @@ def format_binary(data, prefix=""): def format_binary_line(data): - left = " ".join(["%02X" % byte_ord(c) for c in data]) - right = "".join([(".%c.." % c)[(byte_ord(c) + 63) // 95] for c in data]) - return "%-50s %s" % (left, right) + left = " ".join(["{:02X}".format(byte_ord(c)) for c in data]) + right = "".join( + [".{:c}..".format(byte_ord(c))[(byte_ord(c) + 63) // 95] for c in data] + ) + return "{:50s} {}".format(left, right) def safe_string(s): - out = b("") + out = b"" for c in s: i = byte_ord(c) if 32 <= i <= 127: out += byte_chr(i) else: - out += b("%%%02X" % i) + out += b("%{:02X}".format(i)) return out diff --git a/paramiko/win_pageant.py b/paramiko/win_pageant.py index d6afd5bd..a550b7f3 100644 --- a/paramiko/win_pageant.py +++ b/paramiko/win_pageant.py @@ -44,7 +44,7 @@ win32con_WM_COPYDATA = 74 def _get_pageant_window_object(): - return ctypes.windll.user32.FindWindowA(b("Pageant"), b("Pageant")) + return ctypes.windll.user32.FindWindowA(b"Pageant", b"Pageant") def can_talk_to_agent(): @@ -8,7 +8,7 @@ license_file = LICENSE omit = paramiko/_winapi.py [flake8] -exclude = sites,.git,build,dist,demos,tests +exclude = sites,.git,build,dist,demos # NOTE: W503, E203 are concessions to black 18.0b5 and could be reinstated # later if fixed on that end. # NOTE: E722 seems to only have started popping up on move to flake8 3.6.0 from @@ -22,3 +22,6 @@ max-line-length = 79 addopts = -p no:relaxed # Loop on failure looponfailroots = tests paramiko +# Ignore some warnings we cannot easily handle. +filterwarnings = + ignore::DeprecationWarning:pkg_resources @@ -65,15 +65,13 @@ setup( "Topic :: Security :: Cryptography", "Programming Language :: Python", "Programming Language :: Python :: 2", - "Programming Language :: Python :: 2.6", "Programming Language :: Python :: 2.7", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.2", - "Programming Language :: Python :: 3.3", "Programming Language :: Python :: 3.4", "Programming Language :: Python :: 3.5", "Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", ], install_requires=[ "bcrypt>=3.1.3", diff --git a/sites/shared_conf.py b/sites/shared_conf.py index 4d6a7b27..7bb503ce 100644 --- a/sites/shared_conf.py +++ b/sites/shared_conf.py @@ -21,12 +21,12 @@ html_sidebars = { } # Everything intersphinx's to Python -intersphinx_mapping = {"python": ("http://docs.python.org/2.6", None)} +intersphinx_mapping = {"python": ("https://docs.python.org/2.7/", None)} # Regular settings project = "Paramiko" year = datetime.now().year -copyright = "%d Jeff Forcier" % year +copyright = "{} Jeff Forcier".format(year) master_doc = "index" templates_path = ["_templates"] exclude_trees = ["_build"] diff --git a/sites/www/changelog.rst b/sites/www/changelog.rst index 91d170c1..86419aa7 100644 --- a/sites/www/changelog.rst +++ b/sites/www/changelog.rst @@ -11,6 +11,7 @@ Changelog import location of ``MutableMapping`` (used in host key management) to avoid the old location becoming deprecated in Python 3.8. Thanks to Josh Karpel for catch & patch. +- :release:`2.4.2 <2018-09-18>` - :release:`2.3.3 <2018-09-18>` - :release:`2.2.4 <2018-09-18>` - :release:`2.1.6 <2018-09-18>` @@ -52,6 +53,7 @@ Changelog - :support:`1262 backported` Add ``*.pub`` files to the MANIFEST so distributed source packages contain some necessary test assets. Credit: Alexander Kapshuna. +- :release:`2.4.1 <2018-03-12>` - :release:`2.3.2 <2018-03-12>` - :release:`2.2.3 <2018-03-12>` - :release:`2.1.5 <2018-03-12>` @@ -66,10 +68,32 @@ Changelog - :bug:`1039` Ed25519 auth key decryption raised an unexpected exception when given a unicode password string (typical in python 3). Report by Theodor van Nahl and fix by Pierce Lopez. +- :release:`2.4.0 <2017-11-14>` +- :feature:`-` Add a new ``passphrase`` kwarg to `SSHClient.connect + <paramiko.client.SSHClient.connect>` so users may disambiguate key-decryption + passphrases from password-auth passwords. (This is a backwards compatible + change; ``password`` will still pull double duty as a passphrase when + ``passphrase`` is not given.) +- :support:`-` Update ``tearDown`` of client test suite to avoid hangs due to + eternally blocking ``accept()`` calls on the internal server thread (which + can occur when test code raises an exception before actually connecting to + the server.) - :bug:`1108 (1.17+)` Rename a private method keyword argument (which was named ``async``) so that we're compatible with the upcoming Python 3.7 release (where ``async`` is a new keyword.) Thanks to ``@vEpiphyte`` for the report. +- :support:`1100` Updated the test suite & related docs/metadata/config to be + compatible with pytest instead of using the old, custom, crufty + unittest-based ``test.py``. + + This includes marking known-slow tests (mostly the SFTP ones) so they can be + filtered out by ``inv test``'s default behavior; as well as other minor + tweaks to test collection and/or display (for example, GSSAPI tests are + collected, but skipped, instead of not even being collected by default as in + ``test.py``.) - :support:`- backported` Include LICENSE file in wheel archives. +- :support:`1070` Drop Python 2.6 and Python 3.3 support; now only 2.7 and 3.4+ + are supported. If you're unable to upgrade from 2.6 or 3.3, please stick to + the Paramiko 2.3.x (or below) release lines. - :release:`2.3.1 <2017-09-22>` - :bug:`1071` Certificate support broke the no-certificate case for Ed25519 keys (symptom is an ``AttributeError`` about ``public_blob``.) This went diff --git a/sites/www/index.rst b/sites/www/index.rst index f0a5db8a..26961f24 100644 --- a/sites/www/index.rst +++ b/sites/www/index.rst @@ -1,11 +1,11 @@ Welcome to Paramiko! ==================== -Paramiko is a Python (2.6+, 3.3+) implementation of the SSHv2 protocol [#]_, +Paramiko is a Python (2.7, 3.4+) implementation of the SSHv2 protocol [#]_, providing both client and server functionality. While it leverages a Python C -extension for low level cryptography -(`Cryptography <https://cryptography.io>`_), Paramiko itself is a pure Python -interface around SSH networking concepts. +extension for low level cryptography (`Cryptography +<https://cryptography.io>`_), Paramiko itself is a pure Python interface around +SSH networking concepts. This website covers project information for Paramiko such as the changelog, contribution guidelines, development roadmap, news/blog, and so forth. Detailed diff --git a/sites/www/installing.rst b/sites/www/installing.rst index f335a9e7..e6db2dca 100644 --- a/sites/www/installing.rst +++ b/sites/www/installing.rst @@ -19,8 +19,8 @@ via `pip <http://pip-installer.org>`_:: $ pip install paramiko -We currently support **Python 2.6, 2.7, 3.3+, and PyPy**. Users on Python 2.5 -or older (or 3.2 or older) are urged to upgrade. +We currently support **Python 2.7, 3.4+, and PyPy**. Users on Python 2.6 or +older (or 3.3 or older) are urged to upgrade. Paramiko has only one direct hard dependency: the Cryptography library. See :ref:`cryptography`. @@ -7,6 +7,7 @@ from invocations import travis from invocations.checks import blacken from invocations.docs import docs, www, sites from invocations.packaging.release import ns as release_coll, publish +from invocations.testing import count_errors # TODO: this screams out for the invoke missing-feature of "I just wrap task X, @@ -39,7 +40,7 @@ def test( # running headless? Probably? if color: opts += " --color=yes" - opts += " --capture={0}".format(capture) + opts += " --capture={}".format(capture) if "-m" not in opts and not include_slow: opts += " -m 'not slow'" if k is not None and not ("-k" in opts if opts else False): @@ -122,7 +123,16 @@ def release(ctx, sdist=True, wheel=True, sign=True, dry_run=False, index=None): release_coll.tasks["publish"] = release ns = Collection( - test, coverage, guard, release_coll, docs, www, sites, travis, blacken + test, + coverage, + guard, + release_coll, + docs, + www, + sites, + count_errors, + travis, + blacken, ) ns.configure( { diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py index 100076d6..1528a0b8 100644 --- a/tests/stub_sftp.py +++ b/tests/stub_sftp.py @@ -21,18 +21,17 @@ A stub SFTP server for loopback SFTP testing. """ import os -import sys from paramiko import ( - ServerInterface, - SFTPServerInterface, - SFTPServer, + AUTH_SUCCESSFUL, + OPEN_SUCCEEDED, SFTPAttributes, SFTPHandle, - SFTP_OK, + SFTPServer, + SFTPServerInterface, SFTP_FAILURE, - AUTH_SUCCESSFUL, - OPEN_SUCCEEDED, + SFTP_OK, + ServerInterface, ) from paramiko.common import o666 @@ -65,7 +64,8 @@ class StubSFTPHandle(SFTPHandle): class StubSFTPServer(SFTPServerInterface): # assume current folder is a fine root - # (the tests always create and eventually delete a subfolder, so there shouldn't be any mess) + # (the tests always create and eventually delete a subfolder, so there + # shouldn't be any mess) ROOT = os.getcwd() def _realpath(self, path): @@ -206,7 +206,8 @@ class StubSFTPServer(SFTPServerInterface): # compute relative to path abspath = os.path.join(os.path.dirname(path), target_path) if abspath[: len(self.ROOT)] != self.ROOT: - # this symlink isn't going to work anyway -- just break it immediately + # this symlink isn't going to work anyway -- just break it + # immediately target_path = "<error>" try: os.symlink(target_path, path) diff --git a/tests/test_auth.py b/tests/test_auth.py index 6358a053..01fbac5b 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -142,7 +142,7 @@ class AuthTest(unittest.TestCase): self.assertTrue(self.event.is_set()) self.assertTrue(self.ts.is_active()) - def test_1_bad_auth_type(self): + def test_bad_auth_type(self): """ verify that we get the right exception when an unsupported auth type is requested. @@ -160,7 +160,7 @@ class AuthTest(unittest.TestCase): self.assertEqual(BadAuthenticationType, etype) self.assertEqual(["publickey"], evalue.allowed_types) - def test_2_bad_password(self): + def test_bad_password(self): """ verify that a bad password gets the right exception, and that a retry with the right password works. @@ -176,7 +176,7 @@ class AuthTest(unittest.TestCase): self.tc.auth_password(username="slowdive", password="pygmalion") self.verify_finished() - def test_3_multipart_auth(self): + def test_multipart_auth(self): """ verify that multipart auth works. """ @@ -191,7 +191,7 @@ class AuthTest(unittest.TestCase): self.assertEqual([], remain) self.verify_finished() - def test_4_interactive_auth(self): + def test_interactive_auth(self): """ verify keyboard-interactive auth works. """ @@ -210,7 +210,7 @@ class AuthTest(unittest.TestCase): self.assertEqual([], remain) self.verify_finished() - def test_5_interactive_auth_fallback(self): + def test_interactive_auth_fallback(self): """ verify that a password auth attempt will fallback to "interactive" if password auth isn't supported but interactive is. @@ -221,7 +221,7 @@ class AuthTest(unittest.TestCase): self.assertEqual([], remain) self.verify_finished() - def test_6_auth_utf8(self): + def test_auth_utf8(self): """ verify that utf-8 encoding happens in authentication. """ @@ -231,7 +231,7 @@ class AuthTest(unittest.TestCase): self.assertEqual([], remain) self.verify_finished() - def test_7_auth_non_utf8(self): + def test_auth_non_utf8(self): """ verify that non-utf-8 encoded passwords can be used for broken servers. @@ -242,7 +242,7 @@ class AuthTest(unittest.TestCase): self.assertEqual([], remain) self.verify_finished() - def test_8_auth_gets_disconnected(self): + def test_auth_gets_disconnected(self): """ verify that we catch a server disconnecting during auth, and report it as an auth failure. @@ -250,12 +250,13 @@ class AuthTest(unittest.TestCase): self.start_server() self.tc.connect(hostkey=self.public_host_key) try: - remain = self.tc.auth_password("bad-server", "hello") + self.tc.auth_password("bad-server", "hello") except: etype, evalue, etb = sys.exc_info() self.assertTrue(issubclass(etype, AuthenticationException)) - def test_9_auth_non_responsive(self): + @slow + def test_auth_non_responsive(self): """ verify that authentication times out if server takes to long to respond (or never responds). @@ -264,7 +265,7 @@ class AuthTest(unittest.TestCase): self.start_server() self.tc.connect() try: - remain = self.tc.auth_password("unresponsive-server", "hello") + self.tc.auth_password("unresponsive-server", "hello") except: etype, evalue, etb = sys.exc_info() self.assertTrue(issubclass(etype, AuthenticationException)) diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py index 28d6e4a2..61c99cc0 100644 --- a/tests/test_buffered_pipe.py +++ b/tests/test_buffered_pipe.py @@ -26,7 +26,6 @@ import unittest from paramiko.buffered_pipe import BufferedPipe, PipeTimeout from paramiko import pipe -from paramiko.py3compat import b def delay_thread(p): @@ -42,7 +41,7 @@ def close_thread(p): class BufferedPipeTest(unittest.TestCase): - def test_1_buffered_pipe(self): + def test_buffered_pipe(self): p = BufferedPipe() self.assertTrue(not p.read_ready()) p.feed("hello.") @@ -59,7 +58,7 @@ class BufferedPipeTest(unittest.TestCase): self.assertTrue(not p.read_ready()) self.assertEqual(b"", p.read(1)) - def test_2_delay(self): + def test_delay(self): p = BufferedPipe() self.assertTrue(not p.read_ready()) threading.Thread(target=delay_thread, args=(p,)).start() @@ -72,13 +71,13 @@ class BufferedPipeTest(unittest.TestCase): self.assertEqual(b"b", p.read(1, 1.0)) self.assertEqual(b"", p.read(1)) - def test_3_close_while_reading(self): + def test_close_while_reading(self): p = BufferedPipe() threading.Thread(target=close_thread, args=(p,)).start() data = p.read(1, 1.0) self.assertEqual(b"", data) - def test_4_or_pipe(self): + def test_or_pipe(self): p = pipe.make_pipe() p1, p2 = pipe.make_or_pipe(p) self.assertFalse(p._set) diff --git a/tests/test_channelfile.py b/tests/test_channelfile.py new file mode 100644 index 00000000..ffcbea3f --- /dev/null +++ b/tests/test_channelfile.py @@ -0,0 +1,41 @@ +from mock import patch, MagicMock + +from paramiko import Channel, ChannelFile, ChannelStderrFile + + +class TestChannelFile(object): + @patch("paramiko.channel.ChannelFile._set_mode") + def test_defaults_to_unbuffered_reading(self, setmode): + ChannelFile(Channel(None)) + setmode.assert_called_once_with("r", -1) + + @patch("paramiko.channel.ChannelFile._set_mode") + def test_can_override_mode_and_bufsize(self, setmode): + ChannelFile(Channel(None), mode="w", bufsize=25) + setmode.assert_called_once_with("w", 25) + + def test_read_recvs_from_channel(self): + chan = MagicMock() + cf = ChannelFile(chan) + cf.read(100) + chan.recv.assert_called_once_with(100) + + def test_write_calls_channel_sendall(self): + chan = MagicMock() + cf = ChannelFile(chan, mode="w") + cf.write("ohai") + chan.sendall.assert_called_once_with(b"ohai") + + +class TestChannelStderrFile(object): + def test_read_calls_channel_recv_stderr(self): + chan = MagicMock() + cf = ChannelStderrFile(chan) + cf.read(100) + chan.recv_stderr.assert_called_once_with(100) + + def test_write_calls_channel_sendall(self): + chan = MagicMock() + cf = ChannelStderrFile(chan, mode="w") + cf.write("ohai") + chan.sendall_stderr.assert_called_once_with(b"ohai") diff --git a/tests/test_client.py b/tests/test_client.py index bfbd395f..26de2d37 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -20,7 +20,7 @@ Some unit tests for SSHClient. """ -from __future__ import with_statement +from __future__ import with_statement, print_function import gc import os @@ -33,18 +33,23 @@ import warnings import weakref from tempfile import mkstemp +from pytest_relaxed import raises + import paramiko from paramiko.pkey import PublicBlob -from paramiko.common import PY2 from paramiko.ssh_exception import SSHException, AuthenticationException from .util import _support, slow +requires_gss_auth = unittest.skipUnless( + paramiko.GSS_AUTH_AVAILABLE, "GSS auth not available" +) + FINGERPRINTS = { - "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c", - "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5", - "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60", + "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c", # noqa + "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5", # noqa + "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60", # noqa "ssh-ed25519": b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80', } @@ -107,7 +112,7 @@ class NullServer(paramiko.ServerInterface): return True -class SSHClientTest(unittest.TestCase): +class ClientTest(unittest.TestCase): def setUp(self): self.sockl = socket.socket() self.sockl.bind(("localhost", 0)) @@ -120,16 +125,42 @@ class SSHClientTest(unittest.TestCase): look_for_keys=False, ) self.event = threading.Event() + self.kill_event = threading.Event() def tearDown(self): - for attr in "tc ts socks sockl".split(): - if hasattr(self, attr): - getattr(self, attr).close() - - def _run(self, allowed_keys=None, delay=0, public_blob=None): + # Shut down client Transport + if hasattr(self, "tc"): + self.tc.close() + # Shut down shared socket + if hasattr(self, "sockl"): + # Signal to server thread that it should shut down early; it checks + # this immediately after accept(). (In scenarios where connection + # actually succeeded during the test, this becomes a no-op.) + self.kill_event.set() + # Forcibly connect to server sock in case the server thread is + # hanging out in its accept() (e.g. if the client side of the test + # fails before it even gets to connecting); there's no other good + # way to force an accept() to exit. + put_a_sock_in_it = socket.socket() + put_a_sock_in_it.connect((self.addr, self.port)) + put_a_sock_in_it.close() + # Then close "our" end of the socket (which _should_ cause the + # accept() to bail out, but does not, for some reason. I blame + # threading.) + self.sockl.close() + + def _run( + self, allowed_keys=None, delay=0, public_blob=None, kill_event=None + ): if allowed_keys is None: allowed_keys = FINGERPRINTS.keys() self.socks, addr = self.sockl.accept() + # If the kill event was set at this point, it indicates an early + # shutdown, so bail out now and don't even try setting up a Transport + # (which will just verbosely die.) + if kill_event and kill_event.is_set(): + self.socks.close() + return self.ts = paramiko.Transport(self.socks) keypath = _support("test_rsa.key") host_key = paramiko.RSAKey.from_private_key_file(keypath) @@ -149,7 +180,7 @@ class SSHClientTest(unittest.TestCase): The exception is ``allowed_keys`` which is stripped and handed to the ``NullServer`` used for testing. """ - run_kwargs = {} + run_kwargs = {"kill_event": self.kill_event} for key in ("allowed_keys", "public_blob"): run_kwargs[key] = kwargs.pop(key, None) # Server setup @@ -180,6 +211,12 @@ class SSHClientTest(unittest.TestCase): stdin, stdout, stderr = self.tc.exec_command("yes") schan = self.ts.accept(1.0) + # Nobody else tests the API of exec_command so let's do it here for + # now. :weary: + assert isinstance(stdin, paramiko.ChannelFile) + assert isinstance(stdout, paramiko.ChannelFile) + assert isinstance(stderr, paramiko.ChannelStderrFile) + schan.send("Hello there.\n") schan.send_stderr("This is on stderr.\n") schan.close() @@ -194,13 +231,15 @@ class SSHClientTest(unittest.TestCase): stdout.close() stderr.close() - def test_1_client(self): + +class SSHClientTest(ClientTest): + def test_client(self): """ verify that the SSHClient stuff works too. """ self._test_connection(password="pygmalion") - def test_2_client_dsa(self): + def test_client_dsa(self): """ verify that SSHClient works with a DSA key. """ @@ -212,7 +251,7 @@ class SSHClientTest(unittest.TestCase): """ self._test_connection(key_filename=_support("test_rsa.key")) - def test_2_5_client_ecdsa(self): + def test_client_ecdsa(self): """ verify that SSHClient works with an ECDSA key. """ @@ -221,7 +260,7 @@ class SSHClientTest(unittest.TestCase): def test_client_ed25519(self): self._test_connection(key_filename=_support("test_ed25519.key")) - def test_3_multiple_key_files(self): + def test_multiple_key_files(self): """ verify that SSHClient accepts and tries multiple key files. """ @@ -242,7 +281,7 @@ class SSHClientTest(unittest.TestCase): try: self._test_connection( key_filename=[ - _support("test_{0}.key".format(x)) for x in attempt + _support("test_{}.key".format(x)) for x in attempt ], allowed_keys=[types_[x] for x in accept], ) @@ -271,7 +310,7 @@ class SSHClientTest(unittest.TestCase): # server-side behavior is 100% identical.) # NOTE: only bothered whipping up one cert per overall class/family. for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"): - cert_name = "test_{0}.key-cert.pub".format(type_) + cert_name = "test_{}.key-cert.pub".format(type_) cert_path = _support(os.path.join("cert_support", cert_name)) self._test_connection( key_filename=cert_path, @@ -286,12 +325,12 @@ class SSHClientTest(unittest.TestCase): # that a specific cert was found, along with regular authorization # succeeding proving that the overall flow works. for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"): - key_name = "test_{0}.key".format(type_) + key_name = "test_{}.key".format(type_) key_path = _support(os.path.join("cert_support", key_name)) self._test_connection( key_filename=key_path, public_blob=PublicBlob.from_file( - "{0}-cert.pub".format(key_path) + "{}-cert.pub".format(key_path) ), ) @@ -301,7 +340,7 @@ class SSHClientTest(unittest.TestCase): # code path (!) so we're punting too, sob. pass - def test_4_auto_add_policy(self): + def test_auto_add_policy(self): """ verify that SSHClient's AutoAddPolicy works. """ @@ -324,7 +363,7 @@ class SSHClientTest(unittest.TestCase): new_host_key = list(self.tc.get_host_keys()[hostname].values())[0] self.assertEqual(public_host_key, new_host_key) - def test_5_save_host_keys(self): + def test_save_host_keys(self): """ verify that SSHClient correctly saves a known_hosts file. """ @@ -353,7 +392,7 @@ class SSHClientTest(unittest.TestCase): os.unlink(localname) - def test_6_cleanup(self): + def test_cleanup(self): """ verify that when an SSHClient is collected, its transport (and the transport's packetizer) is closed. @@ -406,7 +445,7 @@ class SSHClientTest(unittest.TestCase): self.assertTrue(self.tc._transport is None) - def test_7_banner_timeout(self): + def test_banner_timeout(self): """ verify that the SSHClient has a configurable banner timeout. """ @@ -425,7 +464,7 @@ class SSHClientTest(unittest.TestCase): kwargs = dict(self.connect_kwargs, banner_timeout=0.5) self.assertRaises(paramiko.SSHException, self.tc.connect, **kwargs) - def test_8_auth_trickledown(self): + def test_auth_trickledown(self): """ Failed key auth doesn't prevent subsequent pw auth from succeeding """ @@ -445,7 +484,8 @@ class SSHClientTest(unittest.TestCase): ) self._test_connection(**kwargs) - def test_9_auth_timeout(self): + @slow + def test_auth_timeout(self): """ verify that the SSHClient has a configurable auth timeout """ @@ -457,25 +497,23 @@ class SSHClientTest(unittest.TestCase): auth_timeout=0.5, ) - def test_10_auth_trickledown_gsskex(self): + @requires_gss_auth + def test_auth_trickledown_gsskex(self): """ - Failed gssapi-keyex auth doesn't prevent subsequent key auth from succeeding + Failed gssapi-keyex doesn't prevent subsequent key from succeeding """ - if not paramiko.GSS_AUTH_AVAILABLE: - return # for python 2.6 lacks skipTest kwargs = dict(gss_kex=True, key_filename=[_support("test_rsa.key")]) self._test_connection(**kwargs) - def test_11_auth_trickledown_gssauth(self): + @requires_gss_auth + def test_auth_trickledown_gssauth(self): """ - Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding + Failed gssapi-with-mic doesn't prevent subsequent key from succeeding """ - if not paramiko.GSS_AUTH_AVAILABLE: - return # for python 2.6 lacks skipTest kwargs = dict(gss_auth=True, key_filename=[_support("test_rsa.key")]) self._test_connection(**kwargs) - def test_12_reject_policy(self): + def test_reject_policy(self): """ verify that SSHClient's RejectPolicy works. """ @@ -491,14 +529,14 @@ class SSHClientTest(unittest.TestCase): **self.connect_kwargs ) - def test_13_reject_policy_gsskex(self): + @requires_gss_auth + def test_reject_policy_gsskex(self): """ verify that SSHClient's RejectPolicy works, even if gssapi-keyex was enabled but not used. """ - # Test for a bug present in paramiko versions released before 2017-08-01 - if not paramiko.GSS_AUTH_AVAILABLE: - return # for python 2.6 lacks skipTest + # Test for a bug present in paramiko versions released before + # 2017-08-01 threading.Thread(target=self._run).start() self.tc = paramiko.SSHClient() @@ -558,10 +596,7 @@ class SSHClientTest(unittest.TestCase): def test_host_key_negotiation_4(self): self._client_host_key_good(paramiko.RSAKey, "test_rsa.key") - def test_update_environment(self): - """ - Verify that environment variables can be set by the client. - """ + def _setup_for_env(self): threading.Thread(target=self._run).start() self.tc = paramiko.SSHClient() @@ -575,6 +610,11 @@ class SSHClientTest(unittest.TestCase): self.assertTrue(self.event.isSet()) self.assertTrue(self.ts.is_active()) + def test_update_environment(self): + """ + Verify that environment variables can be set by the client. + """ + self._setup_for_env() target_env = {b"A": b"B", b"C": b"d"} self.tc.exec_command("yes", environment=target_env) @@ -582,22 +622,20 @@ class SSHClientTest(unittest.TestCase): self.assertEqual(target_env, getattr(schan, "env", {})) schan.close() - # Cannot use assertRaises in context manager mode as it is not supported - # in Python 2.6. - try: + @unittest.skip("Clients normally fail silently, thus so do we, for now") + def test_env_update_failures(self): + self._setup_for_env() + with self.assertRaises(SSHException) as manager: # Verify that a rejection by the server can be detected self.tc.exec_command("yes", environment={b"INVALID_ENV": b""}) - except SSHException as e: - self.assertTrue( - "INVALID_ENV" in str(e), - "Expected variable name in error message", - ) - self.assertTrue( - isinstance(e.args[1], SSHException), - "Expected original SSHException in exception", - ) - else: - self.assertFalse(False, "SSHException was not thrown.") + self.assertTrue( + "INVALID_ENV" in str(manager.exception), + "Expected variable name in error message", + ) + self.assertTrue( + isinstance(manager.exception.args[1], SSHException), + "Expected original SSHException in exception", + ) def test_missing_key_policy_accepts_classes_or_instances(self): """ @@ -614,3 +652,49 @@ class SSHClientTest(unittest.TestCase): # Hand in just the class (new behavior) client.set_missing_host_key_policy(paramiko.AutoAddPolicy) assert isinstance(client._policy, paramiko.AutoAddPolicy) + + +class PasswordPassphraseTests(ClientTest): + # TODO: most of these could reasonably be set up to use mocks/assertions + # (e.g. "gave passphrase -> expect PKey was given it as the passphrase") + # instead of suffering a real connection cycle. + # TODO: in that case, move the below to be part of an integration suite? + + def test_password_kwarg_works_for_password_auth(self): + # Straightforward / duplicate of earlier basic password test. + self._test_connection(password="pygmalion") + + # TODO: more granular exception pending #387; should be signaling "no auth + # methods available" because no key and no password + @raises(SSHException) + def test_passphrase_kwarg_not_used_for_password_auth(self): + # Using the "right" password in the "wrong" field shouldn't work. + self._test_connection(passphrase="pygmalion") + + def test_passphrase_kwarg_used_for_key_passphrase(self): + # Straightforward again, with new passphrase kwarg. + self._test_connection( + key_filename=_support("test_rsa_password.key"), + passphrase="television", + ) + + def test_password_kwarg_used_for_passphrase_when_no_passphrase_kwarg_given( + self + ): # noqa + # Backwards compatibility: passphrase in the password field. + self._test_connection( + key_filename=_support("test_rsa_password.key"), + password="television", + ) + + @raises(AuthenticationException) # TODO: more granular + def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given( # noqa + self + ): + # Sanity: if we're given both fields, the password field is NOT used as + # a passphrase. + self._test_connection( + key_filename=_support("test_rsa_password.key"), + password="television", + passphrase="wat? lol no", + ) diff --git a/tests/test_file.py b/tests/test_file.py index fba14b1b..2a3da74b 100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -52,7 +52,7 @@ class LoopbackFile(BufferedFile): class BufferedFileTest(unittest.TestCase): - def test_1_simple(self): + def test_simple(self): f = LoopbackFile("r") try: f.write(b"hi") @@ -69,7 +69,7 @@ class BufferedFileTest(unittest.TestCase): pass f.close() - def test_2_readline(self): + def test_readline(self): f = LoopbackFile("r+U") f.write( b"First line.\nSecond line.\r\nThird line.\n" @@ -96,7 +96,7 @@ class BufferedFileTest(unittest.TestCase): self.assertTrue(crlf in f.newlines) self.assertTrue(cr_byte not in f.newlines) - def test_3_lf(self): + def test_lf(self): """ try to trick the linefeed detector. """ @@ -108,7 +108,7 @@ class BufferedFileTest(unittest.TestCase): f.close() self.assertEqual(f.newlines, crlf) - def test_4_write(self): + def test_write(self): """ verify that write buffering is on. """ @@ -120,7 +120,7 @@ class BufferedFileTest(unittest.TestCase): self.assertEqual(f.readline(), "Incomplete line...\n") f.close() - def test_5_flush(self): + def test_flush(self): """ verify that flush will force a write. """ @@ -134,7 +134,7 @@ class BufferedFileTest(unittest.TestCase): self.assertEqual(f.read(3), b"") f.close() - def test_6_buffering(self): + def test_buffering_flushes(self): """ verify that flushing happens automatically on buffer crossing. """ @@ -147,7 +147,7 @@ class BufferedFileTest(unittest.TestCase): self.assertEqual(f.read(20), b"Too small. Enough.") f.close() - def test_7_read_all(self): + def test_read_all(self): """ verify that read(-1) returns everything left in the file. """ @@ -162,30 +162,30 @@ class BufferedFileTest(unittest.TestCase): ) f.close() - def test_8_buffering(self): + def test_buffering_writes(self): """ verify that buffered objects can be written """ if sys.version_info[0] == 2: f = LoopbackFile("r+", 16) - f.write(buffer(b"Too small.")) + f.write(buffer(b"Too small.")) # noqa f.close() - def test_9_readable(self): + def test_readable(self): f = LoopbackFile("r") self.assertTrue(f.readable()) self.assertFalse(f.writable()) self.assertFalse(f.seekable()) f.close() - def test_A_writable(self): + def test_writable(self): f = LoopbackFile("w") self.assertTrue(f.writable()) self.assertFalse(f.readable()) self.assertFalse(f.seekable()) f.close() - def test_B_readinto(self): + def test_readinto(self): data = bytearray(5) f = LoopbackFile("r+") f._write(b"hello") @@ -215,7 +215,7 @@ class BufferedFileTest(unittest.TestCase): offsets = range(0, len(data), 8) with LoopbackFile("rb+") as f: for offset in offsets: - f.write(buffer(data, offset, 8)) + f.write(buffer(data, offset, 8)) # noqa self.assertEqual(f.read(), data) @needs_builtin("memoryview") diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py index 3e8c39e8..46d5bbd1 100644 --- a/tests/test_gssapi.py +++ b/tests/test_gssapi.py @@ -30,14 +30,14 @@ from .util import needs_gssapi @needs_gssapi class GSSAPITest(unittest.TestCase): - def setup(): + def setup(self): # TODO: these vars should all come from os.environ or whatever the # approved pytest method is for runtime-configuring test data. self.krb5_mech = "1.2.840.113554.1.2.2" self.targ_name = "hostname" self.server_mode = False - def test_1_pyasn1(self): + def test_pyasn1(self): """ Test the used methods of pyasn1. """ @@ -48,7 +48,7 @@ class GSSAPITest(unittest.TestCase): mech, __ = decoder.decode(oid) self.assertEquals(self.krb5_mech, mech.__str__()) - def test_2_gssapi_sspi(self): + def test_gssapi_sspi(self): """ Test the used methods of python-gssapi or sspi, sspicon from pywin32. """ diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py index 295153dd..da47362c 100644 --- a/tests/test_hostkeys.py +++ b/tests/test_hostkeys.py @@ -62,7 +62,7 @@ class HostKeysTest(unittest.TestCase): def tearDown(self): os.unlink("hostfile.temp") - def test_1_load(self): + def test_load(self): hostdict = paramiko.HostKeys("hostfile.temp") self.assertEqual(2, len(hostdict)) self.assertEqual(1, len(list(hostdict.values())[0])) @@ -72,7 +72,7 @@ class HostKeysTest(unittest.TestCase): ).upper() self.assertEqual(b"E6684DB30E109B67B70FF1DC5C7F1363", fp) - def test_2_add(self): + def test_add(self): hostdict = paramiko.HostKeys("hostfile.temp") hh = "|1|BMsIC6cUIP2zBuXR3t2LRcJYjzM=|hpkJMysjTk/+zzUUzxQEa2ieq6c=" key = paramiko.RSAKey(data=decodebytes(keyblob)) @@ -83,7 +83,7 @@ class HostKeysTest(unittest.TestCase): self.assertEqual(b"7EC91BB336CB6D810B124B1353C32396", fp) self.assertTrue(hostdict.check("foo.example.com", key)) - def test_3_dict(self): + def test_dict(self): hostdict = paramiko.HostKeys("hostfile.temp") self.assertTrue("secure.example.com" in hostdict) self.assertTrue("not.example.com" not in hostdict) @@ -98,7 +98,7 @@ class HostKeysTest(unittest.TestCase): i += 1 self.assertEqual(2, i) - def test_4_dict_set(self): + def test_dict_set(self): hostdict = paramiko.HostKeys("hostfile.temp") key = paramiko.RSAKey(data=decodebytes(keyblob)) key_dss = paramiko.DSSKey(data=decodebytes(keyblob_dss)) @@ -122,10 +122,10 @@ class HostKeysTest(unittest.TestCase): def test_delitem(self): hostdict = paramiko.HostKeys("hostfile.temp") target = "happy.example.com" - entry = hostdict[target] # will KeyError if not present + hostdict[target] # will KeyError if not present del hostdict[target] try: - entry = hostdict[target] + hostdict[target] except KeyError: pass # Good else: diff --git a/tests/test_kex.py b/tests/test_kex.py index 65eb9a17..69492ee2 100644 --- a/tests/test_kex.py +++ b/tests/test_kex.py @@ -33,8 +33,6 @@ from paramiko.kex_gex import KexGex, KexGexSHA256 from paramiko import Message from paramiko.common import byte_chr from paramiko.kex_ecdh_nist import KexNistp256 -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives.asymmetric import ec def dummy_urandom(n): @@ -42,8 +40,8 @@ def dummy_urandom(n): def dummy_generate_key_pair(obj): - private_key_value = 94761803665136558137557783047955027733968423115106677159790289642479432803037 - public_key_numbers = "042bdab212fa8ba1b7c843301682a4db424d307246c7e1e6083c41d9ca7b098bf30b3d63e2ec6278488c135360456cc054b3444ecc45998c08894cbc1370f5f989" + private_key_value = 94761803665136558137557783047955027733968423115106677159790289642479432803037 # noqa + public_key_numbers = "042bdab212fa8ba1b7c843301682a4db424d307246c7e1e6083c41d9ca7b098bf30b3d63e2ec6278488c135360456cc054b3444ecc45998c08894cbc1370f5f989" # noqa public_key_numbers_obj = ec.EllipticCurvePublicNumbers.from_encoded_point( ec.SECP256R1(), unhexlify(public_key_numbers) ) @@ -72,7 +70,7 @@ class FakeKey(object): class FakeModulusPack(object): - P = 0xFFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF + P = 0xFFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF # noqa G = 2 def get_modulus(self, min, ask, max): @@ -113,7 +111,7 @@ class FakeTransport(object): class KexTest(unittest.TestCase): - K = 14730343317708716439807310032871972459448364195094179797249681733965528989482751523943515690110179031004049109375612685505881911274101441415545039654102474376472240501616988799699744135291070488314748284283496055223852115360852283821334858541043710301057312858051901453919067023103730011648890038847384890504 + K = 14730343317708716439807310032871972459448364195094179797249681733965528989482751523943515690110179031004049109375612685505881911274101441415545039654102474376472240501616988799699744135291070488314748284283496055223852115360852283821334858541043710301057312858051901453919067023103730011648890038847384890504 # noqa def setUp(self): self._original_urandom = os.urandom @@ -125,12 +123,12 @@ class KexTest(unittest.TestCase): os.urandom = self._original_urandom KexNistp256._generate_key_pair = self._original_generate_key_pair - def test_1_group1_client(self): + def test_group1_client(self): transport = FakeTransport() transport.server_mode = False kex = KexGroup1(transport) kex.start_kex() - x = b"1E000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" + x = b"1E000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_group1._MSG_KEXDH_REPLY,), transport._expect @@ -149,7 +147,7 @@ class KexTest(unittest.TestCase): self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) self.assertTrue(transport._activated) - def test_2_group1_server(self): + def test_group1_server(self): transport = FakeTransport() transport.server_mode = True kex = KexGroup1(transport) @@ -163,13 +161,13 @@ class KexTest(unittest.TestCase): msg.rewind() kex.parse_next(paramiko.kex_group1._MSG_KEXDH_INIT, msg) H = b"B16BF34DD10945EDE84E9C1EF24A14BFDC843389" - x = b"1F0000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" + x = b"1F0000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa self.assertEqual(self.K, transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) - def test_3_gex_client(self): + def test_gex_client(self): transport = FakeTransport() transport.server_mode = False kex = KexGex(transport) @@ -185,7 +183,7 @@ class KexTest(unittest.TestCase): msg.add_mpint(FakeModulusPack.G) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg) - x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" + x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect @@ -203,7 +201,7 @@ class KexTest(unittest.TestCase): self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) self.assertTrue(transport._activated) - def test_4_gex_old_client(self): + def test_gex_old_client(self): transport = FakeTransport() transport.server_mode = False kex = KexGex(transport) @@ -219,7 +217,7 @@ class KexTest(unittest.TestCase): msg.add_mpint(FakeModulusPack.G) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg) - x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" + x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect @@ -237,7 +235,7 @@ class KexTest(unittest.TestCase): self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) self.assertTrue(transport._activated) - def test_5_gex_server(self): + def test_gex_server(self): transport = FakeTransport() transport.server_mode = True kex = KexGex(transport) @@ -256,7 +254,7 @@ class KexTest(unittest.TestCase): msg.add_int(4096) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg) - x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" + x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect @@ -266,15 +264,15 @@ class KexTest(unittest.TestCase): msg.add_mpint(12345) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg) - K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 + K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 # noqa H = b"CE754197C21BF3452863B4F44D0B3951F12516EF" - x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" + x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa self.assertEqual(K, transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) - def test_6_gex_server_with_old_client(self): + def test_gex_server_with_old_client(self): transport = FakeTransport() transport.server_mode = True kex = KexGex(transport) @@ -291,7 +289,7 @@ class KexTest(unittest.TestCase): msg.add_int(2048) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, msg) - x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" + x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect @@ -301,15 +299,15 @@ class KexTest(unittest.TestCase): msg.add_mpint(12345) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg) - K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 + K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 # noqa H = b"B41A06B2E59043CEFC1AE16EC31F1E2D12EC455B" - x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" + x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa self.assertEqual(K, transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) - def test_7_gex_sha256_client(self): + def test_gex_sha256_client(self): transport = FakeTransport() transport.server_mode = False kex = KexGexSHA256(transport) @@ -325,7 +323,7 @@ class KexTest(unittest.TestCase): msg.add_mpint(FakeModulusPack.G) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg) - x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" + x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect @@ -343,7 +341,7 @@ class KexTest(unittest.TestCase): self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) self.assertTrue(transport._activated) - def test_8_gex_sha256_old_client(self): + def test_gex_sha256_old_client(self): transport = FakeTransport() transport.server_mode = False kex = KexGexSHA256(transport) @@ -359,7 +357,7 @@ class KexTest(unittest.TestCase): msg.add_mpint(FakeModulusPack.G) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg) - x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" + x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect @@ -377,7 +375,7 @@ class KexTest(unittest.TestCase): self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) self.assertTrue(transport._activated) - def test_9_gex_sha256_server(self): + def test_gex_sha256_server(self): transport = FakeTransport() transport.server_mode = True kex = KexGexSHA256(transport) @@ -396,7 +394,7 @@ class KexTest(unittest.TestCase): msg.add_int(4096) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg) - x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" + x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect @@ -406,15 +404,15 @@ class KexTest(unittest.TestCase): msg.add_mpint(12345) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg) - K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 + K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 # noqa H = b"CCAC0497CF0ABA1DBF55E1A3995D17F4CC31824B0E8D95CDF8A06F169D050D80" - x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" + x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa self.assertEqual(K, transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) - def test_10_gex_sha256_server_with_old_client(self): + def test_gex_sha256_server_with_old_client(self): transport = FakeTransport() transport.server_mode = True kex = KexGexSHA256(transport) @@ -431,7 +429,7 @@ class KexTest(unittest.TestCase): msg.add_int(2048) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, msg) - x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" + x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect @@ -441,16 +439,16 @@ class KexTest(unittest.TestCase): msg.add_mpint(12345) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg) - K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 + K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 # noqa H = b"3DDD2AD840AD095E397BA4D0573972DC60F6461FD38A187CACA6615A5BC8ADBB" - x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" + x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa self.assertEqual(K, transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) - def test_11_kex_nistp256_client(self): - K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 + def test_kex_nistp256_client(self): + K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 # noqa transport = FakeTransport() transport.server_mode = False kex = KexNistp256(transport) @@ -463,7 +461,7 @@ class KexTest(unittest.TestCase): msg = Message() msg.add_string("fake-host-key") Q_S = unhexlify( - "043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210" + "043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210" # noqa ) msg.add_string(Q_S) msg.add_string("fake-sig") @@ -475,8 +473,8 @@ class KexTest(unittest.TestCase): self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) self.assertTrue(transport._activated) - def test_12_kex_nistp256_server(self): - K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 + def test_kex_nistp256_server(self): + K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 # noqa transport = FakeTransport() transport.server_mode = True kex = KexNistp256(transport) @@ -488,7 +486,7 @@ class KexTest(unittest.TestCase): # fake init msg = Message() Q_C = unhexlify( - "043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210" + "043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210" # noqa ) H = b"2EF4957AFD530DD3F05DBEABF68D724FACC060974DA9704F2AEE4C3DE861E7CA" msg.add_string(Q_C) diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py index c71ff91c..42e0a101 100644 --- a/tests/test_kex_gss.py +++ b/tests/test_kex_gss.py @@ -142,7 +142,7 @@ class GSSKexTest(unittest.TestCase): stdout.close() stderr.close() - def test_1_gsskex_and_auth(self): + def test_gsskex_and_auth(self): """ Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated Diffie-Hellman Key Exchange and user authentication with the GSS-API @@ -150,7 +150,7 @@ class GSSKexTest(unittest.TestCase): """ self._test_gsskex_and_auth(gss_host=None) - def test_2_gsskex_and_auth_rekey(self): + def test_gsskex_and_auth_rekey(self): """ Verify that Paramiko can rekey. """ diff --git a/tests/test_message.py b/tests/test_message.py index b843a705..57766d90 100644 --- a/tests/test_message.py +++ b/tests/test_message.py @@ -29,14 +29,14 @@ from paramiko.common import byte_chr, zero_byte class MessageTest(unittest.TestCase): __a = ( - b"\x00\x00\x00\x17\x07\x60\xe0\x90\x00\x00\x00\x01\x71\x00\x00\x00\x05\x68\x65\x6c\x6c\x6f\x00\x00\x03\xe8" + b"\x00\x00\x00\x17\x07\x60\xe0\x90\x00\x00\x00\x01\x71\x00\x00\x00\x05\x68\x65\x6c\x6c\x6f\x00\x00\x03\xe8" # noqa + b"x" * 1000 ) - __b = b"\x01\x00\xf3\x00\x3f\x00\x00\x00\x10\x68\x75\x65\x79\x2c\x64\x65\x77\x65\x79\x2c\x6c\x6f\x75\x69\x65" - __c = b"\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x01\x11\x00\x00\x00\x07\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x06\x9a\x1b\x2c\x3d\x4e\xf7" - __d = b"\x00\x00\x00\x05\xff\x00\x00\x00\x05\x11\x22\x33\x44\x55\xff\x00\x00\x00\x0a\x00\xf0\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x03\x63\x61\x74\x00\x00\x00\x03\x61\x2c\x62" + __b = b"\x01\x00\xf3\x00\x3f\x00\x00\x00\x10\x68\x75\x65\x79\x2c\x64\x65\x77\x65\x79\x2c\x6c\x6f\x75\x69\x65" # noqa + __c = b"\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x01\x11\x00\x00\x00\x07\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x06\x9a\x1b\x2c\x3d\x4e\xf7" # noqa + __d = b"\x00\x00\x00\x05\xff\x00\x00\x00\x05\x11\x22\x33\x44\x55\xff\x00\x00\x00\x0a\x00\xf0\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x03\x63\x61\x74\x00\x00\x00\x03\x61\x2c\x62" # noqa - def test_1_encode(self): + def test_encode(self): msg = Message() msg.add_int(23) msg.add_int(123789456) @@ -62,7 +62,7 @@ class MessageTest(unittest.TestCase): msg.add_mpint(-0x65e4d3c2b109) self.assertEqual(msg.asbytes(), self.__c) - def test_2_decode(self): + def test_decode(self): msg = Message(self.__a) self.assertEqual(msg.get_int(), 23) self.assertEqual(msg.get_int(), 123789456) @@ -84,7 +84,7 @@ class MessageTest(unittest.TestCase): self.assertEqual(msg.get_mpint(), 0xf5e4d3c2b109) self.assertEqual(msg.get_mpint(), -0x65e4d3c2b109) - def test_3_add(self): + def test_add(self): msg = Message() msg.add(5) msg.add(0x1122334455) @@ -94,7 +94,7 @@ class MessageTest(unittest.TestCase): msg.add(["a", "b"]) self.assertEqual(msg.asbytes(), self.__d) - def test_4_misc(self): + def test_misc(self): msg = Message(self.__d) self.assertEqual(msg.get_adaptive_int(), 5) self.assertEqual(msg.get_adaptive_int(), 0x1122334455) diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py index 6920f08e..de80770e 100644 --- a/tests/test_packetizer.py +++ b/tests/test_packetizer.py @@ -38,7 +38,7 @@ x1f = byte_chr(0x1f) class PacketizerTest(unittest.TestCase): - def test_1_write(self): + def test_write(self): rsock = LoopSocket() wsock = LoopSocket() rsock.link(wsock) @@ -64,11 +64,11 @@ class PacketizerTest(unittest.TestCase): # 32 + 12 bytes of MAC = 44 self.assertEqual(44, len(data)) self.assertEqual( - b"\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0", + b"\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0", # noqa data[:16], ) - def test_2_read(self): + def test_read(self): rsock = LoopSocket() wsock = LoopSocket() rsock.link(wsock) @@ -82,7 +82,7 @@ class PacketizerTest(unittest.TestCase): ).decryptor() p.set_inbound_cipher(decryptor, 16, sha1, 12, x1f * 20) wsock.send( - b"\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0\x90\xd2\x16\x56\x0d\x71\x73\x61\x38\x7c\x4c\x3d\xfb\x97\x7d\xe2\x6e\x03\xb1\xa0\xc2\x1c\xd6\x41\x41\x4c\xb4\x59" + b"\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0\x90\xd2\x16\x56\x0d\x71\x73\x61\x38\x7c\x4c\x3d\xfb\x97\x7d\xe2\x6e\x03\xb1\xa0\xc2\x1c\xd6\x41\x41\x4c\xb4\x59" # noqa ) cmd, m = p.read_message() self.assertEqual(100, cmd) @@ -90,7 +90,7 @@ class PacketizerTest(unittest.TestCase): self.assertEqual(1, m.get_int()) self.assertEqual(900, m.get_int()) - def test_3_closed(self): + def test_closed(self): if sys.platform.startswith("win"): # no SIGALRM on windows return rsock = LoopSocket() diff --git a/tests/test_pkey.py b/tests/test_pkey.py index a43140ed..0e60126a 100644 --- a/tests/test_pkey.py +++ b/tests/test_pkey.py @@ -25,7 +25,6 @@ import unittest import os from binascii import hexlify from hashlib import md5 -import base64 from paramiko import RSAKey, DSSKey, ECDSAKey, Ed25519Key, Message, util from paramiko.py3compat import StringIO, byte_chr, b, bytes, PY2 @@ -34,18 +33,18 @@ from .util import _support # from openssh's ssh-keygen -PUB_RSA = "ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4c=" -PUB_DSS = "ssh-dss AAAAB3NzaC1kc3MAAACBAOeBpgNnfRzr/twmAQRu2XwWAp3CFtrVnug6s6fgwj/oLjYbVtjAy6pl/h0EKCWx2rf1IetyNsTxWrniA9I6HeDj65X1FyDkg6g8tvCnaNB8Xp/UUhuzHuGsMIipRxBxw9LF608EqZcj1E3ytktoW5B5OcjrkEoz3xG7C+rpIjYvAAAAFQDwz4UnmsGiSNu5iqjn3uTzwUpshwAAAIEAkxfFeY8P2wZpDjX0MimZl5wkoFQDL25cPzGBuB4OnB8NoUk/yjAHIIpEShw8V+LzouMK5CTJQo5+Ngw3qIch/WgRmMHy4kBq1SsXMjQCte1So6HBMvBPIW5SiMTmjCfZZiw4AYHK+B/JaOwaG9yRg2Ejg4Ok10+XFDxlqZo8Y+wAAACARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lYukmnjO1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+NwacIBlXa8cMDL7Q/69o0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgE=" -PUB_ECDSA_256 = "ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBJSPZm3ZWkvk/Zx8WP+fZRZ5/NBBHnGQwR6uIC6XHGPDIHuWUzIjAwA0bzqkOUffEsbLe+uQgKl5kbc/L8KA/eo=" -PUB_ECDSA_384 = "ecdsa-sha2-nistp384 AAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAAIbmlzdHAzODQAAABhBBbGibQLW9AAZiGN2hEQxWYYoFaWKwN3PKSaDJSMqmIn1Z9sgRUuw8Y/w502OGvXL/wFk0i2z50l3pWZjD7gfMH7gX5TUiCzwrQkS+Hn1U2S9aF5WJp0NcIzYxXw2r4M2A==" -PUB_ECDSA_521 = "ecdsa-sha2-nistp521 AAAAE2VjZHNhLXNoYTItbmlzdHA1MjEAAAAIbmlzdHA1MjEAAACFBACaOaFLZGuxa5AW16qj6VLypFbLrEWrt9AZUloCMefxO8bNLjK/O5g0rAVasar1TnyHE9qj4NwzANZASWjQNbc4MAG8vzqezFwLIn/kNyNTsXNfqEko9OgHZknlj2Z79dwTJcRAL4QLcT5aND0EHZLB2fAUDXiWIb2j4rg1mwPlBMiBXA==" +PUB_RSA = "ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4c=" # noqa +PUB_DSS = "ssh-dss AAAAB3NzaC1kc3MAAACBAOeBpgNnfRzr/twmAQRu2XwWAp3CFtrVnug6s6fgwj/oLjYbVtjAy6pl/h0EKCWx2rf1IetyNsTxWrniA9I6HeDj65X1FyDkg6g8tvCnaNB8Xp/UUhuzHuGsMIipRxBxw9LF608EqZcj1E3ytktoW5B5OcjrkEoz3xG7C+rpIjYvAAAAFQDwz4UnmsGiSNu5iqjn3uTzwUpshwAAAIEAkxfFeY8P2wZpDjX0MimZl5wkoFQDL25cPzGBuB4OnB8NoUk/yjAHIIpEShw8V+LzouMK5CTJQo5+Ngw3qIch/WgRmMHy4kBq1SsXMjQCte1So6HBMvBPIW5SiMTmjCfZZiw4AYHK+B/JaOwaG9yRg2Ejg4Ok10+XFDxlqZo8Y+wAAACARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lYukmnjO1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+NwacIBlXa8cMDL7Q/69o0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgE=" # noqa +PUB_ECDSA_256 = "ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBJSPZm3ZWkvk/Zx8WP+fZRZ5/NBBHnGQwR6uIC6XHGPDIHuWUzIjAwA0bzqkOUffEsbLe+uQgKl5kbc/L8KA/eo=" # noqa +PUB_ECDSA_384 = "ecdsa-sha2-nistp384 AAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAAIbmlzdHAzODQAAABhBBbGibQLW9AAZiGN2hEQxWYYoFaWKwN3PKSaDJSMqmIn1Z9sgRUuw8Y/w502OGvXL/wFk0i2z50l3pWZjD7gfMH7gX5TUiCzwrQkS+Hn1U2S9aF5WJp0NcIzYxXw2r4M2A==" # noqa +PUB_ECDSA_521 = "ecdsa-sha2-nistp521 AAAAE2VjZHNhLXNoYTItbmlzdHA1MjEAAAAIbmlzdHA1MjEAAACFBACaOaFLZGuxa5AW16qj6VLypFbLrEWrt9AZUloCMefxO8bNLjK/O5g0rAVasar1TnyHE9qj4NwzANZASWjQNbc4MAG8vzqezFwLIn/kNyNTsXNfqEko9OgHZknlj2Z79dwTJcRAL4QLcT5aND0EHZLB2fAUDXiWIb2j4rg1mwPlBMiBXA==" # noqa FINGER_RSA = "1024 60:73:38:44:cb:51:86:65:7f:de:da:a2:2b:5a:57:d5" FINGER_DSS = "1024 44:78:f0:b9:a2:3c:c5:18:20:09:ff:75:5b:c1:d2:6c" FINGER_ECDSA_256 = "256 25:19:eb:55:e6:a1:47:ff:4f:38:d2:75:6f:a5:d5:60" FINGER_ECDSA_384 = "384 c1:8d:a0:59:09:47:41:8e:a8:a6:07:01:29:23:b4:65" FINGER_ECDSA_521 = "521 44:58:22:52:12:33:16:0e:ce:0e:be:2c:7c:7e:cc:1e" -SIGNED_RSA = "20:d7:8a:31:21:cb:f7:92:12:f2:a4:89:37:f5:78:af:e6:16:b6:25:b9:97:3d:a2:cd:5f:ca:20:21:73:4c:ad:34:73:8f:20:77:28:e2:94:15:08:d8:91:40:7a:85:83:bf:18:37:95:dc:54:1a:9b:88:29:6c:73:ca:38:b4:04:f1:56:b9:f2:42:9d:52:1b:29:29:b4:4f:fd:c9:2d:af:47:d2:40:76:30:f3:63:45:0c:d9:1d:43:86:0f:1c:70:e2:93:12:34:f3:ac:c5:0a:2f:14:50:66:59:f1:88:ee:c1:4a:e9:d1:9c:4e:46:f0:0e:47:6f:38:74:f1:44:a8" +SIGNED_RSA = "20:d7:8a:31:21:cb:f7:92:12:f2:a4:89:37:f5:78:af:e6:16:b6:25:b9:97:3d:a2:cd:5f:ca:20:21:73:4c:ad:34:73:8f:20:77:28:e2:94:15:08:d8:91:40:7a:85:83:bf:18:37:95:dc:54:1a:9b:88:29:6c:73:ca:38:b4:04:f1:56:b9:f2:42:9d:52:1b:29:29:b4:4f:fd:c9:2d:af:47:d2:40:76:30:f3:63:45:0c:d9:1d:43:86:0f:1c:70:e2:93:12:34:f3:ac:c5:0a:2f:14:50:66:59:f1:88:ee:c1:4a:e9:d1:9c:4e:46:f0:0e:47:6f:38:74:f1:44:a8" # noqa RSA_PRIVATE_OUT = """\ -----BEGIN RSA PRIVATE KEY----- @@ -109,8 +108,8 @@ L4QLcT5aND0EHZLB2fAUDXiWIb2j4rg1mwPlBMiBXA== x1234 = b"\x01\x02\x03\x04" -TEST_KEY_BYTESTR_2 = "\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x81\x00\xd3\x8fV\xea\x07\x85\xa6k%\x8d<\x1f\xbc\x8dT\x98\xa5\x96$\xf3E#\xbe>\xbc\xd2\x93\x93\x87f\xceD\x18\xdb \x0c\xb3\xa1a\x96\xf8e#\xcc\xacS\x8a#\xefVlE\x83\x1epv\xc1o\x17M\xef\xdf\x89DUXL\xa6\x8b\xaa<\x06\x10\xd7\x93w\xec\xaf\xe2\xaf\x95\xd8\xfb\xd9\xbfw\xcb\x9f0)#y{\x10\x90\xaa\x85l\tPru\x8c\t\x19\xce\xa0\xf1\xd2\xdc\x8e/\x8b\xa8f\x9c0\xdey\x84\xd2F\xf7\xcbmm\x1f\x87" -TEST_KEY_BYTESTR_3 = "\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x00ӏV\x07k%<\x1fT$E#>ғfD\x18 \x0cae#̬S#VlE\x1epvo\x17M߉DUXL<\x06\x10דw\u2bd5ٿw˟0)#y{\x10l\tPru\t\x19Π\u070e/f0yFmm\x1f" +TEST_KEY_BYTESTR_2 = "\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x81\x00\xd3\x8fV\xea\x07\x85\xa6k%\x8d<\x1f\xbc\x8dT\x98\xa5\x96$\xf3E#\xbe>\xbc\xd2\x93\x93\x87f\xceD\x18\xdb \x0c\xb3\xa1a\x96\xf8e#\xcc\xacS\x8a#\xefVlE\x83\x1epv\xc1o\x17M\xef\xdf\x89DUXL\xa6\x8b\xaa<\x06\x10\xd7\x93w\xec\xaf\xe2\xaf\x95\xd8\xfb\xd9\xbfw\xcb\x9f0)#y{\x10\x90\xaa\x85l\tPru\x8c\t\x19\xce\xa0\xf1\xd2\xdc\x8e/\x8b\xa8f\x9c0\xdey\x84\xd2F\xf7\xcbmm\x1f\x87" # noqa +TEST_KEY_BYTESTR_3 = "\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x00ӏV\x07k%<\x1fT$E#>ғfD\x18 \x0cae#̬S#VlE\x1epvo\x17M߉DUXL<\x06\x10דw\u2bd5ٿw˟0)#y{\x10l\tPru\t\x19Π\u070e/f0yFmm\x1f" # noqa class KeyTest(unittest.TestCase): @@ -131,12 +130,12 @@ class KeyTest(unittest.TestCase): self.assertEqual(fh.readline()[:-1], "Proc-Type: 4,ENCRYPTED") self.assertEqual(fh.readline()[0:10], "DEK-Info: ") - def test_1_generate_key_bytes(self): + def test_generate_key_bytes(self): key = util.generate_key_bytes(md5, x1234, "happy birthday", 30) - exp = b"\x61\xE1\xF2\x72\xF4\xC1\xC4\x56\x15\x86\xBD\x32\x24\x98\xC0\xE9\x24\x67\x27\x80\xF4\x7B\xB3\x7D\xDA\x7D\x54\x01\x9E\x64" + exp = b"\x61\xE1\xF2\x72\xF4\xC1\xC4\x56\x15\x86\xBD\x32\x24\x98\xC0\xE9\x24\x67\x27\x80\xF4\x7B\xB3\x7D\xDA\x7D\x54\x01\x9E\x64" # noqa self.assertEqual(exp, key) - def test_2_load_rsa(self): + def test_load_rsa(self): key = RSAKey.from_private_key_file(_support("test_rsa.key")) self.assertEqual("ssh-rsa", key.get_name()) exp_rsa = b(FINGER_RSA.split()[1].replace(":", "")) @@ -152,7 +151,7 @@ class KeyTest(unittest.TestCase): key2 = RSAKey.from_private_key(s) self.assertEqual(key, key2) - def test_3_load_rsa_password(self): + def test_load_rsa_password(self): key = RSAKey.from_private_key_file( _support("test_rsa_password.key"), "television" ) @@ -163,7 +162,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(PUB_RSA.split()[1], key.get_base64()) self.assertEqual(1024, key.get_bits()) - def test_4_load_dss(self): + def test_load_dss(self): key = DSSKey.from_private_key_file(_support("test_dss.key")) self.assertEqual("ssh-dss", key.get_name()) exp_dss = b(FINGER_DSS.split()[1].replace(":", "")) @@ -179,7 +178,7 @@ class KeyTest(unittest.TestCase): key2 = DSSKey.from_private_key(s) self.assertEqual(key, key2) - def test_5_load_dss_password(self): + def test_load_dss_password(self): key = DSSKey.from_private_key_file( _support("test_dss_password.key"), "television" ) @@ -190,7 +189,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(PUB_DSS.split()[1], key.get_base64()) self.assertEqual(1024, key.get_bits()) - def test_6_compare_rsa(self): + def test_compare_rsa(self): # verify that the private & public keys compare equal key = RSAKey.from_private_key_file(_support("test_rsa.key")) self.assertEqual(key, key) @@ -199,7 +198,7 @@ class KeyTest(unittest.TestCase): self.assertTrue(not pub.can_sign()) self.assertEqual(key, pub) - def test_7_compare_dss(self): + def test_compare_dss(self): # verify that the private & public keys compare equal key = DSSKey.from_private_key_file(_support("test_dss.key")) self.assertEqual(key, key) @@ -208,7 +207,7 @@ class KeyTest(unittest.TestCase): self.assertTrue(not pub.can_sign()) self.assertEqual(key, pub) - def test_8_sign_rsa(self): + def test_sign_rsa(self): # verify that the rsa private key can sign and verify key = RSAKey.from_private_key_file(_support("test_rsa.key")) msg = key.sign_ssh_data(b"ice weasels") @@ -223,7 +222,7 @@ class KeyTest(unittest.TestCase): pub = RSAKey(data=key.asbytes()) self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg)) - def test_9_sign_dss(self): + def test_sign_dss(self): # verify that the dss private key can sign and verify key = DSSKey.from_private_key_file(_support("test_dss.key")) msg = key.sign_ssh_data(b"ice weasels") @@ -238,19 +237,19 @@ class KeyTest(unittest.TestCase): pub = DSSKey(data=key.asbytes()) self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg)) - def test_A_generate_rsa(self): + def test_generate_rsa(self): key = RSAKey.generate(1024) msg = key.sign_ssh_data(b"jerri blank") msg.rewind() self.assertTrue(key.verify_ssh_sig(b"jerri blank", msg)) - def test_B_generate_dss(self): + def test_generate_dss(self): key = DSSKey.generate(1024) msg = key.sign_ssh_data(b"jerri blank") msg.rewind() self.assertTrue(key.verify_ssh_sig(b"jerri blank", msg)) - def test_C_generate_ecdsa(self): + def test_generate_ecdsa(self): key = ECDSAKey.generate() msg = key.sign_ssh_data(b"jerri blank") msg.rewind() @@ -279,7 +278,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key.get_bits(), 521) self.assertEqual(key.get_name(), "ecdsa-sha2-nistp521") - def test_10_load_ecdsa_256(self): + def test_load_ecdsa_256(self): key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key")) self.assertEqual("ecdsa-sha2-nistp256", key.get_name()) exp_ecdsa = b(FINGER_ECDSA_256.split()[1].replace(":", "")) @@ -295,7 +294,7 @@ class KeyTest(unittest.TestCase): key2 = ECDSAKey.from_private_key(s) self.assertEqual(key, key2) - def test_11_load_ecdsa_password_256(self): + def test_load_ecdsa_password_256(self): key = ECDSAKey.from_private_key_file( _support("test_ecdsa_password_256.key"), b"television" ) @@ -306,7 +305,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(PUB_ECDSA_256.split()[1], key.get_base64()) self.assertEqual(256, key.get_bits()) - def test_12_compare_ecdsa_256(self): + def test_compare_ecdsa_256(self): # verify that the private & public keys compare equal key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key")) self.assertEqual(key, key) @@ -315,7 +314,7 @@ class KeyTest(unittest.TestCase): self.assertTrue(not pub.can_sign()) self.assertEqual(key, pub) - def test_13_sign_ecdsa_256(self): + def test_sign_ecdsa_256(self): # verify that the rsa private key can sign and verify key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key")) msg = key.sign_ssh_data(b"ice weasels") @@ -331,7 +330,7 @@ class KeyTest(unittest.TestCase): pub = ECDSAKey(data=key.asbytes()) self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg)) - def test_14_load_ecdsa_384(self): + def test_load_ecdsa_384(self): key = ECDSAKey.from_private_key_file(_support("test_ecdsa_384.key")) self.assertEqual("ecdsa-sha2-nistp384", key.get_name()) exp_ecdsa = b(FINGER_ECDSA_384.split()[1].replace(":", "")) @@ -347,7 +346,7 @@ class KeyTest(unittest.TestCase): key2 = ECDSAKey.from_private_key(s) self.assertEqual(key, key2) - def test_15_load_ecdsa_password_384(self): + def test_load_ecdsa_password_384(self): key = ECDSAKey.from_private_key_file( _support("test_ecdsa_password_384.key"), b"television" ) @@ -358,7 +357,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(PUB_ECDSA_384.split()[1], key.get_base64()) self.assertEqual(384, key.get_bits()) - def test_16_compare_ecdsa_384(self): + def test_compare_ecdsa_384(self): # verify that the private & public keys compare equal key = ECDSAKey.from_private_key_file(_support("test_ecdsa_384.key")) self.assertEqual(key, key) @@ -367,7 +366,7 @@ class KeyTest(unittest.TestCase): self.assertTrue(not pub.can_sign()) self.assertEqual(key, pub) - def test_17_sign_ecdsa_384(self): + def test_sign_ecdsa_384(self): # verify that the rsa private key can sign and verify key = ECDSAKey.from_private_key_file(_support("test_ecdsa_384.key")) msg = key.sign_ssh_data(b"ice weasels") @@ -383,7 +382,7 @@ class KeyTest(unittest.TestCase): pub = ECDSAKey(data=key.asbytes()) self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg)) - def test_18_load_ecdsa_521(self): + def test_load_ecdsa_521(self): key = ECDSAKey.from_private_key_file(_support("test_ecdsa_521.key")) self.assertEqual("ecdsa-sha2-nistp521", key.get_name()) exp_ecdsa = b(FINGER_ECDSA_521.split()[1].replace(":", "")) @@ -402,7 +401,7 @@ class KeyTest(unittest.TestCase): key2 = ECDSAKey.from_private_key(s) self.assertEqual(key, key2) - def test_19_load_ecdsa_password_521(self): + def test_load_ecdsa_password_521(self): key = ECDSAKey.from_private_key_file( _support("test_ecdsa_password_521.key"), b"television" ) @@ -413,7 +412,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(PUB_ECDSA_521.split()[1], key.get_base64()) self.assertEqual(521, key.get_bits()) - def test_20_compare_ecdsa_521(self): + def test_compare_ecdsa_521(self): # verify that the private & public keys compare equal key = ECDSAKey.from_private_key_file(_support("test_ecdsa_521.key")) self.assertEqual(key, key) @@ -422,7 +421,7 @@ class KeyTest(unittest.TestCase): self.assertTrue(not pub.can_sign()) self.assertEqual(key, pub) - def test_21_sign_ecdsa_521(self): + def test_sign_ecdsa_521(self): # verify that the rsa private key can sign and verify key = ECDSAKey.from_private_key_file(_support("test_ecdsa_521.key")) msg = key.sign_ssh_data(b"ice weasels") @@ -491,7 +490,7 @@ class KeyTest(unittest.TestCase): def test_ed25519_nonbytes_password(self): # https://github.com/paramiko/paramiko/issues/1039 - key = Ed25519Key.from_private_key_file( + Ed25519Key.from_private_key_file( _support("test_ed25519_password.key"), # NOTE: not a bytes. Amusingly, the test above for same key DOES # explicitly cast to bytes...code smell! @@ -540,7 +539,7 @@ class KeyTest(unittest.TestCase): # Delve into blob contents, for test purposes msg = Message(key.public_blob.key_blob) self.assertEqual(msg.get_text(), "ssh-rsa-cert-v01@openssh.com") - nonce = msg.get_string() + msg.get_string() e = msg.get_mpint() n = msg.get_mpint() self.assertEqual(e, key.public_numbers.e) diff --git a/tests/test_sftp.py b/tests/test_sftp.py index 87c57340..e4e18e5a 100644 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -26,23 +26,18 @@ do test file operations in (so no existing files will be harmed). import os import socket import sys -import threading -import unittest import warnings from binascii import hexlify from tempfile import mkstemp import pytest -import paramiko -import paramiko.util from paramiko.py3compat import PY2, b, u, StringIO from paramiko.common import o777, o600, o666, o644 from paramiko.sftp_attr import SFTPAttributes from .util import needs_builtin -from .stub_sftp import StubServer, StubSFTPServer -from .util import _support, slow +from .util import slow ARTICLE = """ @@ -74,14 +69,21 @@ decreased compared with chicken. # Here is how unicode characters are encoded over 1 to 6 bytes in utf-8 -# U-00000000 - U-0000007F: 0xxxxxxx -# U-00000080 - U-000007FF: 110xxxxx 10xxxxxx -# U-00000800 - U-0000FFFF: 1110xxxx 10xxxxxx 10xxxxxx -# U-00010000 - U-001FFFFF: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx -# U-00200000 - U-03FFFFFF: 111110xx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx -# U-04000000 - U-7FFFFFFF: 1111110x 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx +# U-00000000 - U-0000007F: +# 0xxxxxxx +# U-00000080 - U-000007FF: +# 110xxxxx 10xxxxxx +# U-00000800 - U-0000FFFF: +# 1110xxxx 10xxxxxx 10xxxxxx +# U-00010000 - U-001FFFFF: +# 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx +# U-00200000 - U-03FFFFFF: +# 111110xx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx +# U-04000000 - U-7FFFFFFF: +# 1111110x 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx # Note that: hex(int('11000011',2)) == '0xc3' -# Thus, the following 2-bytes sequence is not valid utf8: "invalid continuation byte" +# Thus, the following 2-bytes sequence is not valid utf8: "invalid continuation +# byte" NON_UTF8_DATA = b"\xC3\xC3" unicode_folder = u"\u00fcnic\u00f8de" if PY2 else "\u00fcnic\u00f8de" @@ -90,7 +92,7 @@ utf8_folder = b"/\xc3\xbcnic\xc3\xb8\x64\x65" @slow class TestSFTP(object): - def test_1_file(self, sftp): + def test_file(self, sftp): """ verify that we can create a file. """ @@ -101,7 +103,7 @@ class TestSFTP(object): f.close() sftp.remove(sftp.FOLDER + "/test") - def test_2_close(self, sftp): + def test_close(self, sftp): """ Verify that SFTP session close() causes a socket error on next action. """ @@ -109,7 +111,7 @@ class TestSFTP(object): with pytest.raises(socket.error, match="Socket is closed"): sftp.open(sftp.FOLDER + "/test2", "w") - def test_2_sftp_can_be_used_as_context_manager(self, sftp): + def test_sftp_can_be_used_as_context_manager(self, sftp): """ verify that the sftp session is closed when exiting the context manager """ @@ -118,7 +120,7 @@ class TestSFTP(object): with pytest.raises(socket.error, match="Socket is closed"): sftp.open(sftp.FOLDER + "/test2", "w") - def test_3_write(self, sftp): + def test_write(self, sftp): """ verify that a file can be created and written, and the size is correct. """ @@ -129,7 +131,7 @@ class TestSFTP(object): finally: sftp.remove(sftp.FOLDER + "/duck.txt") - def test_3_sftp_file_can_be_used_as_context_manager(self, sftp): + def test_sftp_file_can_be_used_as_context_manager(self, sftp): """ verify that an opened file can be used as a context manager """ @@ -140,7 +142,7 @@ class TestSFTP(object): finally: sftp.remove(sftp.FOLDER + "/duck.txt") - def test_4_append(self, sftp): + def test_append(self, sftp): """ verify that a file can be opened for append, and tell() still works. """ @@ -158,7 +160,7 @@ class TestSFTP(object): finally: sftp.remove(sftp.FOLDER + "/append.txt") - def test_5_rename(self, sftp): + def test_rename(self, sftp): """ verify that renaming a file works. """ @@ -185,7 +187,7 @@ class TestSFTP(object): except: pass - def test_5a_posix_rename(self, sftp): + def testa_posix_rename(self, sftp): """Test posix-rename@openssh.com protocol extension.""" try: # first check that the normal rename works as specified @@ -194,20 +196,15 @@ class TestSFTP(object): sftp.rename(sftp.FOLDER + "/a", sftp.FOLDER + "/b") with sftp.open(sftp.FOLDER + "/a", "w") as f: f.write("two") - try: + with pytest.raises(IOError): # actual message seems generic sftp.rename(sftp.FOLDER + "/a", sftp.FOLDER + "/b") - self.assertTrue( - False, "no exception when rename-ing onto existing file" - ) - except (OSError, IOError): - pass # now check with the posix_rename sftp.posix_rename(sftp.FOLDER + "/a", sftp.FOLDER + "/b") with sftp.open(sftp.FOLDER + "/b", "r") as f: data = u(f.read()) err = "Contents of renamed file not the same as original file" - assert data == "two", err + assert "two" == data, err finally: try: @@ -219,7 +216,7 @@ class TestSFTP(object): except: pass - def test_6_folder(self, sftp): + def test_folder(self, sftp): """ create a temporary folder, verify that we can create a file in it, then remove the folder and verify that we can't create a file in it anymore. @@ -232,7 +229,7 @@ class TestSFTP(object): with pytest.raises(IOError, match="No such file"): sftp.open(sftp.FOLDER + "/subfolder/test") - def test_7_listdir(self, sftp): + def test_listdir(self, sftp): """ verify that a folder can be created, a bunch of files can be placed in it, and those files show up in sftp.listdir. @@ -253,7 +250,7 @@ class TestSFTP(object): sftp.remove(sftp.FOLDER + "/fish.txt") sftp.remove(sftp.FOLDER + "/tertiary.py") - def test_7_5_listdir_iter(self, sftp): + def test_listdir_iter(self, sftp): """ listdir_iter version of above test """ @@ -273,7 +270,7 @@ class TestSFTP(object): sftp.remove(sftp.FOLDER + "/fish.txt") sftp.remove(sftp.FOLDER + "/tertiary.py") - def test_8_setstat(self, sftp): + def test_setstat(self, sftp): """ verify that the setstat functions (chown, chmod, utime, truncate) work. """ @@ -310,7 +307,7 @@ class TestSFTP(object): finally: sftp.remove(sftp.FOLDER + "/special") - def test_9_fsetstat(self, sftp): + def test_fsetstat(self, sftp): """ verify that the fsetstat functions (chown, chmod, utime, truncate) work on open files. @@ -350,12 +347,12 @@ class TestSFTP(object): finally: sftp.remove(sftp.FOLDER + "/special") - def test_A_readline_seek(self, sftp): + def test_readline_seek(self, sftp): """ - create a text file and write a bunch of text into it. then count the lines - in the file, and seek around to retrieve particular lines. this should - verify that read buffering and 'tell' work well together, and that read - buffering is reset on 'seek'. + create a text file and write a bunch of text into it. then count the + lines in the file, and seek around to retrieve particular lines. this + should verify that read buffering and 'tell' work well together, and + that read buffering is reset on 'seek'. """ try: with sftp.open(sftp.FOLDER + "/duck.txt", "w") as f: @@ -375,17 +372,14 @@ class TestSFTP(object): f.seek(pos_list[17], f.SEEK_SET) assert f.readline()[:4] == "duck" f.seek(pos_list[10], f.SEEK_SET) - assert ( - f.readline() - == "duck types were equally resistant to exogenous insulin compared with chicken.\n" - ) + expected = "duck types were equally resistant to exogenous insulin compared with chicken.\n" # noqa + assert f.readline() == expected finally: sftp.remove(sftp.FOLDER + "/duck.txt") - def test_B_write_seek(self, sftp): + def test_write_seek(self, sftp): """ - create a text file, seek back and change part of it, and verify that the - changes worked. + Create a text file, seek back, change it, and verify. """ try: with sftp.open(sftp.FOLDER + "/testing.txt", "w") as f: @@ -400,7 +394,7 @@ class TestSFTP(object): finally: sftp.remove(sftp.FOLDER + "/testing.txt") - def test_C_symlink(self, sftp): + def test_symlink(self, sftp): """ create a symlink and then check that lstat doesn't follow it. """ @@ -447,7 +441,7 @@ class TestSFTP(object): except: pass - def test_D_flush_seek(self, sftp): + def test_flush_seek(self, sftp): """ verify that buffered writes are automatically flushed on seek. """ @@ -467,7 +461,7 @@ class TestSFTP(object): except: pass - def test_E_realpath(self, sftp): + def test_realpath(self, sftp): """ test that realpath is returning something non-empty and not an error. @@ -478,7 +472,7 @@ class TestSFTP(object): assert len(f) > 0 assert os.path.join(pwd, sftp.FOLDER) == f - def test_F_mkdir(self, sftp): + def test_mkdir(self, sftp): """ verify that mkdir/rmdir work. """ @@ -489,7 +483,7 @@ class TestSFTP(object): with pytest.raises(IOError, match="No such file"): sftp.rmdir(sftp.FOLDER + "/subfolder") - def test_G_chdir(self, sftp): + def test_chdir(self, sftp): """ verify that chdir/getcwd work. """ @@ -525,7 +519,7 @@ class TestSFTP(object): except: pass - def test_H_get_put(self, sftp): + def test_get_put(self, sftp): """ verify that get/put work. """ @@ -560,7 +554,7 @@ class TestSFTP(object): os.unlink(localname) sftp.unlink(sftp.FOLDER + "/bunny.txt") - def test_I_check(self, sftp): + def test_check(self, sftp): """ verify that file.check() works against our own server. (it's an sftp extension that we support, and may be the only ones who @@ -582,14 +576,12 @@ class TestSFTP(object): == u(hexlify(sum)).upper() ) sum = f.check("md5", 0, 0, 510) - assert ( - u(hexlify(sum)).upper() - == "EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6" - ) # noqa + expected = "EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6" # noqa + assert u(hexlify(sum)).upper() == expected finally: sftp.unlink(sftp.FOLDER + "/kitty.txt") - def test_J_x_flag(self, sftp): + def test_x_flag(self, sftp): """ verify that the 'x' flag works when opening a file. """ @@ -604,7 +596,7 @@ class TestSFTP(object): finally: sftp.unlink(sftp.FOLDER + "/unusual.txt") - def test_K_utf8(self, sftp): + def test_utf8(self, sftp): """ verify that unicode strings are encoded into utf8 correctly. """ @@ -620,7 +612,7 @@ class TestSFTP(object): self.fail("exception " + str(e)) sftp.unlink(b(sftp.FOLDER) + utf8_folder) - def test_L_utf8_chdir(self, sftp): + def test_utf8_chdir(self, sftp): sftp.mkdir(sftp.FOLDER + "/" + unicode_folder) try: sftp.chdir(sftp.FOLDER + "/" + unicode_folder) @@ -631,7 +623,7 @@ class TestSFTP(object): sftp.chdir() sftp.rmdir(sftp.FOLDER + "/" + unicode_folder) - def test_M_bad_readv(self, sftp): + def test_bad_readv(self, sftp): """ verify that readv at the end of the file doesn't essplode. """ @@ -647,7 +639,7 @@ class TestSFTP(object): finally: sftp.unlink(sftp.FOLDER + "/zero") - def test_N_put_without_confirm(self, sftp): + def test_put_without_confirm(self, sftp): """ verify that get/put work without confirmation. """ @@ -676,11 +668,11 @@ class TestSFTP(object): os.unlink(localname) sftp.unlink(sftp.FOLDER + "/bunny.txt") - def test_O_getcwd(self, sftp): + def test_getcwd(self, sftp): """ verify that chdir/getcwd work. """ - assert sftp.getcwd() == None + assert sftp.getcwd() is None root = sftp.normalize(".") if root[-1] != "/": root += "/" @@ -695,7 +687,7 @@ class TestSFTP(object): except: pass - def XXX_test_M_seek_append(self, sftp): + def test_seek_append(self, sftp): """ verify that seek does't affect writes during append. @@ -733,7 +725,7 @@ class TestSFTP(object): # appear but they're clearly emitted from subthreads that have no error # handling. No point running it until that is fixed somehow. @pytest.mark.skip("Doesn't prove anything right now") - def test_N_file_with_percent(self, sftp): + def test_file_with_percent(self, sftp): """ verify that we can create a file with a '%' in the filename. ( it needs to be properly escaped by _log() ) @@ -745,7 +737,7 @@ class TestSFTP(object): f.close() sftp.remove(sftp.FOLDER + "/test%file") - def test_O_non_utf8_data(self, sftp): + def test_non_utf8_data(self, sftp): """Test write() and read() of non utf8 data""" try: with sftp.open("%s/nonutf8data" % sftp.FOLDER, "w") as f: @@ -775,7 +767,7 @@ class TestSFTP(object): try: with sftp.open("%s/write_buffer" % sftp.FOLDER, "wb") as f: for offset in range(0, len(data), 8): - f.write(buffer(data, offset, 8)) + f.write(buffer(data, offset, 8)) # noqa with sftp.open("%s/write_buffer" % sftp.FOLDER, "rb") as f: assert f.read() == data diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py index 9df566e8..fc556faf 100644 --- a/tests/test_sftp_big.py +++ b/tests/test_sftp_big.py @@ -23,12 +23,10 @@ a real actual sftp server is contacted, and a new folder is created there to do test file operations in (so no existing files will be harmed). """ -import os import random import struct import sys import time -import unittest from paramiko.common import o660 @@ -37,7 +35,7 @@ from .util import slow @slow class TestBigSFTP(object): - def test_1_lots_of_files(self, sftp): + def test_lots_of_files(self, sftp): """ create a bunch of files over the same session. """ @@ -65,7 +63,7 @@ class TestBigSFTP(object): except: pass - def test_2_big_file(self, sftp): + def test_big_file(self, sftp): """ write a 1MB file with no buffering. """ @@ -96,7 +94,7 @@ class TestBigSFTP(object): finally: sftp.remove("%s/hongry.txt" % sftp.FOLDER) - def test_3_big_file_pipelined(self, sftp): + def test_big_file_pipelined(self, sftp): """ write a 1MB file, with no linefeeds, using pipelining. """ @@ -122,7 +120,8 @@ class TestBigSFTP(object): file_size = f.stat().st_size f.prefetch(file_size) - # read on odd boundaries to make sure the bytes aren't getting scrambled + # read on odd boundaries to make sure the bytes aren't getting + # scrambled n = 0 k2blob = kblob + kblob chunk = 629 @@ -140,7 +139,7 @@ class TestBigSFTP(object): finally: sftp.remove("%s/hongry.txt" % sftp.FOLDER) - def test_4_prefetch_seek(self, sftp): + def test_prefetch_seek(self, sftp): kblob = bytes().join([struct.pack(">H", n) for n in range(512)]) try: with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f: @@ -180,7 +179,7 @@ class TestBigSFTP(object): finally: sftp.remove("%s/hongry.txt" % sftp.FOLDER) - def test_5_readv_seek(self, sftp): + def test_readv_seek(self, sftp): kblob = bytes().join([struct.pack(">H", n) for n in range(512)]) try: with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f: @@ -220,7 +219,7 @@ class TestBigSFTP(object): finally: sftp.remove("%s/hongry.txt" % sftp.FOLDER) - def test_6_lots_of_prefetching(self, sftp): + def test_lots_of_prefetching(self, sftp): """ prefetch a 1MB file a bunch of times, discarding the file object without using it, to verify that paramiko doesn't get confused. @@ -255,7 +254,7 @@ class TestBigSFTP(object): finally: sftp.remove("%s/hongry.txt" % sftp.FOLDER) - def test_7_prefetch_readv(self, sftp): + def test_prefetch_readv(self, sftp): """ verify that prefetch and readv don't conflict with each other. """ @@ -296,7 +295,7 @@ class TestBigSFTP(object): finally: sftp.remove("%s/hongry.txt" % sftp.FOLDER) - def test_8_large_readv(self, sftp): + def test_large_readv(self, sftp): """ verify that a very large readv is broken up correctly and still returned as a single blob. @@ -325,7 +324,7 @@ class TestBigSFTP(object): finally: sftp.remove("%s/hongry.txt" % sftp.FOLDER) - def test_9_big_file_big_buffer(self, sftp): + def test_big_file_big_buffer(self, sftp): """ write a 1MB file, with no linefeeds, and a big buffer. """ @@ -342,7 +341,7 @@ class TestBigSFTP(object): finally: sftp.remove("%s/hongry.txt" % sftp.FOLDER) - def test_A_big_file_renegotiate(self, sftp): + def test_big_file_renegotiate(self, sftp): """ write a 1MB file, forcing key renegotiation in the middle. """ diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py index b6b50152..d8c151f9 100644 --- a/tests/test_ssh_gss.py +++ b/tests/test_ssh_gss.py @@ -139,16 +139,16 @@ class GSSAuthTest(unittest.TestCase): stdout.close() stderr.close() - def test_1_gss_auth(self): + def test_gss_auth(self): """ Verify that Paramiko can handle SSHv2 GSS-API / SSPI authentication (gssapi-with-mic) in client and server mode. """ self._test_connection(allow_agent=False, look_for_keys=False) - def test_2_auth_trickledown(self): + def test_auth_trickledown(self): """ - Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding + Failed gssapi-with-mic doesn't prevent subsequent key from succeeding """ self.hostname = ( "this_host_does_not_exists_and_causes_a_GSSAPI-exception" diff --git a/tests/test_transport.py b/tests/test_transport.py index 2b8ee3bc..ad267e28 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -28,36 +28,32 @@ import socket import time import threading import random -from hashlib import sha1 import unittest from mock import Mock from paramiko import ( - Transport, - SecurityOptions, - ServerInterface, - RSAKey, - DSSKey, - SSHException, + AuthHandler, ChannelException, + DSSKey, Packetizer, - Channel, - AuthHandler, + RSAKey, + SSHException, + SecurityOptions, + ServerInterface, + Transport, ) from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED from paramiko.common import ( - MSG_KEXINIT, - cMSG_CHANNEL_WINDOW_ADJUST, - cMSG_UNIMPLEMENTED, + DEFAULT_MAX_PACKET_SIZE, + DEFAULT_WINDOW_SIZE, + MAX_WINDOW_SIZE, MIN_PACKET_SIZE, MIN_WINDOW_SIZE, - MAX_WINDOW_SIZE, - DEFAULT_WINDOW_SIZE, - DEFAULT_MAX_PACKET_SIZE, - MSG_NAMES, - MSG_UNIMPLEMENTED, + MSG_KEXINIT, MSG_USERAUTH_SUCCESS, + cMSG_CHANNEL_WINDOW_ADJUST, + cMSG_UNIMPLEMENTED, ) from paramiko.py3compat import bytes, byte_chr from paramiko.message import Message @@ -185,7 +181,7 @@ class TransportTest(unittest.TestCase): self.assertTrue(event.is_set()) self.assertTrue(self.ts.is_active()) - def test_1_security_options(self): + def test_security_options(self): o = self.tc.get_security_options() self.assertEqual(type(o), SecurityOptions) self.assertTrue(("aes256-cbc", "blowfish-cbc") != o.ciphers) @@ -202,7 +198,7 @@ class TransportTest(unittest.TestCase): except TypeError: pass - def test_1b_security_options_reset(self): + def testb_security_options_reset(self): o = self.tc.get_security_options() # should not throw any exceptions o.ciphers = o.ciphers @@ -211,17 +207,17 @@ class TransportTest(unittest.TestCase): o.kex = o.kex o.compression = o.compression - def test_2_compute_key(self): - self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929 - self.tc.H = b"\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3" + def test_compute_key(self): + self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929 # noqa + self.tc.H = b"\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3" # noqa self.tc.session_id = self.tc.H key = self.tc._compute_key("C", 32) self.assertEqual( - b"207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995", + b"207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995", # noqa hexlify(key).upper(), ) - def test_3_simple(self): + def test_simple(self): """ verify that we can establish an ssh link with ourselves across the loopback sockets. this is hardly "simple" but it's simpler than the @@ -249,7 +245,7 @@ class TransportTest(unittest.TestCase): self.assertEqual(True, self.tc.is_authenticated()) self.assertEqual(True, self.ts.is_authenticated()) - def test_3a_long_banner(self): + def testa_long_banner(self): """ verify that a long banner doesn't mess up the handshake. """ @@ -268,7 +264,7 @@ class TransportTest(unittest.TestCase): self.assertTrue(event.is_set()) self.assertTrue(self.ts.is_active()) - def test_4_special(self): + def test_special(self): """ verify that the client can demand odd handshake settings, and can renegotiate keys in mid-stream. @@ -289,7 +285,7 @@ class TransportTest(unittest.TestCase): self.ts.send_ignore(1024) @slow - def test_5_keepalive(self): + def test_keepalive(self): """ verify that the keepalive will be sent. """ @@ -299,7 +295,7 @@ class TransportTest(unittest.TestCase): time.sleep(2) self.assertEqual("keepalive@lag.net", self.server._global_request) - def test_6_exec_command(self): + def test_exec_command(self): """ verify that exec_command() does something reasonable. """ @@ -343,7 +339,7 @@ class TransportTest(unittest.TestCase): self.assertEqual("This is on stderr.\n", f.readline()) self.assertEqual("", f.readline()) - def test_6a_channel_can_be_used_as_context_manager(self): + def testa_channel_can_be_used_as_context_manager(self): """ verify that exec_command() does something reasonable. """ @@ -359,7 +355,7 @@ class TransportTest(unittest.TestCase): self.assertEqual("Hello there.\n", f.readline()) self.assertEqual("", f.readline()) - def test_7_invoke_shell(self): + def test_invoke_shell(self): """ verify that invoke_shell() does something reasonable. """ @@ -373,18 +369,18 @@ class TransportTest(unittest.TestCase): chan.close() self.assertEqual("", f.readline()) - def test_8_channel_exception(self): + def test_channel_exception(self): """ verify that ChannelException is thrown for a bad open-channel request. """ self.setup_test_server() try: - chan = self.tc.open_channel("bogus") + self.tc.open_channel("bogus") self.fail("expected exception") except ChannelException as e: self.assertTrue(e.code == OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED) - def test_9_exit_status(self): + def test_exit_status(self): """ verify that get_exit_status() works. """ @@ -413,7 +409,7 @@ class TransportTest(unittest.TestCase): self.assertEqual(23, chan.recv_exit_status()) chan.close() - def test_A_select(self): + def test_select(self): """ verify that select() on a channel works. """ @@ -468,7 +464,7 @@ class TransportTest(unittest.TestCase): # ...and now is closed. self.assertEqual(True, p._closed) - def test_B_renegotiate(self): + def test_renegotiate(self): """ verify that a transport can correctly renegotiate mid-stream. """ @@ -492,7 +488,7 @@ class TransportTest(unittest.TestCase): schan.close() - def test_C_compression(self): + def test_compression(self): """ verify that zlib compression is basically working. """ @@ -510,14 +506,15 @@ class TransportTest(unittest.TestCase): bytes2 = self.tc.packetizer._Packetizer__sent_bytes block_size = self.tc._cipher_info[self.tc.local_cipher]["block-size"] mac_size = self.tc._mac_info[self.tc.local_mac]["size"] - # tests show this is actually compressed to *52 bytes*! including packet overhead! nice!! :) + # tests show this is actually compressed to *52 bytes*! including + # packet overhead! nice!! :) self.assertTrue(bytes2 - bytes < 1024) self.assertEqual(16 + block_size + mac_size, bytes2 - bytes) chan.close() schan.close() - def test_D_x11(self): + def test_x11(self): """ verify that an x11 port can be requested and opened. """ @@ -555,7 +552,7 @@ class TransportTest(unittest.TestCase): chan.close() schan.close() - def test_E_reverse_port_forwarding(self): + def test_reverse_port_forwarding(self): """ verify that a client can ask the server to open a reverse port for forwarding. @@ -563,7 +560,7 @@ class TransportTest(unittest.TestCase): self.setup_test_server() chan = self.tc.open_session() chan.exec_command("yes") - schan = self.ts.accept(1.0) + self.ts.accept(1.0) requested = [] @@ -594,7 +591,7 @@ class TransportTest(unittest.TestCase): self.tc.cancel_port_forward("127.0.0.1", port) self.assertTrue(self.server._listen is None) - def test_F_port_forwarding(self): + def test_port_forwarding(self): """ verify that a client can forward new connections from a locally- forwarded port. @@ -602,7 +599,7 @@ class TransportTest(unittest.TestCase): self.setup_test_server() chan = self.tc.open_session() chan.exec_command("yes") - schan = self.ts.accept(1.0) + self.ts.accept(1.0) # open a port on the "server" that the client will ask to forward to. greeting_server = socket.socket() @@ -626,7 +623,7 @@ class TransportTest(unittest.TestCase): self.assertEqual(b"Hello!\n", cs.recv(7)) cs.close() - def test_G_stderr_select(self): + def test_stderr_select(self): """ verify that select() on a channel works even if only stderr is receiving data. @@ -665,7 +662,7 @@ class TransportTest(unittest.TestCase): schan.close() chan.close() - def test_H_send_ready(self): + def test_send_ready(self): """ verify that send_ready() indicates when a send would not block. """ @@ -689,9 +686,10 @@ class TransportTest(unittest.TestCase): chan.close() self.assertEqual(chan.send_ready(), True) - def test_I_rekey_deadlock(self): + def test_rekey_deadlock(self): """ - Regression test for deadlock when in-transit messages are received after MSG_KEXINIT is sent + Regression test for deadlock when in-transit messages are received + after MSG_KEXINIT is sent Note: When this test fails, it may leak threads. """ @@ -714,12 +712,15 @@ class TransportTest(unittest.TestCase): # MSG_KEXINIT to the remote host. # # On the remote host (using any SSH implementation): - # 5. The MSG_CHANNEL_DATA is received, and MSG_CHANNEL_WINDOW_ADJUST is sent. - # 6. The MSG_KEXINIT is received, and a corresponding MSG_KEXINIT is sent. + # 5. The MSG_CHANNEL_DATA is received, and MSG_CHANNEL_WINDOW_ADJUST + # is sent. + # 6. The MSG_KEXINIT is received, and a corresponding MSG_KEXINIT is + # sent. # # In the main thread: # 7. The user's program calls Channel.send(). - # 8. Channel.send acquires Channel.lock, then calls Transport._send_user_message(). + # 8. Channel.send acquires Channel.lock, then calls + # Transport._send_user_message(). # 9. Transport._send_user_message waits for Transport.clear_to_send # to be set (i.e., it waits for re-keying to complete). # Channel.lock is still held. @@ -854,7 +855,7 @@ class TransportTest(unittest.TestCase): schan.close() chan.close() - def test_J_sanitze_packet_size(self): + def test_sanitze_packet_size(self): """ verify that we conform to the rfc of packet and window sizes. """ @@ -865,7 +866,7 @@ class TransportTest(unittest.TestCase): ]: self.assertEqual(self.tc._sanitize_packet_size(val), correct) - def test_K_sanitze_window_size(self): + def test_sanitze_window_size(self): """ verify that we conform to the rfc of packet and window sizes. """ @@ -877,7 +878,7 @@ class TransportTest(unittest.TestCase): self.assertEqual(self.tc._sanitize_window_size(val), correct) @slow - def test_L_handshake_timeout(self): + def test_handshake_timeout(self): """ verify that we can get a hanshake timeout. """ @@ -914,7 +915,7 @@ class TransportTest(unittest.TestCase): password="pygmalion", ) - def test_M_select_after_close(self): + def test_select_after_close(self): """ verify that select works when a channel is already closed. """ @@ -969,11 +970,11 @@ class TransportTest(unittest.TestCase): # send() accepts buffer instances sent = 0 while sent < len(data): - sent += chan.send(buffer(data, sent, 8)) + sent += chan.send(buffer(data, sent, 8)) # noqa self.assertEqual(sfile.read(len(data)), data) # sendall() accepts a buffer instance - chan.sendall(buffer(data)) + chan.sendall(buffer(data)) # noqa self.assertEqual(sfile.read(len(data)), data) @needs_builtin("memoryview") diff --git a/tests/test_util.py b/tests/test_util.py index 36d69de7..465e75b8 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -26,9 +26,11 @@ import os from hashlib import sha1 import unittest +import paramiko import paramiko.util +from paramiko import SSHConfig from paramiko.util import lookup_ssh_host_config as host_config, safe_string -from paramiko.py3compat import StringIO, byte_ord, b +from paramiko.py3compat import StringIO, byte_ord # Note some lines in this configuration have trailing spaces on purpose @@ -62,16 +64,12 @@ BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\ """ -# for test 1: -from paramiko import * - - class UtilTest(unittest.TestCase): def test_import(self): """ verify that all the classes can be imported from paramiko. """ - symbols = list(globals().keys()) + symbols = paramiko.__all__ self.assertTrue("Transport" in symbols) self.assertTrue("SSHClient" in symbols) self.assertTrue("MissingHostKeyPolicy" in symbols) @@ -172,7 +170,7 @@ class UtilTest(unittest.TestCase): hex = "".join(["%02x" % byte_ord(c) for c in x]) self.assertEqual( hex, - "9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b", + "9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b", # noqa ) def test_host_keys(self): @@ -419,8 +417,7 @@ IdentityFile something_%l_using_fqdn f = StringIO(test_config_file) config = paramiko.util.parse_ssh_config(f) self.assertEqual( - config.get_hostnames(), - set(["*", "*.example.com", "spoo.example.com"]), + config.get_hostnames(), {"*", "*.example.com", "spoo.example.com"} ) def test_quoted_host_names(self): @@ -508,12 +505,12 @@ Host param3 parara self.assertRaises(Exception, conf._get_hosts, host) def test_safe_string(self): - vanilla = b("vanilla") - has_bytes = b("has \7\3 bytes") + vanilla = b"vanilla" + has_bytes = b"has \7\3 bytes" safe_vanilla = safe_string(vanilla) safe_has_bytes = safe_string(has_bytes) - expected_bytes = b("has %07%03 bytes") - err = "{0!r} != {1!r}" + expected_bytes = b"has %07%03 bytes" + err = "{!r} != {!r}" msg = err.format(safe_vanilla, vanilla) assert safe_vanilla == vanilla, msg msg = err.format(safe_has_bytes, expected_bytes) diff --git a/tests/util.py b/tests/util.py index c1ba4b2c..4ca02374 100644 --- a/tests/util.py +++ b/tests/util.py @@ -20,7 +20,7 @@ def needs_builtin(name): """ Skip decorated test if builtin name does not exist. """ - reason = "Test requires a builtin '{0}'".format(name) + reason = "Test requires a builtin '{}'".format(name) return pytest.mark.skipif(not hasattr(builtins, name), reason=reason) |