summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--paramiko/sftp_client.py2
-rw-r--r--paramiko/sftp_file.py22
2 files changed, 14 insertions, 10 deletions
diff --git a/paramiko/sftp_client.py b/paramiko/sftp_client.py
index d9215743..cf94582c 100644
--- a/paramiko/sftp_client.py
+++ b/paramiko/sftp_client.py
@@ -736,7 +736,7 @@ class SFTPClient (BaseSFTP):
self._convert_status(msg)
return t, msg
if fileobj is not type(None):
- fileobj._async_response(t, msg)
+ fileobj._async_response(t, msg, num)
if waitfor is None:
# just doing a single check
break
diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py
index 4ec936d0..b7cf8501 100644
--- a/paramiko/sftp_file.py
+++ b/paramiko/sftp_file.py
@@ -53,7 +53,8 @@ class SFTPFile (BufferedFile):
self._prefetching = False
self._prefetch_done = False
self._prefetch_data = {}
- self._prefetch_reads = []
+ self._prefetch_extents = {}
+ self._prefetch_lock = threading.Lock()
self._saved_exception = None
self._reqs = deque()
@@ -91,7 +92,7 @@ class SFTPFile (BufferedFile):
pass
def _data_in_prefetch_requests(self, offset, size):
- k = [i for i in self._prefetch_reads if i[0] <= offset]
+ k = [self._prefetch_extents[i] for i in self._prefetch_extents if self._prefetch_extents[i][0] <= offset]
if len(k) == 0:
return False
k.sort(lambda x, y: cmp(x[0], y[0]))
@@ -447,7 +448,6 @@ class SFTPFile (BufferedFile):
def _start_prefetch(self, chunks):
self._prefetching = True
self._prefetch_done = False
- self._prefetch_reads.extend(chunks)
t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
t.setDaemon(True)
@@ -457,9 +457,11 @@ class SFTPFile (BufferedFile):
# do these read requests in a temporary thread because there may be
# a lot of them, so it may block.
for offset, length in chunks:
- self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length))
+ with self._prefetch_lock:
+ num = self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length))
+ self._prefetch_extents[num] = (offset, length)
- def _async_response(self, t, msg):
+ def _async_response(self, t, msg, num):
if t == CMD_STATUS:
# save exception and re-raise it on next file operation
try:
@@ -470,10 +472,12 @@ class SFTPFile (BufferedFile):
if t != CMD_DATA:
raise SFTPError('Expected data')
data = msg.get_string()
- offset, length = self._prefetch_reads.pop(0)
- self._prefetch_data[offset] = data
- if len(self._prefetch_reads) == 0:
- self._prefetch_done = True
+ with self._prefetch_lock:
+ offset,length = self._prefetch_extents[num]
+ self._prefetch_data[offset] = data
+ del self._prefetch_extents[num]
+ if len(self._prefetch_extents) == 0:
+ self._prefetch_done = True
def _check_exception(self):
"if there's a saved exception, raise & clear it"