diff options
-rw-r--r-- | paramiko/sftp_client.py | 2 | ||||
-rw-r--r-- | paramiko/sftp_file.py | 22 |
2 files changed, 14 insertions, 10 deletions
diff --git a/paramiko/sftp_client.py b/paramiko/sftp_client.py index d9215743..cf94582c 100644 --- a/paramiko/sftp_client.py +++ b/paramiko/sftp_client.py @@ -736,7 +736,7 @@ class SFTPClient (BaseSFTP): self._convert_status(msg) return t, msg if fileobj is not type(None): - fileobj._async_response(t, msg) + fileobj._async_response(t, msg, num) if waitfor is None: # just doing a single check break diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py index 4ec936d0..b7cf8501 100644 --- a/paramiko/sftp_file.py +++ b/paramiko/sftp_file.py @@ -53,7 +53,8 @@ class SFTPFile (BufferedFile): self._prefetching = False self._prefetch_done = False self._prefetch_data = {} - self._prefetch_reads = [] + self._prefetch_extents = {} + self._prefetch_lock = threading.Lock() self._saved_exception = None self._reqs = deque() @@ -91,7 +92,7 @@ class SFTPFile (BufferedFile): pass def _data_in_prefetch_requests(self, offset, size): - k = [i for i in self._prefetch_reads if i[0] <= offset] + k = [self._prefetch_extents[i] for i in self._prefetch_extents if self._prefetch_extents[i][0] <= offset] if len(k) == 0: return False k.sort(lambda x, y: cmp(x[0], y[0])) @@ -447,7 +448,6 @@ class SFTPFile (BufferedFile): def _start_prefetch(self, chunks): self._prefetching = True self._prefetch_done = False - self._prefetch_reads.extend(chunks) t = threading.Thread(target=self._prefetch_thread, args=(chunks,)) t.setDaemon(True) @@ -457,9 +457,11 @@ class SFTPFile (BufferedFile): # do these read requests in a temporary thread because there may be # a lot of them, so it may block. for offset, length in chunks: - self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length)) + with self._prefetch_lock: + num = self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length)) + self._prefetch_extents[num] = (offset, length) - def _async_response(self, t, msg): + def _async_response(self, t, msg, num): if t == CMD_STATUS: # save exception and re-raise it on next file operation try: @@ -470,10 +472,12 @@ class SFTPFile (BufferedFile): if t != CMD_DATA: raise SFTPError('Expected data') data = msg.get_string() - offset, length = self._prefetch_reads.pop(0) - self._prefetch_data[offset] = data - if len(self._prefetch_reads) == 0: - self._prefetch_done = True + with self._prefetch_lock: + offset,length = self._prefetch_extents[num] + self._prefetch_data[offset] = data + del self._prefetch_extents[num] + if len(self._prefetch_extents) == 0: + self._prefetch_done = True def _check_exception(self): "if there's a saved exception, raise & clear it" |