Diffstat (limited to 'paramiko/sftp_file.py')
-rw-r--r-- paramiko/sftp_file.py | 40 ++++++++++++++++++++++++++++++++--------
1 file changed, 32 insertions, 8 deletions
diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py
index 9a0a6b34..c74695e0 100644
--- a/paramiko/sftp_file.py
+++ b/paramiko/sftp_file.py
@@ -26,7 +26,7 @@ from collections import deque
 import socket
 import threading
 import time
-from paramiko.common import DEBUG
+from paramiko.common import DEBUG, io_sleep
 from paramiko.file import BufferedFile
 from paramiko.util import u
 
@@ -435,7 +435,7 @@ class SFTPFile(BufferedFile):
         """
         self.pipelined = pipelined
 
-    def prefetch(self, file_size=None):
+    def prefetch(self, file_size=None, max_concurrent_requests=None):
         """
         Pre-fetch the remaining contents of this file in anticipation of
         future `.read` calls.  If reading the entire file, pre-fetching can
@@ -454,6 +454,10 @@ class SFTPFile(BufferedFile):
             <https://github.com/paramiko/paramiko/pull/562>`_); as a
             workaround, one may call `stat` explicitly and pass its value in
             via this parameter.
+        :param int max_concurrent_requests:
+            The maximum number of concurrent read requests to prefetch. See
+            `.SFTPClient.get` (its ``max_concurrent_prefetch_requests`` param)
+            for details.
 
         .. versionadded:: 1.5.1
         .. versionchanged:: 1.16.0
@@ -461,6 +465,8 @@ class SFTPFile(BufferedFile):
         .. versionchanged:: 1.16.1
             The ``file_size`` parameter was made optional for backwards
             compatibility.
+        .. versionchanged:: 3.3
+            Added ``max_concurrent_requests``.
         """
         if file_size is None:
             file_size = self.stat().st_size
@@ -473,9 +479,9 @@ class SFTPFile(BufferedFile):
             chunks.append((n, chunk))
             n += chunk
         if len(chunks) > 0:
-            self._start_prefetch(chunks)
+            self._start_prefetch(chunks, max_concurrent_requests)
 
-    def readv(self, chunks):
+    def readv(self, chunks, max_concurrent_prefetch_requests=None):
         """
         Read a set of blocks from the file by (offset, length).  This is more
         efficient than doing a series of `.seek` and `.read` calls, since the
@@ -485,9 +491,15 @@ class SFTPFile(BufferedFile):
         :param chunks:
             a list of ``(offset, length)`` tuples indicating which sections
             of the file to read
+        :param int max_concurrent_prefetch_requests:
+            The maximum number of concurrent read requests to prefetch. See
+            `.SFTPClient.get` (its ``max_concurrent_prefetch_requests`` param)
+            for details.
         :return: a list of blocks read, in the same order as in ``chunks``
 
         .. versionadded:: 1.5.4
+        .. versionchanged:: 3.3
+            Added ``max_concurrent_prefetch_requests``.
         """
         self.sftp._log(
             DEBUG, "readv({}, {!r})".format(hexlify(self.handle), chunks)
@@ -508,7 +520,7 @@ class SFTPFile(BufferedFile):
                 offset += chunk_size
                 size -= chunk_size
 
-        self._start_prefetch(read_chunks)
+        self._start_prefetch(read_chunks, max_concurrent_prefetch_requests)
         # now we can just devolve to a bunch of read()s :)
         for x in chunks:
             self.seek(x[0])
@@ -522,18 +534,30 @@ class SFTPFile(BufferedFile):
         except:
             return 0
 
-    def _start_prefetch(self, chunks):
+    def _start_prefetch(self, chunks, max_concurrent_requests=None):
         self._prefetching = True
         self._prefetch_done = False
 
-        t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
+        t = threading.Thread(
+            target=self._prefetch_thread,
+            args=(chunks, max_concurrent_requests),
+        )
         t.daemon = True
         t.start()
 
-    def _prefetch_thread(self, chunks):
+    def _prefetch_thread(self, chunks, max_concurrent_requests):
         # do these read requests in a temporary thread because there may be
         # a lot of them, so it may block.
         for offset, length in chunks:
+            # Limit the number of concurrent requests in a busy-loop
+            if max_concurrent_requests is not None:
+                while True:
+                    with self._prefetch_lock:
+                        pf_len = len(self._prefetch_extents)
+                        if pf_len < max_concurrent_requests:
+                            break
+                    time.sleep(io_sleep)
+
             num = self.sftp._async_request(
                 self, CMD_READ, self.handle, int64(offset), int(length)
             )
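
The new cap surfaces on both public entry points. A minimal usage sketch; the host, credentials, and remote path below are placeholders and not part of this change:

    import paramiko

    with paramiko.SSHClient() as client:
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect("example.com", username="user")
        sftp = client.open_sftp()

        with sftp.open("/remote/large.bin", "rb") as f:
            # Cap the prefetch thread at 16 outstanding READ requests
            # instead of issuing one request per chunk up front.
            f.prefetch(max_concurrent_requests=16)
            data = f.read()

        with sftp.open("/remote/large.bin", "rb") as f:
            # readv() takes the same cap under a longer parameter name.
            for block in f.readv(
                [(0, 1024), (1048576, 1024)],
                max_concurrent_prefetch_requests=16,
            ):
                print(len(block))

Leaving either parameter at its ``None`` default preserves the old behavior of queuing every read request immediately.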
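The throttle itself is a poll-under-lock: before issuing the next READ, the prefetch thread re-checks the count of outstanding extents and sleeps ``io_sleep`` seconds whenever it sits at the cap, releasing the lock between polls so responses can retire entries. A self-contained sketch of the same pattern, with a fake responder thread standing in for the server; all names here are illustrative, not paramiko internals:

    import threading
    import time

    io_sleep = 0.01  # pause between polls, like paramiko.common.io_sleep

    lock = threading.Lock()
    in_flight = {}  # request number -> (offset, length)


    def issue_requests(chunks, max_concurrent_requests=None):
        # One request per chunk, optionally capped, as in _prefetch_thread.
        for num, (offset, length) in enumerate(chunks):
            if max_concurrent_requests is not None:
                # Busy-loop until a slot frees up. The lock guards only the
                # length check; the sleep runs with the lock released.
                while True:
                    with lock:
                        if len(in_flight) < max_concurrent_requests:
                            break
                    time.sleep(io_sleep)
            with lock:
                in_flight[num] = (offset, length)  # stand-in for the request


    def retire_responses(total):
        # Stand-in for server responses clearing their in-flight slots.
        retired = 0
        while retired < total:
            time.sleep(0.02)
            with lock:
                if in_flight:
                    in_flight.pop(next(iter(in_flight)))
                    retired += 1


    chunks = [(i * 32768, 32768) for i in range(50)]
    t = threading.Thread(target=issue_requests, args=(chunks, 8), daemon=True)
    t.start()
    retire_responses(len(chunks))
    t.join()
    print("all requests issued and retired")

A polling loop is a simple fit here, presumably because the in-flight count is decremented on the response path under the same ``_prefetch_lock``; a semaphore would need that path to signal it instead.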