summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--paramiko/packet.py7
-rw-r--r--paramiko/transport.py38
-rw-r--r--paramiko/util.py2
-rwxr-xr-xtests/test_sftp.py48
-rw-r--r--tests/test_transport.py36
5 files changed, 111 insertions, 20 deletions
diff --git a/paramiko/packet.py b/paramiko/packet.py
index 6de9971f..cc389ce6 100644
--- a/paramiko/packet.py
+++ b/paramiko/packet.py
@@ -33,6 +33,10 @@ from paramiko.ssh_exception import SSHException
from paramiko.message import Message
+class NeedRekeyException (Exception):
+ pass
+
+
class Packetizer (object):
"""
Implementation of the base SSH packet protocol.
@@ -187,6 +191,8 @@ class Packetizer (object):
except socket.timeout:
if self.__closed:
raise EOFError()
+ if self.__need_rekey:
+ raise NeedRekeyException()
self._check_keepalive()
return out
@@ -269,6 +275,7 @@ class Packetizer (object):
done).
@raise SSHException: if the packet is mangled
+ @raise NeedRekeyException: if the transport should rekey
"""
header = self.read_all(self.__block_size_in)
if self.__block_engine_in != None:
diff --git a/paramiko/transport.py b/paramiko/transport.py
index 58ec46f3..232fdaad 100644
--- a/paramiko/transport.py
+++ b/paramiko/transport.py
@@ -35,7 +35,7 @@ from paramiko.ssh_exception import SSHException, BadAuthenticationType
from paramiko.message import Message
from paramiko.channel import Channel
from paramiko.sftp_client import SFTPClient
-from paramiko.packet import Packetizer
+from paramiko.packet import Packetizer, NeedRekeyException
from paramiko.rsakey import RSAKey
from paramiko.dsskey import DSSKey
from paramiko.kex_group1 import KexGroup1
@@ -247,6 +247,7 @@ class Transport (threading.Thread):
self.max_packet_size = 32768
self.saved_exception = None
self.clear_to_send = threading.Event()
+ self.clear_to_send_lock = threading.Lock()
self.log_name = 'paramiko.transport'
self.logger = util.get_logger(self.log_name)
self.packetizer.set_log(self.logger)
@@ -592,9 +593,9 @@ class Transport (threading.Thread):
self.channels_seen[chanid] = True
chan._set_transport(self)
chan._set_window(self.window_size, self.max_packet_size)
- self._send_user_message(m)
finally:
self.lock.release()
+ self._send_user_message(m)
while 1:
event.wait(0.1);
if not self.active:
@@ -1166,8 +1167,6 @@ class Transport (threading.Thread):
def _send_message(self, data):
self.packetizer.send_message(data)
- if self.packetizer.need_rekey() and not self.in_kex:
- self._send_kex_init()
def _send_user_message(self, data):
"""
@@ -1179,9 +1178,14 @@ class Transport (threading.Thread):
if not self.active:
self._log(DEBUG, 'Dropping user packet because connection is dead.')
return
+ self.clear_to_send_lock.acquire()
if self.clear_to_send.isSet():
break
- self._send_message(data)
+ self.clear_to_send_lock.release()
+ try:
+ self._send_message(data)
+ finally:
+ self.clear_to_send_lock.release()
def _set_K_H(self, k, h):
"used by a kex object to set the K (root key) and H (exchange hash)"
@@ -1246,7 +1250,10 @@ class Transport (threading.Thread):
while self.active:
if self.packetizer.need_rekey() and not self.in_kex:
self._send_kex_init()
- ptype, m = self.packetizer.read_message()
+ try:
+ ptype, m = self.packetizer.read_message()
+ except NeedRekeyException:
+ continue
if ptype == MSG_IGNORE:
continue
elif ptype == MSG_DISCONNECT:
@@ -1324,7 +1331,11 @@ class Transport (threading.Thread):
def _negotiate_keys(self, m):
# throws SSHException on anything unusual
- self.clear_to_send.clear()
+ self.clear_to_send_lock.acquire()
+ try:
+ self.clear_to_send.clear()
+ finally:
+ self.clear_to_send_lock.release()
if self.local_kex_init == None:
# remote side wants to renegotiate
self._send_kex_init()
@@ -1371,7 +1382,11 @@ class Transport (threading.Thread):
announce to the other side that we'd like to negotiate keys, and what
kind of key negotiation we support.
"""
- self.clear_to_send.clear()
+ self.clear_to_send_lock.acquire()
+ try:
+ self.clear_to_send.clear()
+ finally:
+ self.clear_to_send_lock.release()
self.in_kex = True
if self.server_mode:
if (self._modulus_pack is None) and ('diffie-hellman-group-exchange-sha1' in self._preferred_kex):
@@ -1559,7 +1574,12 @@ class Transport (threading.Thread):
# it's now okay to send data again (if this was a re-key)
if not self.packetizer.need_rekey():
self.in_kex = False
- self.clear_to_send.set()
+ self._log(DEBUG, 'clear to send')
+ self.clear_to_send_lock.acquire()
+ try:
+ self.clear_to_send.set()
+ finally:
+ self.clear_to_send_lock.release()
return
def _parse_disconnect(self, m):
diff --git a/paramiko/util.py b/paramiko/util.py
index cf90033c..3956863f 100644
--- a/paramiko/util.py
+++ b/paramiko/util.py
@@ -257,7 +257,7 @@ def log_to_file(filename, level=DEBUG):
l.setLevel(level)
f = open(filename, 'w')
lh = logging.StreamHandler(f)
- lh.setFormatter(logging.Formatter('%(levelname)-.3s [%(asctime)s] thr=%(_threadid)-3d %(name)s: %(message)s',
+ lh.setFormatter(logging.Formatter('%(levelname)-.3s [%(asctime)s.%(msecs)03d] thr=%(_threadid)-3d %(name)s: %(message)s',
'%Y%m%d-%H:%M:%S'))
l.addHandler(lh)
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index 4c5065e8..94e9dea5 100755
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -62,7 +62,7 @@ liver insulin receptors. Their sensitivity to insulin is, however, similarly
decreased compared with chicken.
'''
-FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing')
+FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000')
sftp = None
tc = None
@@ -121,7 +121,6 @@ class SFTPTest (unittest.TestCase):
ts.start_server(event, server)
tc.connect(username='slowdive', password='pygmalion')
event.wait(1.0)
-# self.assert_(self.ts.is_active())
sftp = paramiko.SFTP.from_transport(tc)
init_loopback = staticmethod(init_loopback)
@@ -132,7 +131,14 @@ class SFTPTest (unittest.TestCase):
set_big_file_test = staticmethod(set_big_file_test)
def setUp(self):
- sftp.mkdir(FOLDER)
+ global FOLDER
+ for i in xrange(1000):
+ FOLDER = FOLDER[:-3] + '%03d' % i
+ try:
+ sftp.mkdir(FOLDER)
+ break
+ except (IOError, OSError):
+ pass
def tearDown(self):
sftp.rmdir(FOLDER)
@@ -511,17 +517,39 @@ class SFTPTest (unittest.TestCase):
global g_big_file_test
if not g_big_file_test:
return
- kblob = (1024 * 1024 * 'x')
+ mblob = (1024 * 1024 * 'x')
try:
f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024)
- f.write(kblob)
+ f.write(mblob)
f.close()
self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
finally:
sftp.remove('%s/hongry.txt' % FOLDER)
+
+ def test_H_big_file_renegotiate(self):
+ """
+ write a 1MB file, forcing key renegotiation in the middle.
+ """
+ global g_big_file_test
+ if not g_big_file_test:
+ return
+ t = sftp.sock.get_transport()
+ t.packetizer.REKEY_BYTES = 512 * 1024
+ k32blob = (32 * 1024 * 'x')
+ try:
+ f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024)
+ for i in xrange(32):
+ f.write(k32blob)
+ f.close()
- def test_H_realpath(self):
+ self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ self.assertNotEquals(t.H, t.session_id)
+ finally:
+ sftp.remove('%s/hongry.txt' % FOLDER)
+ t.packetizer.REKEY_BYTES = pow(2, 30)
+
+ def test_I_realpath(self):
"""
test that realpath is returning something non-empty and not an
error.
@@ -532,7 +560,7 @@ class SFTPTest (unittest.TestCase):
self.assert_(len(f) > 0)
self.assertEquals(os.path.join(pwd, FOLDER), f)
- def test_I_mkdir(self):
+ def test_J_mkdir(self):
"""
verify that mkdir/rmdir work.
"""
@@ -555,7 +583,7 @@ class SFTPTest (unittest.TestCase):
except IOError:
pass
- def test_J_chdir(self):
+ def test_K_chdir(self):
"""
verify that chdir/getcwd work.
"""
@@ -592,7 +620,7 @@ class SFTPTest (unittest.TestCase):
except:
pass
- def test_K_get_put(self):
+ def test_L_get_put(self):
"""
verify that get/put work.
"""
@@ -621,7 +649,7 @@ class SFTPTest (unittest.TestCase):
os.unlink(localname)
sftp.unlink(FOLDER + '/bunny.txt')
- def test_L_check(self):
+ def test_M_check(self):
"""
verify that file.check() works against our own server.
(it's an sftp extension that we support, and may be the only ones who
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 81254b48..c97fb160 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -504,3 +504,39 @@ class TransportTest (unittest.TestCase):
self.assertEquals('', chan.recv(16))
chan.close()
+
+ def test_F_renegotiate(self):
+ """
+ verify that a transport can correctly renegotiate mid-stream.
+ """
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.ts.start_server(event, server)
+ self.tc.connect(hostkey=public_host_key,
+ username='slowdive', password='pygmalion')
+ event.wait(1.0)
+ self.assert_(event.isSet())
+ self.assert_(self.ts.is_active())
+
+ self.tc.packetizer.REKEY_BYTES = 16384
+
+ chan = self.tc.open_session()
+ self.assert_(chan.exec_command('yes'))
+ schan = self.ts.accept(1.0)
+
+ self.assertEquals(self.tc.H, self.tc.session_id)
+ for i in range(20):
+ chan.send('x' * 1024)
+ chan.close()
+
+ # allow a few seconds for the rekeying to complete
+ for i in xrange(50):
+ if self.tc.H != self.tc.session_id:
+ break
+ time.sleep(0.1)
+ self.assertNotEquals(self.tc.H, self.tc.session_id)
+
+ schan.close()