diff options
Diffstat (limited to 'paramiko/sftp_file.py')
-rw-r--r-- | paramiko/sftp_file.py | 24 |
1 files changed, 15 insertions, 9 deletions
diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py index 4ec936d0..a39b1f47 100644 --- a/paramiko/sftp_file.py +++ b/paramiko/sftp_file.py @@ -20,6 +20,8 @@ L{SFTPFile} """ +from __future__ import with_statement + from binascii import hexlify from collections import deque import socket @@ -53,7 +55,8 @@ class SFTPFile (BufferedFile): self._prefetching = False self._prefetch_done = False self._prefetch_data = {} - self._prefetch_reads = [] + self._prefetch_extents = {} + self._prefetch_lock = threading.Lock() self._saved_exception = None self._reqs = deque() @@ -91,7 +94,7 @@ class SFTPFile (BufferedFile): pass def _data_in_prefetch_requests(self, offset, size): - k = [i for i in self._prefetch_reads if i[0] <= offset] + k = [x for x in self._prefetch_extents.values() if x[0] <= offset] if len(k) == 0: return False k.sort(lambda x, y: cmp(x[0], y[0])) @@ -447,7 +450,6 @@ class SFTPFile (BufferedFile): def _start_prefetch(self, chunks): self._prefetching = True self._prefetch_done = False - self._prefetch_reads.extend(chunks) t = threading.Thread(target=self._prefetch_thread, args=(chunks,)) t.setDaemon(True) @@ -457,9 +459,11 @@ class SFTPFile (BufferedFile): # do these read requests in a temporary thread because there may be # a lot of them, so it may block. for offset, length in chunks: - self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length)) + with self._prefetch_lock: + num = self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length)) + self._prefetch_extents[num] = (offset, length) - def _async_response(self, t, msg): + def _async_response(self, t, msg, num): if t == CMD_STATUS: # save exception and re-raise it on next file operation try: @@ -470,10 +474,12 @@ class SFTPFile (BufferedFile): if t != CMD_DATA: raise SFTPError('Expected data') data = msg.get_string() - offset, length = self._prefetch_reads.pop(0) - self._prefetch_data[offset] = data - if len(self._prefetch_reads) == 0: - self._prefetch_done = True + with self._prefetch_lock: + offset, length = self._prefetch_extents[num] + self._prefetch_data[offset] = data + del self._prefetch_extents[num] + if len(self._prefetch_extents) == 0: + self._prefetch_done = True def _check_exception(self): "if there's a saved exception, raise & clear it" |