diff options
author | Richard Kettlewell <rjk@greenend.org.uk> | 2011-08-29 15:50:35 +0100 |
---|---|---|
committer | Richard Kettlewell <rjk@greenend.org.uk> | 2011-08-29 15:50:35 +0100 |
commit | 974294ad7d083c49bffe9189cac0d5b08984b1ad (patch) | |
tree | 8fce1198c4792f426f2016569124c3c91d366552 | |
parent | 7bcbc2419812f87cf06391da61684df205f131d3 (diff) |
Fix issue 34 (SFTPFile prefetch assumes response order matches requests)
SFTPFile._async_response gets a new 'num' parameter giving the request
number. This can be matched up with the return value of
SFTPClient._async_request() to retrieve data specific to that request.
The prefetch queue SFTPFile._prefetch_reads is replaced with the dict
_prefetch_extents, which maps request numbers to (offset,length)
tuples.
A lock is used to exclude the case where a response arrives in
_async_response before _prefetch_thread has updated it.
-rw-r--r-- | paramiko/sftp_client.py | 2 | ||||
-rw-r--r-- | paramiko/sftp_file.py | 22 |
2 files changed, 14 insertions, 10 deletions
diff --git a/paramiko/sftp_client.py b/paramiko/sftp_client.py index 79a77614..189caa47 100644 --- a/paramiko/sftp_client.py +++ b/paramiko/sftp_client.py @@ -682,7 +682,7 @@ class SFTPClient (BaseSFTP): self._convert_status(msg) return t, msg if fileobj is not type(None): - fileobj._async_response(t, msg) + fileobj._async_response(t, msg, num) if waitfor is None: # just doing a single check break diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py index 8c5c7aca..2d4d4317 100644 --- a/paramiko/sftp_file.py +++ b/paramiko/sftp_file.py @@ -49,7 +49,8 @@ class SFTPFile (BufferedFile): self._prefetching = False self._prefetch_done = False self._prefetch_data = {} - self._prefetch_reads = [] + self._prefetch_extents = {} + self._prefetch_lock = threading.Lock() self._saved_exception = None def __del__(self): @@ -86,7 +87,7 @@ class SFTPFile (BufferedFile): pass def _data_in_prefetch_requests(self, offset, size): - k = [i for i in self._prefetch_reads if i[0] <= offset] + k = [self._prefetch_extents[i] for i in self._prefetch_extents if self._prefetch_extents[i][0] <= offset] if len(k) == 0: return False k.sort(lambda x, y: cmp(x[0], y[0])) @@ -440,7 +441,6 @@ class SFTPFile (BufferedFile): def _start_prefetch(self, chunks): self._prefetching = True self._prefetch_done = False - self._prefetch_reads.extend(chunks) t = threading.Thread(target=self._prefetch_thread, args=(chunks,)) t.setDaemon(True) @@ -450,9 +450,11 @@ class SFTPFile (BufferedFile): # do these read requests in a temporary thread because there may be # a lot of them, so it may block. for offset, length in chunks: - self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length)) + with self._prefetch_lock: + num = self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length)) + self._prefetch_extents[num] = (offset, length) - def _async_response(self, t, msg): + def _async_response(self, t, msg, num): if t == CMD_STATUS: # save exception and re-raise it on next file operation try: @@ -463,10 +465,12 @@ class SFTPFile (BufferedFile): if t != CMD_DATA: raise SFTPError('Expected data') data = msg.get_string() - offset, length = self._prefetch_reads.pop(0) - self._prefetch_data[offset] = data - if len(self._prefetch_reads) == 0: - self._prefetch_done = True + with self._prefetch_lock: + offset,length = self._prefetch_extents[num] + self._prefetch_data[offset] = data + del self._prefetch_extents[num] + if len(self._prefetch_extents) == 0: + self._prefetch_done = True def _check_exception(self): "if there's a saved exception, raise & clear it" |