summaryrefslogtreecommitdiffhomepage
path: root/tests/test_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_client.py')
-rw-r--r--tests/test_client.py253
1 files changed, 133 insertions, 120 deletions
diff --git a/tests/test_client.py b/tests/test_client.py
index 7163fdcf..4943df29 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -48,30 +48,31 @@ requires_gss_auth = unittest.skipUnless(
)
FINGERPRINTS = {
- 'ssh-dss': b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c',
- 'ssh-rsa': b'\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5',
- 'ecdsa-sha2-nistp256': b'\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60',
- 'ssh-ed25519': b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80',
+ "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c",
+ "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5",
+ "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60",
+ "ssh-ed25519": b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80',
}
class NullServer(paramiko.ServerInterface):
+
def __init__(self, *args, **kwargs):
# Allow tests to enable/disable specific key types
- self.__allowed_keys = kwargs.pop('allowed_keys', [])
+ self.__allowed_keys = kwargs.pop("allowed_keys", [])
# And allow them to set a (single...meh) expected public blob (cert)
- self.__expected_public_blob = kwargs.pop('public_blob', None)
+ self.__expected_public_blob = kwargs.pop("public_blob", None)
super(NullServer, self).__init__(*args, **kwargs)
def get_allowed_auths(self, username):
- if username == 'slowdive':
- return 'publickey,password'
- return 'publickey'
+ if username == "slowdive":
+ return "publickey,password"
+ return "publickey"
def check_auth_password(self, username, password):
- if (username == 'slowdive') and (password == 'pygmalion'):
+ if (username == "slowdive") and (password == "pygmalion"):
return paramiko.AUTH_SUCCESSFUL
- if (username == 'slowdive') and (password == 'unresponsive-server'):
+ if (username == "slowdive") and (password == "unresponsive-server"):
time.sleep(5)
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED
@@ -83,13 +84,13 @@ class NullServer(paramiko.ServerInterface):
return paramiko.AUTH_FAILED
# Base check: allowed auth type & fingerprint matches
happy = (
- key.get_name() in self.__allowed_keys and
- key.get_fingerprint() == expected
+ key.get_name() in self.__allowed_keys
+ and key.get_fingerprint() == expected
)
# Secondary check: if test wants assertions about cert data
if (
- self.__expected_public_blob is not None and
- key.public_blob != self.__expected_public_blob
+ self.__expected_public_blob is not None
+ and key.public_blob != self.__expected_public_blob
):
happy = False
return paramiko.AUTH_SUCCESSFUL if happy else paramiko.AUTH_FAILED
@@ -98,31 +99,32 @@ class NullServer(paramiko.ServerInterface):
return paramiko.OPEN_SUCCEEDED
def check_channel_exec_request(self, channel, command):
- if command != b'yes':
+ if command != b"yes":
return False
return True
def check_channel_env_request(self, channel, name, value):
- if name == 'INVALID_ENV':
+ if name == "INVALID_ENV":
return False
- if not hasattr(channel, 'env'):
- setattr(channel, 'env', {})
+ if not hasattr(channel, "env"):
+ setattr(channel, "env", {})
channel.env[name] = value
return True
class ClientTest(unittest.TestCase):
+
def setUp(self):
self.sockl = socket.socket()
- self.sockl.bind(('localhost', 0))
+ self.sockl.bind(("localhost", 0))
self.sockl.listen(1)
self.addr, self.port = self.sockl.getsockname()
self.connect_kwargs = dict(
hostname=self.addr,
port=self.port,
- username='slowdive',
+ username="slowdive",
look_for_keys=False,
)
self.event = threading.Event()
@@ -130,10 +132,10 @@ class ClientTest(unittest.TestCase):
def tearDown(self):
# Shut down client Transport
- if hasattr(self, 'tc'):
+ if hasattr(self, "tc"):
self.tc.close()
# Shut down shared socket
- if hasattr(self, 'sockl'):
+ if hasattr(self, "sockl"):
# Signal to server thread that it should shut down early; it checks
# this immediately after accept(). (In scenarios where connection
# actually succeeded during the test, this becomes a no-op.)
@@ -151,7 +153,7 @@ class ClientTest(unittest.TestCase):
self.sockl.close()
def _run(
- self, allowed_keys=None, delay=0, public_blob=None, kill_event=None,
+ self, allowed_keys=None, delay=0, public_blob=None, kill_event=None
):
if allowed_keys is None:
allowed_keys = FINGERPRINTS.keys()
@@ -163,10 +165,10 @@ class ClientTest(unittest.TestCase):
self.socks.close()
return
self.ts = paramiko.Transport(self.socks)
- keypath = _support('test_rsa.key')
+ keypath = _support("test_rsa.key")
host_key = paramiko.RSAKey.from_private_key_file(keypath)
self.ts.add_server_key(host_key)
- keypath = _support('test_ecdsa_256.key')
+ keypath = _support("test_ecdsa_256.key")
host_key = paramiko.ECDSAKey.from_private_key_file(keypath)
self.ts.add_server_key(host_key)
server = NullServer(allowed_keys=allowed_keys, public_blob=public_blob)
@@ -181,17 +183,21 @@ class ClientTest(unittest.TestCase):
The exception is ``allowed_keys`` which is stripped and handed to the
``NullServer`` used for testing.
"""
- run_kwargs = {'kill_event': self.kill_event}
- for key in ('allowed_keys', 'public_blob'):
+ run_kwargs = {"kill_event": self.kill_event}
+ for key in ("allowed_keys", "public_blob"):
run_kwargs[key] = kwargs.pop(key, None)
# Server setup
threading.Thread(target=self._run, kwargs=run_kwargs).start()
- host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key'))
+ host_key = paramiko.RSAKey.from_private_key_file(
+ _support("test_rsa.key")
+ )
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
# Client setup
self.tc = paramiko.SSHClient()
- self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
+ self.tc.get_host_keys().add(
+ "[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key
+ )
# Actual connection
self.tc.connect(**dict(self.connect_kwargs, **kwargs))
@@ -200,22 +206,22 @@ class ClientTest(unittest.TestCase):
self.event.wait(1.0)
self.assertTrue(self.event.is_set())
self.assertTrue(self.ts.is_active())
- self.assertEqual('slowdive', self.ts.get_username())
+ self.assertEqual("slowdive", self.ts.get_username())
self.assertEqual(True, self.ts.is_authenticated())
self.assertEqual(False, self.tc.get_transport().gss_kex_used)
# Command execution functions?
- stdin, stdout, stderr = self.tc.exec_command('yes')
+ stdin, stdout, stderr = self.tc.exec_command("yes")
schan = self.ts.accept(1.0)
- schan.send('Hello there.\n')
- schan.send_stderr('This is on stderr.\n')
+ schan.send("Hello there.\n")
+ schan.send_stderr("This is on stderr.\n")
schan.close()
- self.assertEqual('Hello there.\n', stdout.readline())
- self.assertEqual('', stdout.readline())
- self.assertEqual('This is on stderr.\n', stderr.readline())
- self.assertEqual('', stderr.readline())
+ self.assertEqual("Hello there.\n", stdout.readline())
+ self.assertEqual("", stdout.readline())
+ self.assertEqual("This is on stderr.\n", stderr.readline())
+ self.assertEqual("", stderr.readline())
# Cleanup
stdin.close()
@@ -224,32 +230,33 @@ class ClientTest(unittest.TestCase):
class SSHClientTest(ClientTest):
+
def test_1_client(self):
"""
verify that the SSHClient stuff works too.
"""
- self._test_connection(password='pygmalion')
+ self._test_connection(password="pygmalion")
def test_2_client_dsa(self):
"""
verify that SSHClient works with a DSA key.
"""
- self._test_connection(key_filename=_support('test_dss.key'))
+ self._test_connection(key_filename=_support("test_dss.key"))
def test_client_rsa(self):
"""
verify that SSHClient works with an RSA key.
"""
- self._test_connection(key_filename=_support('test_rsa.key'))
+ self._test_connection(key_filename=_support("test_rsa.key"))
def test_2_5_client_ecdsa(self):
"""
verify that SSHClient works with an ECDSA key.
"""
- self._test_connection(key_filename=_support('test_ecdsa_256.key'))
+ self._test_connection(key_filename=_support("test_ecdsa_256.key"))
def test_client_ed25519(self):
- self._test_connection(key_filename=_support('test_ed25519.key'))
+ self._test_connection(key_filename=_support("test_ed25519.key"))
def test_3_multiple_key_files(self):
"""
@@ -257,22 +264,22 @@ class SSHClientTest(ClientTest):
"""
# This is dumb :(
types_ = {
- 'rsa': 'ssh-rsa',
- 'dss': 'ssh-dss',
- 'ecdsa': 'ecdsa-sha2-nistp256',
+ "rsa": "ssh-rsa",
+ "dss": "ssh-dss",
+ "ecdsa": "ecdsa-sha2-nistp256",
}
# Various combos of attempted & valid keys
# TODO: try every possible combo using itertools functions
for attempt, accept in (
- (['rsa', 'dss'], ['dss']), # Original test #3
- (['dss', 'rsa'], ['dss']), # Ordering matters sometimes, sadly
- (['dss', 'rsa', 'ecdsa_256'], ['dss']), # Try ECDSA but fail
- (['rsa', 'ecdsa_256'], ['ecdsa']), # ECDSA success
+ (["rsa", "dss"], ["dss"]), # Original test #3
+ (["dss", "rsa"], ["dss"]), # Ordering matters sometimes, sadly
+ (["dss", "rsa", "ecdsa_256"], ["dss"]), # Try ECDSA but fail
+ (["rsa", "ecdsa_256"], ["ecdsa"]), # ECDSA success
):
try:
self._test_connection(
key_filename=[
- _support('test_{}.key'.format(x)) for x in attempt
+ _support("test_{}.key".format(x)) for x in attempt
],
allowed_keys=[types_[x] for x in accept],
)
@@ -288,10 +295,11 @@ class SSHClientTest(ClientTest):
"""
# Until #387 is fixed we have to catch a high-up exception since
# various platforms trigger different errors here >_<
- self.assertRaises(SSHException,
+ self.assertRaises(
+ SSHException,
self._test_connection,
- key_filename=[_support('test_rsa.key')],
- allowed_keys=['ecdsa-sha2-nistp256'],
+ key_filename=[_support("test_rsa.key")],
+ allowed_keys=["ecdsa-sha2-nistp256"],
)
def test_certs_allowed_as_key_filename_values(self):
@@ -299,9 +307,9 @@ class SSHClientTest(ClientTest):
# They're similar except for which path is given; the expected auth and
# server-side behavior is 100% identical.)
# NOTE: only bothered whipping up one cert per overall class/family.
- for type_ in ('rsa', 'dss', 'ecdsa_256', 'ed25519'):
- cert_name = 'test_{}.key-cert.pub'.format(type_)
- cert_path = _support(os.path.join('cert_support', cert_name))
+ for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"):
+ cert_name = "test_{}.key-cert.pub".format(type_)
+ cert_path = _support(os.path.join("cert_support", cert_name))
self._test_connection(
key_filename=cert_path,
public_blob=PublicBlob.from_file(cert_path),
@@ -314,13 +322,13 @@ class SSHClientTest(ClientTest):
# about the server-side key object's public blob. Thus, we can prove
# that a specific cert was found, along with regular authorization
# succeeding proving that the overall flow works.
- for type_ in ('rsa', 'dss', 'ecdsa_256', 'ed25519'):
- key_name = 'test_{}.key'.format(type_)
- key_path = _support(os.path.join('cert_support', key_name))
+ for type_ in ("rsa", "dss", "ecdsa_256", "ed25519"):
+ key_name = "test_{}.key".format(type_)
+ key_path = _support(os.path.join("cert_support", key_name))
self._test_connection(
key_filename=key_path,
public_blob=PublicBlob.from_file(
- '{}-cert.pub'.format(key_path)
+ "{}-cert.pub".format(key_path)
),
)
@@ -335,19 +343,19 @@ class SSHClientTest(ClientTest):
verify that SSHClient's AutoAddPolicy works.
"""
threading.Thread(target=self._run).start()
- hostname = '[%s]:%d' % (self.addr, self.port)
- key_file = _support('test_ecdsa_256.key')
+ hostname = "[%s]:%d" % (self.addr, self.port)
+ key_file = _support("test_ecdsa_256.key")
public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file)
self.tc = paramiko.SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
- self.tc.connect(password='pygmalion', **self.connect_kwargs)
+ self.tc.connect(password="pygmalion", **self.connect_kwargs)
self.event.wait(1.0)
self.assertTrue(self.event.is_set())
self.assertTrue(self.ts.is_active())
- self.assertEqual('slowdive', self.ts.get_username())
+ self.assertEqual("slowdive", self.ts.get_username())
self.assertEqual(True, self.ts.is_authenticated())
self.assertEqual(1, len(self.tc.get_host_keys()))
new_host_key = list(self.tc.get_host_keys()[hostname].values())[0]
@@ -357,9 +365,11 @@ class SSHClientTest(ClientTest):
"""
verify that SSHClient correctly saves a known_hosts file.
"""
- warnings.filterwarnings('ignore', 'tempnam.*')
+ warnings.filterwarnings("ignore", "tempnam.*")
- host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key'))
+ host_key = paramiko.RSAKey.from_private_key_file(
+ _support("test_rsa.key")
+ )
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
fd, localname = mkstemp()
os.close(fd)
@@ -367,11 +377,13 @@ class SSHClientTest(ClientTest):
client = paramiko.SSHClient()
self.assertEquals(0, len(client.get_host_keys()))
- host_id = '[%s]:%d' % (self.addr, self.port)
+ host_id = "[%s]:%d" % (self.addr, self.port)
- client.get_host_keys().add(host_id, 'ssh-rsa', public_host_key)
+ client.get_host_keys().add(host_id, "ssh-rsa", public_host_key)
self.assertEquals(1, len(client.get_host_keys()))
- self.assertEquals(public_host_key, client.get_host_keys()[host_id]['ssh-rsa'])
+ self.assertEquals(
+ public_host_key, client.get_host_keys()[host_id]["ssh-rsa"]
+ )
client.save_host_keys(localname)
@@ -394,7 +406,7 @@ class SSHClientTest(ClientTest):
self.tc = paramiko.SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
- self.tc.connect(**dict(self.connect_kwargs, password='pygmalion'))
+ self.tc.connect(**dict(self.connect_kwargs, password="pygmalion"))
self.event.wait(1.0)
self.assertTrue(self.event.is_set())
@@ -423,7 +435,7 @@ class SSHClientTest(ClientTest):
self.tc = tc
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.assertEquals(0, len(self.tc.get_host_keys()))
- self.tc.connect(**dict(self.connect_kwargs, password='pygmalion'))
+ self.tc.connect(**dict(self.connect_kwargs, password="pygmalion"))
self.event.wait(1.0)
self.assertTrue(self.event.is_set())
@@ -438,19 +450,19 @@ class SSHClientTest(ClientTest):
verify that the SSHClient has a configurable banner timeout.
"""
# Start the thread with a 1 second wait.
- threading.Thread(target=self._run, kwargs={'delay': 1}).start()
- host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key'))
+ threading.Thread(target=self._run, kwargs={"delay": 1}).start()
+ host_key = paramiko.RSAKey.from_private_key_file(
+ _support("test_rsa.key")
+ )
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
self.tc = paramiko.SSHClient()
- self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
+ self.tc.get_host_keys().add(
+ "[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key
+ )
# Connect with a half second banner timeout.
kwargs = dict(self.connect_kwargs, banner_timeout=0.5)
- self.assertRaises(
- paramiko.SSHException,
- self.tc.connect,
- **kwargs
- )
+ self.assertRaises(paramiko.SSHException, self.tc.connect, **kwargs)
def test_8_auth_trickledown(self):
"""
@@ -466,9 +478,9 @@ class SSHClientTest(ClientTest):
# 'television' as per tests/test_pkey.py). NOTE: must use
# key_filename, loading the actual key here with PKey will except
# immediately; we're testing the try/except crap within Client.
- key_filename=[_support('test_rsa_password.key')],
+ key_filename=[_support("test_rsa_password.key")],
# Actual password for default 'slowdive' user
- password='pygmalion',
+ password="pygmalion",
)
self._test_connection(**kwargs)
@@ -481,7 +493,7 @@ class SSHClientTest(ClientTest):
self.assertRaises(
AuthenticationException,
self._test_connection,
- password='unresponsive-server',
+ password="unresponsive-server",
auth_timeout=0.5,
)
@@ -490,10 +502,7 @@ class SSHClientTest(ClientTest):
"""
Failed gssapi-keyex auth doesn't prevent subsequent key auth from succeeding
"""
- kwargs = dict(
- gss_kex=True,
- key_filename=[_support('test_rsa.key')],
- )
+ kwargs = dict(gss_kex=True, key_filename=[_support("test_rsa.key")])
self._test_connection(**kwargs)
@requires_gss_auth
@@ -501,10 +510,7 @@ class SSHClientTest(ClientTest):
"""
Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding
"""
- kwargs = dict(
- gss_auth=True,
- key_filename=[_support('test_rsa.key')],
- )
+ kwargs = dict(gss_auth=True, key_filename=[_support("test_rsa.key")])
self._test_connection(**kwargs)
def test_12_reject_policy(self):
@@ -519,7 +525,8 @@ class SSHClientTest(ClientTest):
self.assertRaises(
paramiko.SSHException,
self.tc.connect,
- password='pygmalion', **self.connect_kwargs
+ password="pygmalion",
+ **self.connect_kwargs
)
@requires_gss_auth
@@ -537,14 +544,14 @@ class SSHClientTest(ClientTest):
self.assertRaises(
paramiko.SSHException,
self.tc.connect,
- password='pygmalion',
+ password="pygmalion",
gss_kex=True,
- **self.connect_kwargs
+ **self.connect_kwargs
)
def _client_host_key_bad(self, host_key):
threading.Thread(target=self._run).start()
- hostname = '[%s]:%d' % (self.addr, self.port)
+ hostname = "[%s]:%d" % (self.addr, self.port)
self.tc = paramiko.SSHClient()
self.tc.set_missing_host_key_policy(paramiko.WarningPolicy())
@@ -554,13 +561,13 @@ class SSHClientTest(ClientTest):
self.assertRaises(
paramiko.BadHostKeyException,
self.tc.connect,
- password='pygmalion',
+ password="pygmalion",
**self.connect_kwargs
)
def _client_host_key_good(self, ktype, kfile):
threading.Thread(target=self._run).start()
- hostname = '[%s]:%d' % (self.addr, self.port)
+ hostname = "[%s]:%d" % (self.addr, self.port)
self.tc = paramiko.SSHClient()
self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
@@ -568,7 +575,7 @@ class SSHClientTest(ClientTest):
known_hosts = self.tc.get_host_keys()
known_hosts.add(hostname, host_key.get_name(), host_key)
- self.tc.connect(password='pygmalion', **self.connect_kwargs)
+ self.tc.connect(password="pygmalion", **self.connect_kwargs)
self.event.wait(1.0)
self.assertTrue(self.event.is_set())
self.assertTrue(self.ts.is_active())
@@ -583,10 +590,10 @@ class SSHClientTest(ClientTest):
self._client_host_key_bad(host_key)
def test_host_key_negotiation_3(self):
- self._client_host_key_good(paramiko.ECDSAKey, 'test_ecdsa_256.key')
+ self._client_host_key_good(paramiko.ECDSAKey, "test_ecdsa_256.key")
def test_host_key_negotiation_4(self):
- self._client_host_key_good(paramiko.RSAKey, 'test_rsa.key')
+ self._client_host_key_good(paramiko.RSAKey, "test_rsa.key")
def _setup_for_env(self):
threading.Thread(target=self._run).start()
@@ -594,7 +601,9 @@ class SSHClientTest(ClientTest):
self.tc = paramiko.SSHClient()
self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.assertEqual(0, len(self.tc.get_host_keys()))
- self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
+ self.tc.connect(
+ self.addr, self.port, username="slowdive", password="pygmalion"
+ )
self.event.wait(1.0)
self.assertTrue(self.event.isSet())
@@ -605,11 +614,11 @@ class SSHClientTest(ClientTest):
Verify that environment variables can be set by the client.
"""
self._setup_for_env()
- target_env = {b'A': b'B', b'C': b'd'}
+ target_env = {b"A": b"B", b"C": b"d"}
- self.tc.exec_command('yes', environment=target_env)
+ self.tc.exec_command("yes", environment=target_env)
schan = self.ts.accept(1.0)
- self.assertEqual(target_env, getattr(schan, 'env', {}))
+ self.assertEqual(target_env, getattr(schan, "env", {}))
schan.close()
@unittest.skip("Clients normally fail silently, thus so do we, for now")
@@ -617,14 +626,14 @@ class SSHClientTest(ClientTest):
self._setup_for_env()
with self.assertRaises(SSHException) as manager:
# Verify that a rejection by the server can be detected
- self.tc.exec_command('yes', environment={b'INVALID_ENV': b''})
+ self.tc.exec_command("yes", environment={b"INVALID_ENV": b""})
self.assertTrue(
- 'INVALID_ENV' in str(manager.exception),
- 'Expected variable name in error message'
+ "INVALID_ENV" in str(manager.exception),
+ "Expected variable name in error message",
)
self.assertTrue(
isinstance(manager.exception.args[1], SSHException),
- 'Expected original SSHException in exception'
+ "Expected original SSHException in exception",
)
def test_missing_key_policy_accepts_classes_or_instances(self):
@@ -652,35 +661,39 @@ class PasswordPassphraseTests(ClientTest):
def test_password_kwarg_works_for_password_auth(self):
# Straightforward / duplicate of earlier basic password test.
- self._test_connection(password='pygmalion')
+ self._test_connection(password="pygmalion")
# TODO: more granular exception pending #387; should be signaling "no auth
# methods available" because no key and no password
@raises(SSHException)
def test_passphrase_kwarg_not_used_for_password_auth(self):
# Using the "right" password in the "wrong" field shouldn't work.
- self._test_connection(passphrase='pygmalion')
+ self._test_connection(passphrase="pygmalion")
def test_passphrase_kwarg_used_for_key_passphrase(self):
# Straightforward again, with new passphrase kwarg.
self._test_connection(
- key_filename=_support('test_rsa_password.key'),
- passphrase='television',
+ key_filename=_support("test_rsa_password.key"),
+ passphrase="television",
)
- def test_password_kwarg_used_for_passphrase_when_no_passphrase_kwarg_given(self): # noqa
+ def test_password_kwarg_used_for_passphrase_when_no_passphrase_kwarg_given(
+ self
+ ): # noqa
# Backwards compatibility: passphrase in the password field.
self._test_connection(
- key_filename=_support('test_rsa_password.key'),
- password='television',
+ key_filename=_support("test_rsa_password.key"),
+ password="television",
)
- @raises(AuthenticationException) # TODO: more granular
- def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given(self): # noqa
+ @raises(AuthenticationException) # TODO: more granular
+ def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given(
+ self
+ ): # noqa
# Sanity: if we're given both fields, the password field is NOT used as
# a passphrase.
self._test_connection(
- key_filename=_support('test_rsa_password.key'),
- password='television',
- passphrase='wat? lol no',
+ key_filename=_support("test_rsa_password.key"),
+ password="television",
+ passphrase="wat? lol no",
)