 paramiko/sftp_client.py | 36
 paramiko/sftp_file.py   | 36
 sites/www/changelog.rst |  6
 3 files changed, 66 insertions(+), 12 deletions(-)
diff --git a/paramiko/sftp_client.py b/paramiko/sftp_client.py
index 31ac1292..a0d38bb9 100644
--- a/paramiko/sftp_client.py
+++ b/paramiko/sftp_client.py
@@ -758,7 +758,14 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
         with open(localpath, "rb") as fl:
             return self.putfo(fl, remotepath, file_size, callback, confirm)
 
-    def getfo(self, remotepath, fl, callback=None, prefetch=True):
+    def getfo(
+        self,
+        remotepath,
+        fl,
+        callback=None,
+        prefetch=True,
+        max_concurrent_prefetch_requests=None,
+    ):
         """
         Copy a remote file (``remotepath``) from the SFTP server and write to
         an open file or file-like object, ``fl``. Any exception raised by
@@ -773,6 +780,10 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
             the bytes transferred so far and the total bytes to be transferred
         :param bool prefetch:
             controls whether prefetching is performed (default: True)
+        :param int max_concurrent_prefetch_requests:
+            The maximum number of concurrent read requests to prefetch.
+            When this is ``None`` (the default), do not limit the number of
+            concurrent prefetch requests.
         :return: the `number <int>` of bytes written to the opened file object
 
         .. versionadded:: 1.10
@@ -782,12 +793,19 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
         file_size = self.stat(remotepath).st_size
         with self.open(remotepath, "rb") as fr:
             if prefetch:
-                fr.prefetch(file_size)
+                fr.prefetch(file_size, max_concurrent_prefetch_requests)
             return self._transfer_with_callback(
                 reader=fr, writer=fl, file_size=file_size, callback=callback
             )
 
-    def get(self, remotepath, localpath, callback=None, prefetch=True):
+    def get(
+        self,
+        remotepath,
+        localpath,
+        callback=None,
+        prefetch=True,
+        max_concurrent_prefetch_requests=None,
+    ):
         """
         Copy a remote file (``remotepath``) from the SFTP server to the local
         host as ``localpath``. Any exception raised by operations will be
@@ -800,6 +818,10 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
             the bytes transferred so far and the total bytes to be transferred
         :param bool prefetch:
             controls whether prefetching is performed (default: True)
+        :param int max_concurrent_prefetch_requests:
+            The maximum number of concurrent read requests to prefetch.
+            When this is ``None`` (the default), do not limit the number of
+            concurrent prefetch requests.
 
         .. versionadded:: 1.4
         .. versionchanged:: 1.7.4
@@ -808,7 +830,13 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
             Added the ``prefetch`` keyword argument.
         """
         with open(localpath, "wb") as fl:
-            size = self.getfo(remotepath, fl, callback, prefetch)
+            size = self.getfo(
+                remotepath,
+                fl,
+                callback,
+                prefetch,
+                max_concurrent_prefetch_requests,
+            )
         s = os.stat(localpath)
         if s.st_size != size:
             raise IOError(
diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py
index 9a0a6b34..12861df7 100644
--- a/paramiko/sftp_file.py
+++ b/paramiko/sftp_file.py
@@ -26,7 +26,7 @@ from collections import deque
 import socket
 import threading
 import time
 
-from paramiko.common import DEBUG
+from paramiko.common import DEBUG, io_sleep
 from paramiko.file import BufferedFile
 from paramiko.util import u
@@ -435,7 +435,7 @@ class SFTPFile(BufferedFile):
         """
         self.pipelined = pipelined
 
-    def prefetch(self, file_size=None):
+    def prefetch(self, file_size=None, max_concurrent_requests=None):
         """
         Pre-fetch the remaining contents of this file in anticipation of
         future `.read` calls. If reading the entire file, pre-fetching can
@@ -454,6 +454,10 @@ class SFTPFile(BufferedFile):
             <https://github.com/paramiko/paramiko/pull/562>`_); as a
             workaround, one may call `stat` explicitly and pass its value in
             via this parameter.
+        :param int max_concurrent_requests:
+            The maximum number of concurrent read requests to prefetch.
+            When this is ``None`` (the default), do not limit the number of
+            concurrent prefetch requests.
 
         .. versionadded:: 1.5.1
         .. versionchanged:: 1.16.0
@@ -473,9 +477,9 @@
                 chunks.append((n, chunk))
                 n += chunk
         if len(chunks) > 0:
-            self._start_prefetch(chunks)
+            self._start_prefetch(chunks, max_concurrent_requests)
 
-    def readv(self, chunks):
+    def readv(self, chunks, max_concurrent_prefetch_requests=None):
         """
         Read a set of blocks from the file by (offset, length). This is more
         efficient than doing a series of `.seek` and `.read` calls, since the
@@ -485,6 +489,10 @@ class SFTPFile(BufferedFile):
         :param chunks: a list of ``(offset, length)`` tuples indicating which
             sections of the file to read
+        :param int max_concurrent_prefetch_requests:
+            The maximum number of concurrent read requests to prefetch.
+            When this is ``None`` (the default), do not limit the number of
+            concurrent prefetch requests.
         :return: a list of blocks read, in the same order as in ``chunks``
 
         .. versionadded:: 1.5.4
@@ -508,7 +516,7 @@
                 offset += chunk_size
                 size -= chunk_size
 
-        self._start_prefetch(read_chunks)
+        self._start_prefetch(read_chunks, max_concurrent_prefetch_requests)
         # now we can just devolve to a bunch of read()s :)
         for x in chunks:
             self.seek(x[0])
@@ -522,18 +530,30 @@
         except:
             return 0
 
-    def _start_prefetch(self, chunks):
+    def _start_prefetch(self, chunks, max_concurrent_requests=None):
         self._prefetching = True
         self._prefetch_done = False
 
-        t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
+        t = threading.Thread(
+            target=self._prefetch_thread,
+            args=(chunks, max_concurrent_requests),
+        )
         t.daemon = True
         t.start()
 
-    def _prefetch_thread(self, chunks):
+    def _prefetch_thread(self, chunks, max_concurrent_requests):
         # do these read requests in a temporary thread because there may be
         # a lot of them, so it may block.
         for offset, length in chunks:
+            # Limit the number of concurrent requests in a busy-loop
+            if max_concurrent_requests is not None:
+                while True:
+                    with self._prefetch_lock:
+                        pf_len = len(self._prefetch_extents)
+                        if pf_len < max_concurrent_requests:
+                            break
+                    time.sleep(io_sleep)
+
             num = self.sftp._async_request(
                 self, CMD_READ, self.handle, int64(offset), int(length)
             )
diff --git a/sites/www/changelog.rst b/sites/www/changelog.rst
index c18890eb..9b903258 100644
--- a/sites/www/changelog.rst
+++ b/sites/www/changelog.rst
@@ -2,6 +2,12 @@
 Changelog
 =========
 
+- :feature:`2058` (solves :issue:`1587` and possibly others) Add an explicit
+  ``max_concurrent_prefetch_requests`` argument to `paramiko.client.SSHClient.get`
+  and `paramiko.client.SSHClient.getfo`, allowing users to limit the number
+  of concurrent requests used during prefetch. Patch by ``@kschoelhorn``, with
+  a test by ``@bwinston-sdp``.
+
 - :release:`3.2.0 <2023-05-25>`
 - :bug:`- major` Fixed a very sneaky bug found at the apparently
   rarely-traveled intersection of ``RSA-SHA2`` keys, certificates, SSH agents,
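
For callers, the new argument is purely opt-in: passing it caps how many SFTP READ requests the prefetch thread keeps in flight, while leaving it at None preserves the old unlimited pipelining. A minimal usage sketch against the patched SFTPClient.get; the hostname, username, and paths below are placeholders (not part of the patch), and key-based authentication is assumed to be configured:

import paramiko

# Placeholder connection details; assumes host keys and SSH keys are set up.
client = paramiko.SSHClient()
client.load_system_host_keys()
client.connect("sftp.example.com", username="user")
sftp = client.open_sftp()

# Keep at most 64 prefetch READ requests outstanding at any time.
# max_concurrent_prefetch_requests=None (the default) means "no limit",
# i.e. the pre-patch behaviour.
sftp.get(
    "/remote/big.bin",
    "big.bin",
    max_concurrent_prefetch_requests=64,
)

sftp.close()
client.close()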
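Under the hood (per the sftp_file.py hunks above), the cap is enforced in _prefetch_thread: before issuing each CMD_READ, it checks under _prefetch_lock how many extents are still outstanding in _prefetch_extents and sleeps io_sleep between checks until the count drops below the limit. The same knob is exposed on the lower-level SFTPFile methods; a sketch reusing the sftp handle from the previous example, with a purely illustrative chunk list:

# readv() takes the cap as max_concurrent_prefetch_requests; prefetch()
# takes it as max_concurrent_requests, matching the signatures in the diff.
with sftp.open("/remote/big.bin", "rb") as fr:
    chunks = [(0, 32768), (1048576, 32768)]  # illustrative (offset, length) pairs
    for block in fr.readv(chunks, max_concurrent_prefetch_requests=32):
        pass  # blocks come back in the same order as requested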