diff options
Diffstat (limited to 'paramiko/sftp_file.py')
-rw-r--r-- | paramiko/sftp_file.py | 64 |
1 files changed, 40 insertions, 24 deletions
diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py index 05c10458..3673e865 100644 --- a/paramiko/sftp_file.py +++ b/paramiko/sftp_file.py @@ -46,7 +46,6 @@ class SFTPFile (BufferedFile): self.pipelined = False self._prefetching = False self._prefetch_done = False - self._prefetch_so_far = 0 self._prefetch_data = {} self._prefetch_reads = [] self._saved_exception = None @@ -84,37 +83,45 @@ class SFTPFile (BufferedFile): # may have outlived the Transport connection pass + def _data_in_prefetch_buffers(self, offset): + """ + if a block of data is present in the prefetch buffers, at the given + offset, return the offset of the relevant prefetch buffer. otherwise, + return None. this guarantees nothing about the number of bytes + collected in the prefetch buffer so far. + """ + k = [i for i in self._prefetch_data.keys() if i <= offset] + if len(k) == 0: + return None + index = max(k) + buf_offset = offset - index + if buf_offset >= len(self._prefetch_data[index]): + # it's not here + return None + return index + def _read_prefetch(self, size): """ read data out of the prefetch buffer, if possible. if the data isn't in the buffer, return None. otherwise, behaves like a normal read. """ # while not closed, and haven't fetched past the current position, and haven't reached EOF... - while (self._prefetch_so_far <= self._realpos) and not self._closed: - if self._prefetch_done: - return None + while not self._prefetch_done and not self._closed: + offset = self._data_in_prefetch_buffers(self._realpos) + if offset is not None: + break self.sftp._read_response() self._check_exception() - k = self._prefetch_data.keys() - if len(k) == 0: + if offset is None: self._prefetching = False - return '' - - # find largest offset < realpos - pos_list = [i for i in k if i <= self._realpos] - if len(pos_list) == 0: return None - index = max(pos_list) - prefetch = self._prefetch_data[index] - del self._prefetch_data[index] + prefetch = self._prefetch_data[offset] + del self._prefetch_data[offset] - buf_offset = self._realpos - index + buf_offset = self._realpos - offset if buf_offset > 0: - self._prefetch_data[index] = prefetch[:buf_offset] + self._prefetch_data[offset] = prefetch[:buf_offset] prefetch = prefetch[buf_offset:] - if buf_offset >= len(prefetch): - # it's not here. - return None if size < len(prefetch): self._prefetch_data[self._realpos + size] = prefetch[size:] prefetch = prefetch[:size] @@ -384,10 +391,22 @@ class SFTPFile (BufferedFile): # put the offsets in order, since we depend on that for determining # when the reads have finished. self.sftp._log(DEBUG, 'readv(%s, %r)' % (util.hexify(self.handle), chunks)) - # FIXME: if prefetch() was already called (not readv), don't prefetch. ordered_chunks = list(chunks) ordered_chunks.sort(lambda x, y: cmp(x[0], y[0])) - self._start_prefetch(ordered_chunks) + + # break up anything larger than the max read size + if len([size for offset, size in ordered_chunks if size > self.MAX_REQUEST_SIZE]) > 0: + read_chunks = [] + for offset, size in ordered_chunks: + while size > 0: + chunk_size = min(size, self.MAX_REQUEST_SIZE) + read_chunks.append((offset, chunk_size)) + offset += chunk_size + size -= chunk_size + else: + read_chunks = ordered_chunks + + self._start_prefetch(read_chunks) # now we can just devolve to a bunch of read()s :) for x in chunks: self.seek(x[0]) @@ -406,8 +425,6 @@ class SFTPFile (BufferedFile): def _start_prefetch(self, chunks): self._prefetching = True self._prefetch_done = False - self._prefetch_so_far = chunks[0][0] - self._prefetch_data = {} self._prefetch_reads.extend(chunks) t = threading.Thread(target=self._prefetch_thread, args=(chunks,)) @@ -434,7 +451,6 @@ class SFTPFile (BufferedFile): offset, length = self._prefetch_reads.pop(0) assert length == len(data) self._prefetch_data[offset] = data - self._prefetch_so_far = offset + length if len(self._prefetch_reads) == 0: self._prefetch_done = True |