summaryrefslogtreecommitdiffhomepage
path: root/paramiko/transport.py
diff options
context:
space:
mode:
Diffstat (limited to 'paramiko/transport.py')
-rw-r--r--paramiko/transport.py119
1 files changed, 72 insertions, 47 deletions
diff --git a/paramiko/transport.py b/paramiko/transport.py
index 5ab5ac95..ef57108a 100644
--- a/paramiko/transport.py
+++ b/paramiko/transport.py
@@ -43,8 +43,8 @@ from paramiko.common import xffffffff, cMSG_CHANNEL_OPEN, cMSG_IGNORE, \
MSG_CHANNEL_OPEN_SUCCESS, MSG_CHANNEL_OPEN_FAILURE, MSG_CHANNEL_OPEN, \
MSG_CHANNEL_SUCCESS, MSG_CHANNEL_FAILURE, MSG_CHANNEL_DATA, \
MSG_CHANNEL_EXTENDED_DATA, MSG_CHANNEL_WINDOW_ADJUST, MSG_CHANNEL_REQUEST, \
- MSG_CHANNEL_EOF, MSG_CHANNEL_CLOSE, MIN_PACKET_SIZE, MAX_WINDOW_SIZE, \
- DEFAULT_WINDOW_SIZE, DEFAULT_MAX_PACKET_SIZE
+ MSG_CHANNEL_EOF, MSG_CHANNEL_CLOSE, MIN_WINDOW_SIZE, MIN_PACKET_SIZE, \
+ MAX_WINDOW_SIZE, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_PACKET_SIZE
from paramiko.compress import ZlibCompressor, ZlibDecompressor
from paramiko.dsskey import DSSKey
from paramiko.kex_gex import KexGex, KexGexSHA256
@@ -88,7 +88,7 @@ class Transport (threading.Thread, ClosingContextManager):
`channels <.Channel>`, across the session. Multiple channels can be
multiplexed across a single session (and often are, in the case of port
forwardings).
-
+
Instances of this class may be used as context managers.
"""
_PROTO_ID = '2.0'
@@ -283,7 +283,7 @@ class Transport (threading.Thread, ClosingContextManager):
self._channels = ChannelMap()
self.channel_events = {} # (id -> Event)
self.channels_seen = {} # (id -> True)
- self._channel_counter = 1
+ self._channel_counter = 0
self.default_max_packet_size = default_max_packet_size
self.default_window_size = default_window_size
self._forward_agent_handler = None
@@ -301,6 +301,8 @@ class Transport (threading.Thread, ClosingContextManager):
self.global_response = None # response Message from an arbitrary global request
self.completion_event = None # user-defined event callbacks
self.banner_timeout = 15 # how long (seconds) to wait for the SSH banner
+ self.handshake_timeout = 15 # how long (seconds) to wait for the handshake to finish after SSH banner sent.
+
# server mode:
self.server_mode = False
@@ -411,7 +413,7 @@ class Transport (threading.Thread, ClosingContextManager):
if e is not None:
raise e
raise SSHException('Negotiation failed.')
- if event.isSet():
+ if event.is_set():
break
def start_server(self, event=None, server=None):
@@ -476,7 +478,7 @@ class Transport (threading.Thread, ClosingContextManager):
if e is not None:
raise e
raise SSHException('Negotiation failed.')
- if event.isSet():
+ if event.is_set():
break
def add_server_key(self, key):
@@ -514,6 +516,7 @@ class Transport (threading.Thread, ClosingContextManager):
pass
return None
+ @staticmethod
def load_server_moduli(filename=None):
"""
(optional)
@@ -553,7 +556,6 @@ class Transport (threading.Thread, ClosingContextManager):
# none succeeded
Transport._modulus_pack = None
return False
- load_server_moduli = staticmethod(load_server_moduli)
def close(self):
"""
@@ -593,7 +595,7 @@ class Transport (threading.Thread, ClosingContextManager):
"""
return self.active
- def open_session(self, window_size=None, max_packet_size=None):
+ def open_session(self, window_size=None, max_packet_size=None, timeout=None):
"""
Request a new channel to the server, of type ``"session"``. This is
just an alias for calling `open_channel` with an argument of
@@ -618,7 +620,8 @@ class Transport (threading.Thread, ClosingContextManager):
"""
return self.open_channel('session',
window_size=window_size,
- max_packet_size=max_packet_size)
+ max_packet_size=max_packet_size,
+ timeout=timeout)
def open_x11_channel(self, src_addr=None):
"""
@@ -665,7 +668,8 @@ class Transport (threading.Thread, ClosingContextManager):
dest_addr=None,
src_addr=None,
window_size=None,
- max_packet_size=None):
+ max_packet_size=None,
+ timeout=None):
"""
Request a new channel to the server. `Channels <.Channel>` are
socket-like objects used for the actual transfer of data across the
@@ -689,17 +693,20 @@ class Transport (threading.Thread, ClosingContextManager):
optional window size for this session.
:param int max_packet_size:
optional max packet size for this session.
+ :param float timeout:
+ optional timeout opening a channel, default 3600s (1h)
:return: a new `.Channel` on success
- :raises SSHException: if the request is rejected or the session ends
- prematurely
+ :raises SSHException: if the request is rejected, the session ends
+ prematurely or there is a timeout openning a channel
.. versionchanged:: 1.15
Added the ``window_size`` and ``max_packet_size`` arguments.
"""
if not self.active:
raise SSHException('SSH session not active')
+ timeout = 3600 if timeout is None else timeout
self.lock.acquire()
try:
window_size = self._sanitize_window_size(window_size)
@@ -728,6 +735,7 @@ class Transport (threading.Thread, ClosingContextManager):
finally:
self.lock.release()
self._send_user_message(m)
+ start_ts = time.time()
while True:
event.wait(0.1)
if not self.active:
@@ -735,8 +743,10 @@ class Transport (threading.Thread, ClosingContextManager):
if e is None:
e = SSHException('Unable to open channel.')
raise e
- if event.isSet():
+ if event.is_set():
break
+ elif start_ts + timeout < time.time():
+ raise SSHException('Timeout openning channel.')
chan = self._channels.get(chanid)
if chan is not None:
return chan
@@ -855,7 +865,7 @@ class Transport (threading.Thread, ClosingContextManager):
if e is not None:
raise e
raise SSHException('Negotiation failed.')
- if self.completion_event.isSet():
+ if self.completion_event.is_set():
break
return
@@ -906,7 +916,7 @@ class Transport (threading.Thread, ClosingContextManager):
self.completion_event.wait(0.1)
if not self.active:
return None
- if self.completion_event.isSet():
+ if self.completion_event.is_set():
break
return self.global_response
@@ -1080,6 +1090,8 @@ class Transport (threading.Thread, ClosingContextManager):
supplied, this method returns ``None``.
:returns: server supplied banner (`str`), or ``None``.
+
+ .. versionadded:: 1.13
"""
if not self.active or (self.auth_handler is None):
return None
@@ -1465,7 +1477,7 @@ class Transport (threading.Thread, ClosingContextManager):
self._log(DEBUG, 'Dropping user packet because connection is dead.')
return
self.clear_to_send_lock.acquire()
- if self.clear_to_send.isSet():
+ if self.clear_to_send.is_set():
break
self.clear_to_send_lock.release()
if time.time() > start + self.clear_to_send_timeout:
@@ -1559,7 +1571,7 @@ class Transport (threading.Thread, ClosingContextManager):
def _sanitize_window_size(self, window_size):
if window_size is None:
window_size = self.default_window_size
- return clamp_value(MIN_PACKET_SIZE, window_size, MAX_WINDOW_SIZE)
+ return clamp_value(MIN_WINDOW_SIZE, window_size, MAX_WINDOW_SIZE)
def _sanitize_packet_size(self, max_packet_size):
if max_packet_size is None:
@@ -1587,6 +1599,12 @@ class Transport (threading.Thread, ClosingContextManager):
try:
self.packetizer.write_all(b(self.local_version + '\r\n'))
self._check_banner()
+ # The above is actually very much part of the handshake, but sometimes the banner can be read
+ # but the machine is not responding, for example when the remote ssh daemon is loaded in to memory
+ # but we can not read from the disk/spawn a new shell.
+ # Make sure we can specify a timeout for the initial handshake.
+ # Re-use the banner timeout for now.
+ self.packetizer.start_handshake(self.handshake_timeout)
self._send_kex_init()
self._expect_packet(MSG_KEXINIT)
@@ -1636,6 +1654,7 @@ class Transport (threading.Thread, ClosingContextManager):
msg.add_byte(cMSG_UNIMPLEMENTED)
msg.add_int(m.seqno)
self._send_message(msg)
+ self.packetizer.complete_handshake()
except SSHException as e:
self._log(ERROR, 'Exception: ' + str(e))
self._log(ERROR, util.tb_strings())
@@ -2216,21 +2235,6 @@ class SecurityOptions (object):
"""
return '<paramiko.SecurityOptions for %s>' % repr(self._transport)
- def _get_ciphers(self):
- return self._transport._preferred_ciphers
-
- def _get_digests(self):
- return self._transport._preferred_macs
-
- def _get_key_types(self):
- return self._transport._preferred_keys
-
- def _get_kex(self):
- return self._transport._preferred_kex
-
- def _get_compression(self):
- return self._transport._preferred_compression
-
def _set(self, name, orig, x):
if type(x) is list:
x = tuple(x)
@@ -2242,30 +2246,51 @@ class SecurityOptions (object):
raise ValueError('unknown cipher')
setattr(self._transport, name, x)
- def _set_ciphers(self, x):
+ @property
+ def ciphers(self):
+ """Symmetric encryption ciphers"""
+ return self._transport._preferred_ciphers
+
+ @ciphers.setter
+ def ciphers(self, x):
self._set('_preferred_ciphers', '_cipher_info', x)
- def _set_digests(self, x):
+ @property
+ def digests(self):
+ """Digest (one-way hash) algorithms"""
+ return self._transport._preferred_macs
+
+ @digests.setter
+ def digests(self, x):
self._set('_preferred_macs', '_mac_info', x)
- def _set_key_types(self, x):
+ @property
+ def key_types(self):
+ """Public-key algorithms"""
+ return self._transport._preferred_keys
+
+ @key_types.setter
+ def key_types(self, x):
self._set('_preferred_keys', '_key_info', x)
- def _set_kex(self, x):
+
+ @property
+ def kex(self):
+ """Key exchange algorithms"""
+ return self._transport._preferred_kex
+
+ @kex.setter
+ def kex(self, x):
self._set('_preferred_kex', '_kex_info', x)
- def _set_compression(self, x):
- self._set('_preferred_compression', '_compression_info', x)
+ @property
+ def compression(self):
+ """Compression algorithms"""
+ return self._transport._preferred_compression
- ciphers = property(_get_ciphers, _set_ciphers, None,
- "Symmetric encryption ciphers")
- digests = property(_get_digests, _set_digests, None,
- "Digest (one-way hash) algorithms")
- key_types = property(_get_key_types, _set_key_types, None,
- "Public-key algorithms")
- kex = property(_get_kex, _set_kex, None, "Key exchange algorithms")
- compression = property(_get_compression, _set_compression, None,
- "Compression algorithms")
+ @compression.setter
+ def compression(self, x):
+ self._set('_preferred_compression', '_compression_info', x)
class ChannelMap (object):