summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--paramiko/auth_handler.py3
-rw-r--r--paramiko/client.py43
-rw-r--r--paramiko/dsskey.py4
-rw-r--r--paramiko/ecdsakey.py11
-rw-r--r--paramiko/ed25519key.py6
-rw-r--r--paramiko/pkey.py25
-rw-r--r--paramiko/rsakey.py8
-rw-r--r--paramiko/server.py1
-rw-r--r--paramiko/transport.py40
-rw-r--r--tasks.py1
-rw-r--r--tests/test_client.py28
-rw-r--r--tests/test_pkey.py18
12 files changed, 107 insertions, 81 deletions
diff --git a/paramiko/auth_handler.py b/paramiko/auth_handler.py
index cb6d8fb4..13449b58 100644
--- a/paramiko/auth_handler.py
+++ b/paramiko/auth_handler.py
@@ -46,6 +46,7 @@ from paramiko.common import (
MSG_USERAUTH_REQUEST,
MSG_USERAUTH_SUCCESS,
MSG_USERAUTH_FAILURE,
+ cMSG_USERAUTH_BANNER,
MSG_USERAUTH_BANNER,
MSG_USERAUTH_INFO_REQUEST,
MSG_USERAUTH_INFO_RESPONSE,
@@ -508,7 +509,7 @@ class AuthHandler(object):
)
key = None
except Exception as e:
- msg = 'Auth rejected: unsupported or mangled public key ({0}: {1})' # noqa
+ msg = "Auth rejected: unsupported or mangled public key ({0}: {1})" # noqa
self.transport._log(INFO, msg.format(e.__class__.__name__, e))
key = None
if key is None:
diff --git a/paramiko/client.py b/paramiko/client.py
index 2993e6de..49a9a301 100644
--- a/paramiko/client.py
+++ b/paramiko/client.py
@@ -416,8 +416,16 @@ class SSHClient(ClosingContextManager):
key_filenames = key_filename
self._auth(
- username, password, pkey, key_filenames, allow_agent,
- look_for_keys, gss_auth, gss_kex, gss_deleg_creds, t.gss_host,
+ username,
+ password,
+ pkey,
+ key_filenames,
+ allow_agent,
+ look_for_keys,
+ gss_auth,
+ gss_kex,
+ gss_deleg_creds,
+ t.gss_host,
)
def close(self):
@@ -542,10 +550,10 @@ class SSHClient(ClosingContextManager):
- Otherwise, the filename is assumed to be a private key, and the
matching public cert will be loaded if it exists.
"""
- cert_suffix = '-cert.pub'
+ cert_suffix = "-cert.pub"
# Assume privkey, not cert, by default
if filename.endswith(cert_suffix):
- key_path = filename[:-len(cert_suffix)]
+ key_path = filename[: -len(cert_suffix)]
cert_path = filename
else:
key_path = filename
@@ -556,7 +564,7 @@ class SSHClient(ClosingContextManager):
# when #387 is released, since this is a critical log message users are
# likely testing/filtering for (bah.)
msg = "Trying discovered key {0} in {1}".format(
- hexlify(key.get_fingerprint()), key_path,
+ hexlify(key.get_fingerprint()), key_path
)
self._log(DEBUG, msg)
# Attempt to load cert if it exists.
@@ -565,8 +573,19 @@ class SSHClient(ClosingContextManager):
self._log(DEBUG, "Adding public certificate {0}".format(cert_path))
return key
- def _auth(self, username, password, pkey, key_filenames, allow_agent,
- look_for_keys, gss_auth, gss_kex, gss_deleg_creds, gss_host):
+ def _auth(
+ self,
+ username,
+ password,
+ pkey,
+ key_filenames,
+ allow_agent,
+ look_for_keys,
+ gss_auth,
+ gss_kex,
+ gss_deleg_creds,
+ gss_host,
+ ):
"""
Try, in order:
@@ -600,7 +619,7 @@ class SSHClient(ClosingContextManager):
if gss_auth:
try:
return self._transport.auth_gssapi_with_mic(
- username, gss_host, gss_deleg_creds,
+ username, gss_host, gss_deleg_creds
)
except Exception as e:
saved_exception = e
@@ -625,7 +644,7 @@ class SSHClient(ClosingContextManager):
for pkey_class in (RSAKey, DSSKey, ECDSAKey, Ed25519Key):
try:
key = self._key_from_filepath(
- key_filename, pkey_class, password,
+ key_filename, pkey_class, password
)
allowed_types = set(
self._transport.auth_publickey(username, key)
@@ -677,8 +696,8 @@ class SSHClient(ClosingContextManager):
if os.path.isfile(full_path):
# TODO: only do this append if below did not run
keyfiles.append((keytype, full_path))
- if os.path.isfile(full_path + '-cert.pub'):
- keyfiles.append((keytype, full_path + '-cert.pub'))
+ if os.path.isfile(full_path + "-cert.pub"):
+ keyfiles.append((keytype, full_path + "-cert.pub"))
if not look_for_keys:
keyfiles = []
@@ -686,7 +705,7 @@ class SSHClient(ClosingContextManager):
for pkey_class, filename in keyfiles:
try:
key = self._key_from_filepath(
- filename, pkey_class, password,
+ filename, pkey_class, password
)
# for 2-factor auth a successfully auth'd key will result
# in ['password']
diff --git a/paramiko/dsskey.py b/paramiko/dsskey.py
index 1bc2fbf5..ec358ee2 100644
--- a/paramiko/dsskey.py
+++ b/paramiko/dsskey.py
@@ -71,8 +71,8 @@ class DSSKey(PKey):
else:
self._check_type_and_load_cert(
msg=msg,
- key_type='ssh-dss',
- cert_type='ssh-dss-cert-v01@openssh.com',
+ key_type="ssh-dss",
+ cert_type="ssh-dss-cert-v01@openssh.com",
)
self.p = msg.get_mpint()
self.q = msg.get_mpint()
diff --git a/paramiko/ecdsakey.py b/paramiko/ecdsakey.py
index 1fc164f4..ab9ec15f 100644
--- a/paramiko/ecdsakey.py
+++ b/paramiko/ecdsakey.py
@@ -139,21 +139,18 @@ class ECDSAKey(PKey):
# identifier, so strip out any cert business. (NOTE: could push
# that into _ECDSACurveSet.get_by_key_format_identifier(), but it
# feels more correct to do it here?)
- suffix = '-cert-v01@openssh.com'
+ suffix = "-cert-v01@openssh.com"
if key_type.endswith(suffix):
- key_type = key_type[:-len(suffix)]
+ key_type = key_type[: -len(suffix)]
self.ecdsa_curve = self._ECDSA_CURVES.get_by_key_format_identifier(
key_type
)
key_types = self._ECDSA_CURVES.get_key_format_identifier_list()
cert_types = [
- '{0}-cert-v01@openssh.com'.format(x)
- for x in key_types
+ "{0}-cert-v01@openssh.com".format(x) for x in key_types
]
self._check_type_and_load_cert(
- msg=msg,
- key_type=key_types,
- cert_type=cert_types,
+ msg=msg, key_type=key_types, cert_type=cert_types
)
curvename = msg.get_text()
if curvename != self.ecdsa_curve.nist_name:
diff --git a/paramiko/ed25519key.py b/paramiko/ed25519key.py
index b30ba085..68ada224 100644
--- a/paramiko/ed25519key.py
+++ b/paramiko/ed25519key.py
@@ -56,8 +56,10 @@ class Ed25519Key(PKey):
.. versionchanged:: 2.3
Added a ``file_obj`` parameter to match other key classes.
"""
- def __init__(self, msg=None, data=None, filename=None, password=None,
- file_obj=None):
+
+ def __init__(
+ self, msg=None, data=None, filename=None, password=None, file_obj=None
+ ):
self.public_blob = None
verifying_key = signing_key = None
if msg is None and data is not None:
diff --git a/paramiko/pkey.py b/paramiko/pkey.py
index 3a4a3193..a6e7800a 100644
--- a/paramiko/pkey.py
+++ b/paramiko/pkey.py
@@ -361,11 +361,11 @@ class PKey(object):
else:
encryption = serialization.BestAvailableEncryption(b(password))
- f.write(key.private_bytes(
- serialization.Encoding.PEM,
- format,
- encryption
- ).decode())
+ f.write(
+ key.private_bytes(
+ serialization.Encoding.PEM, format, encryption
+ ).decode()
+ )
def _check_type_and_load_cert(self, msg, key_type, cert_type):
"""
@@ -388,7 +388,7 @@ class PKey(object):
cert_types = [cert_types]
# Can't do much with no message, that should've been handled elsewhere
if msg is None:
- raise SSHException('Key object may not be empty')
+ raise SSHException("Key object may not be empty")
# First field is always key type, in either kind of object. (make sure
# we rewind before grabbing it - sometimes caller had to do their own
# introspection first!)
@@ -411,7 +411,7 @@ class PKey(object):
# (requires going back into per-type subclasses.)
msg.get_string()
else:
- err = 'Invalid key (class: {0}, data type: {1}'
+ err = "Invalid key (class: {0}, data type: {1}"
raise SSHException(err.format(self.__class__.__name__, type_))
def load_certificate(self, value):
@@ -434,11 +434,11 @@ class PKey(object):
successfully.
"""
if isinstance(value, Message):
- constructor = 'from_message'
+ constructor = "from_message"
elif os.path.isfile(value):
- constructor = 'from_file'
+ constructor = "from_file"
else:
- constructor = 'from_string'
+ constructor = "from_string"
blob = getattr(PublicBlob, constructor)(value)
if not blob.key_type.startswith(self.get_name()):
err = "PublicBlob type {0} incompatible with key type {1}"
@@ -464,6 +464,7 @@ class PublicBlob(object):
`from_message` for useful instantiation, the main constructor is
basically "I should be using ``attrs`` for this."
"""
+
def __init__(self, type_, blob, comment=None):
"""
Create a new public blob of given type and contents.
@@ -505,7 +506,7 @@ class PublicBlob(object):
m = Message(key_blob)
blob_type = m.get_text()
if blob_type != key_type:
- msg = "Invalid PublicBlob contents: key type={0!r}, but blob type={1!r}" # noqa
+ msg = "Invalid PublicBlob contents: key type={0!r}, but blob type={1!r}" # noqa
raise ValueError(msg.format(key_type, blob_type))
# All good? All good.
return cls(type_=key_type, blob=key_blob, comment=comment)
@@ -522,7 +523,7 @@ class PublicBlob(object):
return cls(type_=type_, blob=message.asbytes())
def __str__(self):
- ret = '{0} public key/certificate'.format(self.key_type)
+ ret = "{0} public key/certificate".format(self.key_type)
if self.comment:
ret += "- {0}".format(self.comment)
return ret
diff --git a/paramiko/rsakey.py b/paramiko/rsakey.py
index d3ce0335..442bfe1f 100644
--- a/paramiko/rsakey.py
+++ b/paramiko/rsakey.py
@@ -61,8 +61,8 @@ class RSAKey(PKey):
else:
self._check_type_and_load_cert(
msg=msg,
- key_type='ssh-rsa',
- cert_type='ssh-rsa-cert-v01@openssh.com',
+ key_type="ssh-rsa",
+ cert_type="ssh-rsa-cert-v01@openssh.com",
)
self.key = rsa.RSAPublicNumbers(
e=msg.get_mpint(), n=msg.get_mpint()
@@ -114,9 +114,7 @@ class RSAKey(PKey):
def sign_ssh_data(self, data):
sig = self.key.sign(
- data,
- padding=padding.PKCS1v15(),
- algorithm=hashes.SHA1(),
+ data, padding=padding.PKCS1v15(), algorithm=hashes.SHA1()
)
m = Message()
diff --git a/paramiko/server.py b/paramiko/server.py
index 5814b7c6..3eb1a170 100644
--- a/paramiko/server.py
+++ b/paramiko/server.py
@@ -593,6 +593,7 @@ class ServerInterface(object):
"""
return (None, None)
+
class InteractiveQuery(object):
"""
A query (set of prompts) for a user during interactive authentication.
diff --git a/paramiko/transport.py b/paramiko/transport.py
index 8742a248..eebe0513 100644
--- a/paramiko/transport.py
+++ b/paramiko/transport.py
@@ -244,18 +244,18 @@ class Transport(threading.Thread, ClosingContextManager):
}
_key_info = {
- 'ssh-rsa': RSAKey,
- 'ssh-rsa-cert-v01@openssh.com': RSAKey,
- 'ssh-dss': DSSKey,
- 'ssh-dss-cert-v01@openssh.com': DSSKey,
- 'ecdsa-sha2-nistp256': ECDSAKey,
- 'ecdsa-sha2-nistp256-cert-v01@openssh.com': ECDSAKey,
- 'ecdsa-sha2-nistp384': ECDSAKey,
- 'ecdsa-sha2-nistp384-cert-v01@openssh.com': ECDSAKey,
- 'ecdsa-sha2-nistp521': ECDSAKey,
- 'ecdsa-sha2-nistp521-cert-v01@openssh.com': ECDSAKey,
- 'ssh-ed25519': Ed25519Key,
- 'ssh-ed25519-cert-v01@openssh.com': Ed25519Key,
+ "ssh-rsa": RSAKey,
+ "ssh-rsa-cert-v01@openssh.com": RSAKey,
+ "ssh-dss": DSSKey,
+ "ssh-dss-cert-v01@openssh.com": DSSKey,
+ "ecdsa-sha2-nistp256": ECDSAKey,
+ "ecdsa-sha2-nistp256-cert-v01@openssh.com": ECDSAKey,
+ "ecdsa-sha2-nistp384": ECDSAKey,
+ "ecdsa-sha2-nistp384-cert-v01@openssh.com": ECDSAKey,
+ "ecdsa-sha2-nistp521": ECDSAKey,
+ "ecdsa-sha2-nistp521-cert-v01@openssh.com": ECDSAKey,
+ "ssh-ed25519": Ed25519Key,
+ "ssh-ed25519-cert-v01@openssh.com": Ed25519Key,
}
_kex_info = {
@@ -340,7 +340,7 @@ class Transport(threading.Thread, ClosingContextManager):
if isinstance(sock, string_types):
# convert "host:port" into (host, port)
- hl = sock.split(':', 1)
+ hl = sock.split(":", 1)
self.hostname = hl[0]
if len(hl) == 1:
sock = (hl[0], 22)
@@ -350,7 +350,7 @@ class Transport(threading.Thread, ClosingContextManager):
# connect to the given (host, port)
hostname, port = sock
self.hostname = hostname
- reason = 'No suitable address family'
+ reason = "No suitable address family"
addrinfos = socket.getaddrinfo(
hostname, port, socket.AF_UNSPEC, socket.SOCK_STREAM
)
@@ -1236,9 +1236,11 @@ class Transport(threading.Thread, ClosingContextManager):
if (pkey is not None) or (password is not None) or gss_auth or gss_kex:
if gss_auth:
- self._log(DEBUG, 'Attempting GSS-API auth... (gssapi-with-mic)') # noqa
+ self._log(
+ DEBUG, "Attempting GSS-API auth... (gssapi-with-mic)"
+ ) # noqa
self.auth_gssapi_with_mic(
- username, self.gss_host, gss_deleg_creds,
+ username, self.gss_host, gss_deleg_creds
)
elif gss_kex:
self._log(DEBUG, "Attempting GSS-API auth... (gssapi-keyex)")
@@ -2006,7 +2008,11 @@ class Transport(threading.Thread, ClosingContextManager):
% chanid,
) # noqa
else:
- self._log(ERROR, 'Channel request for unknown channel %d' % chanid) # noqa
+ self._log(
+ ERROR,
+ "Channel request for unknown channel %d"
+ % chanid,
+ ) # noqa
break
elif (
self.auth_handler is not None
diff --git a/tasks.py b/tasks.py
index 261a7443..6ea0a2c4 100644
--- a/tasks.py
+++ b/tasks.py
@@ -7,7 +7,6 @@ from invocations import travis
from invocations.checks import blacken
from invocations.docs import docs, www, sites
from invocations.packaging.release import ns as release_coll, publish
-from invocations.testing import count_errors
# TODO: this screams out for the invoke missing-feature of "I just wrap task X,
diff --git a/tests/test_client.py b/tests/test_client.py
index 48af2a57..fec1485e 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -52,9 +52,9 @@ FINGERPRINTS = {
class NullServer(paramiko.ServerInterface):
def __init__(self, *args, **kwargs):
# Allow tests to enable/disable specific key types
- self.__allowed_keys = kwargs.pop('allowed_keys', [])
+ self.__allowed_keys = kwargs.pop("allowed_keys", [])
# And allow them to set a (single...meh) expected public blob (cert)
- self.__expected_public_blob = kwargs.pop('public_blob', None)
+ self.__expected_public_blob = kwargs.pop("public_blob", None)
super(NullServer, self).__init__(*args, **kwargs)
def get_allowed_auths(self, username):
@@ -77,13 +77,13 @@ class NullServer(paramiko.ServerInterface):
return paramiko.AUTH_FAILED
# Base check: allowed auth type & fingerprint matches
happy = (
- key.get_name() in self.__allowed_keys and
- key.get_fingerprint() == expected
+ key.get_name() in self.__allowed_keys
+ and key.get_fingerprint() == expected
)
# Secondary check: if test wants assertions about cert data
if (
- self.__expected_public_blob is not None and
- key.public_blob != self.__expected_public_blob
+ self.__expected_public_blob is not None
+ and key.public_blob != self.__expected_public_blob
):
happy = False
return paramiko.AUTH_SUCCESSFUL if happy else paramiko.AUTH_FAILED
@@ -150,7 +150,7 @@ class SSHClientTest(unittest.TestCase):
``NullServer`` used for testing.
"""
run_kwargs = {}
- for key in ('allowed_keys', 'public_blob'):
+ for key in ("allowed_keys", "public_blob"):
run_kwargs[key] = kwargs.pop(key, None)
# Server setup
threading.Thread(target=self._run, kwargs=run_kwargs).start()
@@ -270,9 +270,9 @@ class SSHClientTest(unittest.TestCase):
# They're similar except for which path is given; the expected auth and
# server-side behavior is 100% identical.)
# NOTE: only bothered whipping up one cert per overall class/family.
- for type_ in ('rsa', 'dss', 'ecdsa_256', 'ed25519'):
- cert_name = 'test_{0}.key-cert.pub'.format(type_)
- cert_path = _support(os.path.join('cert_support', cert_name))
+ for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"):
+ cert_name = "test_{0}.key-cert.pub".format(type_)
+ cert_path = _support(os.path.join("cert_support", cert_name))
self._test_connection(
key_filename=cert_path,
public_blob=PublicBlob.from_file(cert_path),
@@ -285,13 +285,13 @@ class SSHClientTest(unittest.TestCase):
# about the server-side key object's public blob. Thus, we can prove
# that a specific cert was found, along with regular authorization
# succeeding proving that the overall flow works.
- for type_ in ('rsa', 'dss', 'ecdsa_256', 'ed25519'):
- key_name = 'test_{0}.key'.format(type_)
- key_path = _support(os.path.join('cert_support', key_name))
+ for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"):
+ key_name = "test_{0}.key".format(type_)
+ key_path = _support(os.path.join("cert_support", key_name))
self._test_connection(
key_filename=key_path,
public_blob=PublicBlob.from_file(
- '{0}-cert.pub'.format(key_path)
+ "{0}-cert.pub".format(key_path)
),
)
diff --git a/tests/test_pkey.py b/tests/test_pkey.py
index 2479f273..08d38e3b 100644
--- a/tests/test_pkey.py
+++ b/tests/test_pkey.py
@@ -488,7 +488,7 @@ class KeyTest(unittest.TestCase):
# No exception -> it's good. Meh.
def test_ed25519_load_from_file_obj(self):
- with open(_support('test_ed25519.key')) as pkey_fileobj:
+ with open(_support("test_ed25519.key")) as pkey_fileobj:
key = Ed25519Key.from_private_key(pkey_fileobj)
self.assertEqual(key, key)
self.assertTrue(key.can_sign())
@@ -513,19 +513,21 @@ class KeyTest(unittest.TestCase):
# test_client.py; this and nearby cert tests are more about the gritty
# details.
# PKey.load_certificate
- key_path = _support(os.path.join('cert_support', 'test_rsa.key'))
+ key_path = _support(os.path.join("cert_support", "test_rsa.key"))
key = RSAKey.from_private_key_file(key_path)
self.assertTrue(key.public_blob is None)
cert_path = _support(
- os.path.join('cert_support', 'test_rsa.key-cert.pub')
+ os.path.join("cert_support", "test_rsa.key-cert.pub")
)
key.load_certificate(cert_path)
self.assertTrue(key.public_blob is not None)
- self.assertEqual(key.public_blob.key_type, 'ssh-rsa-cert-v01@openssh.com')
- self.assertEqual(key.public_blob.comment, 'test_rsa.key.pub')
+ self.assertEqual(
+ key.public_blob.key_type, "ssh-rsa-cert-v01@openssh.com"
+ )
+ self.assertEqual(key.public_blob.comment, "test_rsa.key.pub")
# Delve into blob contents, for test purposes
msg = Message(key.public_blob.key_blob)
- self.assertEqual(msg.get_text(), 'ssh-rsa-cert-v01@openssh.com')
+ self.assertEqual(msg.get_text(), "ssh-rsa-cert-v01@openssh.com")
nonce = msg.get_string()
e = msg.get_mpint()
n = msg.get_mpint()
@@ -535,10 +537,10 @@ class KeyTest(unittest.TestCase):
self.assertEqual(msg.get_int64(), 1234)
# Prevented from loading certificate that doesn't match
- key_path = _support(os.path.join('cert_support', 'test_ed25519.key'))
+ key_path = _support(os.path.join("cert_support", "test_ed25519.key"))
key1 = Ed25519Key.from_private_key_file(key_path)
self.assertRaises(
ValueError,
key1.load_certificate,
- _support('test_rsa.key-cert.pub'),
+ _support("test_rsa.key-cert.pub"),
)