author    | Klemens Schölhorn <klemens.schoelhorn@advantest.com> | 2022-05-17 18:06:48 +0200
committer | Klemens Schölhorn <klemens.schoelhorn@advantest.com> | 2023-06-02 12:02:15 +0200
commit    | 9fddb6e99fb981cd31eb8187dd2f47746b3625d8
tree      | dc2f13662bb8b8b15c7da4a42984a40c36bbbfa8
parent    | 63fd01aa39b0d7cba587380a61b38590004c755d
Allow limiting the number of concurrent prefetch requests
Currently, prefetch sends the read requests for all chunks of the file
in one shot. With the default chunk size of 32 KiB, this can result in
many thousands of outstanding requests for large files; a 1 GiB file
alone produces 32,768 of them. Some servers, such as Serv-U
15.2.3.742, seem to drop requests beyond a certain number, which
leaves the download hanging indefinitely (or until the server closes
the connection).
Fix this by letting the user specify a limit on the number of
concurrent requests. This mirrors OpenSSH's sftp client, which caps
the number of concurrent requests at 64 by default.
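From the caller's side the new keyword is opt-in; omitting it keeps the old
unlimited behaviour. A minimal usage sketch (host, credentials, and paths
are illustrative placeholders, not part of the patch):

    import paramiko

    client = paramiko.SSHClient()
    client.load_system_host_keys()
    client.connect("sftp.example.com", username="user")  # placeholder host/user

    sftp = client.open_sftp()
    # Cap prefetch at 64 in-flight READ requests, matching the default limit
    # of OpenSSH's sftp client; passing None (the default) means unlimited.
    sftp.get(
        "/remote/large.bin",  # placeholder remote path
        "large.bin",          # placeholder local path
        max_concurrent_prefetch_requests=64,
    )
    sftp.close()
    client.close()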
-rw-r--r-- | paramiko/sftp_client.py | 36
-rw-r--r-- | paramiko/sftp_file.py   | 36
-rw-r--r-- | sites/www/changelog.rst |  6
3 files changed, 66 insertions, 12 deletions
diff --git a/paramiko/sftp_client.py b/paramiko/sftp_client.py
index 31ac1292..a0d38bb9 100644
--- a/paramiko/sftp_client.py
+++ b/paramiko/sftp_client.py
@@ -758,7 +758,14 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
         with open(localpath, "rb") as fl:
             return self.putfo(fl, remotepath, file_size, callback, confirm)
 
-    def getfo(self, remotepath, fl, callback=None, prefetch=True):
+    def getfo(
+        self,
+        remotepath,
+        fl,
+        callback=None,
+        prefetch=True,
+        max_concurrent_prefetch_requests=None,
+    ):
         """
         Copy a remote file (``remotepath``) from the SFTP server and write to
         an open file or file-like object, ``fl``.  Any exception raised by
@@ -773,6 +780,10 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
             the bytes transferred so far and the total bytes to be transferred
         :param bool prefetch:
             controls whether prefetching is performed (default: True)
+        :param int max_concurrent_prefetch_requests:
+            The maximum number of concurrent read requests to prefetch.
+            When this is ``None`` (the default), do not limit the number of
+            concurrent prefetch requests.
         :return: the `number <int>` of bytes written to the opened file object
 
         .. versionadded:: 1.10
@@ -782,12 +793,19 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
         file_size = self.stat(remotepath).st_size
         with self.open(remotepath, "rb") as fr:
             if prefetch:
-                fr.prefetch(file_size)
+                fr.prefetch(file_size, max_concurrent_prefetch_requests)
             return self._transfer_with_callback(
                 reader=fr, writer=fl, file_size=file_size, callback=callback
             )
 
-    def get(self, remotepath, localpath, callback=None, prefetch=True):
+    def get(
+        self,
+        remotepath,
+        localpath,
+        callback=None,
+        prefetch=True,
+        max_concurrent_prefetch_requests=None,
+    ):
         """
         Copy a remote file (``remotepath``) from the SFTP server to the local
         host as ``localpath``.  Any exception raised by operations will be
@@ -800,6 +818,10 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
             the bytes transferred so far and the total bytes to be transferred
         :param bool prefetch:
             controls whether prefetching is performed (default: True)
+        :param int max_concurrent_prefetch_requests:
+            The maximum number of concurrent read requests to prefetch.
+            When this is ``None`` (the default), do not limit the number of
+            concurrent prefetch requests.
 
         .. versionadded:: 1.4
         .. versionchanged:: 1.7.4
@@ -808,7 +830,13 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
             Added the ``prefetch`` keyword argument.
         """
         with open(localpath, "wb") as fl:
-            size = self.getfo(remotepath, fl, callback, prefetch)
+            size = self.getfo(
+                remotepath,
+                fl,
+                callback,
+                prefetch,
+                max_concurrent_prefetch_requests,
+            )
         s = os.stat(localpath)
         if s.st_size != size:
             raise IOError(
diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py
index 9a0a6b34..12861df7 100644
--- a/paramiko/sftp_file.py
+++ b/paramiko/sftp_file.py
@@ -26,7 +26,7 @@ from collections import deque
 import socket
 import threading
 import time
 
-from paramiko.common import DEBUG
+from paramiko.common import DEBUG, io_sleep
 from paramiko.file import BufferedFile
 from paramiko.util import u
@@ -435,7 +435,7 @@ class SFTPFile(BufferedFile):
         """
         self.pipelined = pipelined
 
-    def prefetch(self, file_size=None):
+    def prefetch(self, file_size=None, max_concurrent_requests=None):
         """
         Pre-fetch the remaining contents of this file in anticipation of
         future `.read` calls.  If reading the entire file, pre-fetching can
@@ -454,6 +454,10 @@ class SFTPFile(BufferedFile):
             <https://github.com/paramiko/paramiko/pull/562>`_); as a
             workaround, one may call `stat` explicitly and pass its value in
             via this parameter.
+        :param int max_concurrent_requests:
+            The maximum number of concurrent read requests to prefetch.
+            When this is ``None`` (the default), do not limit the number of
+            concurrent prefetch requests.
 
         .. versionadded:: 1.5.1
         .. versionchanged:: 1.16.0
@@ -473,9 +477,9 @@ class SFTPFile(BufferedFile):
             chunks.append((n, chunk))
             n += chunk
         if len(chunks) > 0:
-            self._start_prefetch(chunks)
+            self._start_prefetch(chunks, max_concurrent_requests)
 
-    def readv(self, chunks):
+    def readv(self, chunks, max_concurrent_prefetch_requests=None):
         """
         Read a set of blocks from the file by (offset, length).  This is more
         efficient than doing a series of `.seek` and `.read` calls, since the
@@ -485,6 +489,10 @@ class SFTPFile(BufferedFile):
         :param chunks:
             a list of ``(offset, length)`` tuples indicating which sections of
             the file to read
+        :param int max_concurrent_prefetch_requests:
+            The maximum number of concurrent read requests to prefetch.
+            When this is ``None`` (the default), do not limit the number of
+            concurrent prefetch requests.
         :return: a list of blocks read, in the same order as in ``chunks``
 
         .. versionadded:: 1.5.4
@@ -508,7 +516,7 @@ class SFTPFile(BufferedFile):
                 offset += chunk_size
                 size -= chunk_size
 
-        self._start_prefetch(read_chunks)
+        self._start_prefetch(read_chunks, max_concurrent_prefetch_requests)
         # now we can just devolve to a bunch of read()s :)
         for x in chunks:
             self.seek(x[0])
@@ -522,18 +530,30 @@ class SFTPFile(BufferedFile):
         except:
             return 0
 
-    def _start_prefetch(self, chunks):
+    def _start_prefetch(self, chunks, max_concurrent_requests=None):
         self._prefetching = True
         self._prefetch_done = False
 
-        t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
+        t = threading.Thread(
+            target=self._prefetch_thread,
+            args=(chunks, max_concurrent_requests),
+        )
         t.daemon = True
         t.start()
 
-    def _prefetch_thread(self, chunks):
+    def _prefetch_thread(self, chunks, max_concurrent_requests):
         # do these read requests in a temporary thread because there may be
         # a lot of them, so it may block.
         for offset, length in chunks:
+            # Limit the number of concurrent requests in a busy-loop
+            if max_concurrent_requests is not None:
+                while True:
+                    with self._prefetch_lock:
+                        pf_len = len(self._prefetch_extents)
+                        if pf_len < max_concurrent_requests:
+                            break
+                    time.sleep(io_sleep)
+
             num = self.sftp._async_request(
                 self, CMD_READ, self.handle, int64(offset), int(length)
             )
diff --git a/sites/www/changelog.rst b/sites/www/changelog.rst
index c18890eb..9b903258 100644
--- a/sites/www/changelog.rst
+++ b/sites/www/changelog.rst
@@ -2,6 +2,12 @@ Changelog
 =========
 
+- :feature:`2058` (solves :issue:`1587` and possibly others) Add an explicit
+  ``max_concurrent_prefetch_requests`` argument to `paramiko.client.SSHClient.get`
+  and `paramiko.client.SSHClient.getfo`, allowing users to limit the number
+  of concurrent requests used during prefetch. Patch by ``@kschoelhorn``, with
+  a test by ``@bwinston-sdp``.
+
 - :release:`3.2.0 <2023-05-25>`
 - :bug:`- major` Fixed a very sneaky bug found at the apparently
   rarely-traveled intersection of ``RSA-SHA2`` keys, certificates, SSH agents,
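The throttle added to _prefetch_thread is deliberately simple: before
issuing each READ, the prefetch thread busy-waits, sleeping io_sleep (10 ms)
between checks, until the number of outstanding requests recorded in
self._prefetch_extents drops below the cap; the read-response handler
shrinks that table as replies arrive. The following self-contained sketch
shows the same pattern outside paramiko; throttled_prefetch, issue_read,
and outstanding are illustrative names, not paramiko API:

    import threading
    import time

    IO_SLEEP = 0.01  # paramiko.common.io_sleep is 10 ms

    def throttled_prefetch(issue_read, chunks, outstanding, lock,
                           max_concurrent=None):
        # Issue one asynchronous READ per (offset, length) chunk, keeping
        # at most max_concurrent requests outstanding at any moment.
        for offset, length in chunks:
            if max_concurrent is not None:
                # Busy-wait until the reply handler (another thread) has
                # drained the outstanding-request table below the cap.
                while True:
                    with lock:
                        if len(outstanding) < max_concurrent:
                            break
                    time.sleep(IO_SLEEP)
            request_id = issue_read(offset, length)
            with lock:
                outstanding[request_id] = (offset, length)

    # Tiny simulation: a fake reply handler drains one request every 2 ms.
    if __name__ == "__main__":
        outstanding, lock = {}, threading.Lock()
        ids = iter(range(1_000_000))

        def drain():
            while True:
                with lock:
                    if outstanding:
                        outstanding.pop(next(iter(outstanding)))
                time.sleep(0.002)

        threading.Thread(target=drain, daemon=True).start()
        chunks = [(i * 32768, 32768) for i in range(256)]  # 8 MiB in 32 KiB chunks
        throttled_prefetch(lambda off, ln: next(ids), chunks,
                           outstanding, lock, max_concurrent=64)
        print("issued all 256 requests without exceeding 64 in flight")

A condition variable notified by the reply handler would avoid the polling
entirely, but at 10 ms per check the busy-wait costs little and keeps the
patch small.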