diff options
-rw-r--r-- | paramiko/channel.py | 4 | ||||
-rw-r--r-- | paramiko/transport.py | 13 | ||||
-rw-r--r-- | tests/test_client.py | 52 |
3 files changed, 37 insertions, 32 deletions
diff --git a/paramiko/channel.py b/paramiko/channel.py index f053f4a8..b2e8edd1 100644 --- a/paramiko/channel.py +++ b/paramiko/channel.py @@ -329,7 +329,7 @@ class Channel(ClosingContextManager): try: self.set_environment_variable(name, value) except SSHException as e: - err = "Failed to set environment variable \"{0}\"." + err = 'Failed to set environment variable "{0}".' raise SSHException(err.format(name), e) @open_only @@ -353,7 +353,7 @@ class Channel(ClosingContextManager): m = Message() m.add_byte(cMSG_CHANNEL_REQUEST) m.add_int(self.remote_chanid) - m.add_string('env') + m.add_string("env") m.add_boolean(False) m.add_string(name) m.add_string(value) diff --git a/paramiko/transport.py b/paramiko/transport.py index 819639b7..828b2c22 100644 --- a/paramiko/transport.py +++ b/paramiko/transport.py @@ -536,9 +536,8 @@ class Transport(threading.Thread, ClosingContextManager): if e is not None: raise e raise SSHException("Negotiation failed.") - if ( - event.is_set() or - (timeout is not None and time.time() >= max_time) + if event.is_set() or ( + timeout is not None and time.time() >= max_time ): break @@ -2225,10 +2224,10 @@ class Transport(threading.Thread, ClosingContextManager): ) # noqa self.host_key_type = agreed_keys[0] if self.server_mode and (self.get_server_key() is None): - raise SSHException('Incompatible ssh peer (can\'t match requested host key type)') # noqa - self._log_agreement( - 'HostKey', agreed_keys[0], agreed_keys[0] - ) + raise SSHException( + "Incompatible ssh peer (can't match requested host key type)" + ) # noqa + self._log_agreement("HostKey", agreed_keys[0], agreed_keys[0]) if self.server_mode: agreed_local_ciphers = list( diff --git a/tests/test_client.py b/tests/test_client.py index dd2676f5..8bf7556d 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -84,11 +84,11 @@ class NullServer(paramiko.ServerInterface): return True def check_channel_env_request(self, channel, name, value): - if name == 'INVALID_ENV': + if name == "INVALID_ENV": return False - if not hasattr(channel, 'env'): - setattr(channel, 'env', {}) + if not hasattr(channel, "env"): + setattr(channel, "env", {}) channel.env[name] = value return True @@ -118,10 +118,10 @@ class SSHClientTest(unittest.TestCase): allowed_keys = FINGERPRINTS.keys() self.socks, addr = self.sockl.accept() self.ts = paramiko.Transport(self.socks) - keypath = _support('test_rsa.key') + keypath = _support("test_rsa.key") host_key = paramiko.RSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) - keypath = _support('test_ecdsa_256.key') + keypath = _support("test_ecdsa_256.key") host_key = paramiko.ECDSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) server = NullServer(allowed_keys=allowed_keys) @@ -252,8 +252,8 @@ class SSHClientTest(unittest.TestCase): verify that SSHClient's AutoAddPolicy works. """ threading.Thread(target=self._run).start() - hostname = '[%s]:%d' % (self.addr, self.port) - key_file = _support('test_ecdsa_256.key') + hostname = "[%s]:%d" % (self.addr, self.port) + key_file = _support("test_ecdsa_256.key") public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file) self.tc = paramiko.SSHClient() @@ -450,7 +450,7 @@ class SSHClientTest(unittest.TestCase): def _client_host_key_bad(self, host_key): threading.Thread(target=self._run).start() - hostname = '[%s]:%d' % (self.addr, self.port) + hostname = "[%s]:%d" % (self.addr, self.port) self.tc = paramiko.SSHClient() self.tc.set_missing_host_key_policy(paramiko.WarningPolicy()) @@ -460,13 +460,13 @@ class SSHClientTest(unittest.TestCase): self.assertRaises( paramiko.BadHostKeyException, self.tc.connect, - password='pygmalion', + password="pygmalion", **self.connect_kwargs ) def _client_host_key_good(self, ktype, kfile): threading.Thread(target=self._run).start() - hostname = '[%s]:%d' % (self.addr, self.port) + hostname = "[%s]:%d" % (self.addr, self.port) self.tc = paramiko.SSHClient() self.tc.set_missing_host_key_policy(paramiko.RejectPolicy()) @@ -474,7 +474,7 @@ class SSHClientTest(unittest.TestCase): known_hosts = self.tc.get_host_keys() known_hosts.add(hostname, host_key.get_name(), host_key) - self.tc.connect(password='pygmalion', **self.connect_kwargs) + self.tc.connect(password="pygmalion", **self.connect_kwargs) self.event.wait(1.0) self.assertTrue(self.event.is_set()) self.assertTrue(self.ts.is_active()) @@ -489,10 +489,10 @@ class SSHClientTest(unittest.TestCase): self._client_host_key_bad(host_key) def test_host_key_negotiation_3(self): - self._client_host_key_good(paramiko.ECDSAKey, 'test_ecdsa_256.key') + self._client_host_key_good(paramiko.ECDSAKey, "test_ecdsa_256.key") def test_host_key_negotiation_4(self): - self._client_host_key_good(paramiko.RSAKey, 'test_rsa.key') + self._client_host_key_good(paramiko.RSAKey, "test_rsa.key") def test_update_environment(self): """ @@ -503,28 +503,34 @@ class SSHClientTest(unittest.TestCase): self.tc = paramiko.SSHClient() self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.assertEqual(0, len(self.tc.get_host_keys())) - self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion') + self.tc.connect( + self.addr, self.port, username="slowdive", password="pygmalion" + ) self.event.wait(1.0) self.assertTrue(self.event.isSet()) self.assertTrue(self.ts.is_active()) - target_env = {b'A': b'B', b'C': b'd'} + target_env = {b"A": b"B", b"C": b"d"} - self.tc.exec_command('yes', environment=target_env) + self.tc.exec_command("yes", environment=target_env) schan = self.ts.accept(1.0) - self.assertEqual(target_env, getattr(schan, 'env', {})) + self.assertEqual(target_env, getattr(schan, "env", {})) schan.close() # Cannot use assertRaises in context manager mode as it is not supported # in Python 2.6. try: # Verify that a rejection by the server can be detected - self.tc.exec_command('yes', environment={b'INVALID_ENV': b''}) + self.tc.exec_command("yes", environment={b"INVALID_ENV": b""}) except SSHException as e: - self.assertTrue('INVALID_ENV' in str(e), - 'Expected variable name in error message') - self.assertTrue(isinstance(e.args[1], SSHException), - 'Expected original SSHException in exception') + self.assertTrue( + "INVALID_ENV" in str(e), + "Expected variable name in error message", + ) + self.assertTrue( + isinstance(e.args[1], SSHException), + "Expected original SSHException in exception", + ) else: - self.assertFalse(False, 'SSHException was not thrown.') + self.assertFalse(False, "SSHException was not thrown.") |