author    Klemens Schölhorn <klemens.schoelhorn@advantest.com>  2022-05-17 18:06:48 +0200
committer Klemens Schölhorn <klemens.schoelhorn@advantest.com>  2023-06-02 12:02:15 +0200
commit    9fddb6e99fb981cd31eb8187dd2f47746b3625d8 (patch)
tree      dc2f13662bb8b8b15c7da4a42984a40c36bbbfa8
parent    63fd01aa39b0d7cba587380a61b38590004c755d (diff)
Allow limiting the number of concurrent prefetch requests
Currently prefetch sends the requests for all chunks of the file in one shot. With the default chunk size of 32k, this can result in many thousands of outstanding requests for large files.

Some servers, like Serv-U 15.2.3.742, seem to drop requests after a certain number, which results in the file download hanging indefinitely (or until the server closes the connection).

Fix this issue by letting the user specify a limit on the number of concurrent requests. This is similar to openssh's sftp, which limits the number of concurrent requests to 64 by default.
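For illustration, a minimal usage sketch of the new keyword argument; the host, credentials, and paths below are hypothetical:

    import paramiko

    with paramiko.SSHClient() as client:
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect("example.com", username="user", password="secret")
        sftp = client.open_sftp()
        # Cap the number of in-flight read requests during prefetch,
        # mirroring openssh's sftp default of 64.
        sftp.get(
            "/remote/large-file.bin",
            "local-file.bin",
            max_concurrent_prefetch_requests=64,
        )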
-rw-r--r--  paramiko/sftp_client.py   36
-rw-r--r--  paramiko/sftp_file.py     36
-rw-r--r--  sites/www/changelog.rst    6
3 files changed, 66 insertions(+), 12 deletions(-)
diff --git a/paramiko/sftp_client.py b/paramiko/sftp_client.py
index 31ac1292..a0d38bb9 100644
--- a/paramiko/sftp_client.py
+++ b/paramiko/sftp_client.py
@@ -758,7 +758,14 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
with open(localpath, "rb") as fl:
return self.putfo(fl, remotepath, file_size, callback, confirm)
- def getfo(self, remotepath, fl, callback=None, prefetch=True):
+ def getfo(
+ self,
+ remotepath,
+ fl,
+ callback=None,
+ prefetch=True,
+ max_concurrent_prefetch_requests=None,
+ ):
"""
Copy a remote file (``remotepath``) from the SFTP server and write to
an open file or file-like object, ``fl``. Any exception raised by
@@ -773,6 +780,10 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
the bytes transferred so far and the total bytes to be transferred
:param bool prefetch:
controls whether prefetching is performed (default: True)
+ :param int max_concurrent_prefetch_requests:
+ The maximum number of concurrent read requests to prefetch.
+ When this is ``None`` (the default), do not limit the number of
+ concurrent prefetch requests.
:return: the `number <int>` of bytes written to the opened file object
.. versionadded:: 1.10
@@ -782,12 +793,19 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
file_size = self.stat(remotepath).st_size
with self.open(remotepath, "rb") as fr:
if prefetch:
- fr.prefetch(file_size)
+ fr.prefetch(file_size, max_concurrent_prefetch_requests)
return self._transfer_with_callback(
reader=fr, writer=fl, file_size=file_size, callback=callback
)
- def get(self, remotepath, localpath, callback=None, prefetch=True):
+ def get(
+ self,
+ remotepath,
+ localpath,
+ callback=None,
+ prefetch=True,
+ max_concurrent_prefetch_requests=None,
+ ):
"""
Copy a remote file (``remotepath``) from the SFTP server to the local
host as ``localpath``. Any exception raised by operations will be
@@ -800,6 +818,10 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
the bytes transferred so far and the total bytes to be transferred
:param bool prefetch:
controls whether prefetching is performed (default: True)
+ :param int max_concurrent_prefetch_requests:
+ The maximum number of concurrent read requests to prefetch.
+ When this is ``None`` (the default), do not limit the number of
+ concurrent prefetch requests.
.. versionadded:: 1.4
.. versionchanged:: 1.7.4
@@ -808,7 +830,13 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
Added the ``prefetch`` keyword argument.
"""
with open(localpath, "wb") as fl:
- size = self.getfo(remotepath, fl, callback, prefetch)
+ size = self.getfo(
+ remotepath,
+ fl,
+ callback,
+ prefetch,
+ max_concurrent_prefetch_requests,
+ )
s = os.stat(localpath)
if s.st_size != size:
raise IOError(
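For reference, a sketch of driving `getfo` directly with an in-memory target; `sftp` is assumed to be an already-connected `SFTPClient`, and the remote path is hypothetical:

    import io

    buf = io.BytesIO()
    # getfo returns the number of bytes written to the file-like object.
    size = sftp.getfo(
        "/remote/large-file.bin",
        buf,
        prefetch=True,
        max_concurrent_prefetch_requests=64,
    )
    assert size == buf.tell()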
diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py
index 9a0a6b34..12861df7 100644
--- a/paramiko/sftp_file.py
+++ b/paramiko/sftp_file.py
@@ -26,7 +26,7 @@ from collections import deque
import socket
import threading
import time
-from paramiko.common import DEBUG
+from paramiko.common import DEBUG, io_sleep
from paramiko.file import BufferedFile
from paramiko.util import u
@@ -435,7 +435,7 @@ class SFTPFile(BufferedFile):
"""
self.pipelined = pipelined
- def prefetch(self, file_size=None):
+ def prefetch(self, file_size=None, max_concurrent_requests=None):
"""
Pre-fetch the remaining contents of this file in anticipation of future
`.read` calls. If reading the entire file, pre-fetching can
@@ -454,6 +454,10 @@ class SFTPFile(BufferedFile):
<https://github.com/paramiko/paramiko/pull/562>`_); as a
workaround, one may call `stat` explicitly and pass its value in
via this parameter.
+ :param int max_concurrent_requests:
+ The maximum number of concurrent read requests to prefetch.
+ When this is ``None`` (the default), do not limit the number of
+ concurrent prefetch requests.
.. versionadded:: 1.5.1
.. versionchanged:: 1.16.0
@@ -473,9 +477,9 @@ class SFTPFile(BufferedFile):
chunks.append((n, chunk))
n += chunk
if len(chunks) > 0:
- self._start_prefetch(chunks)
+ self._start_prefetch(chunks, max_concurrent_requests)
- def readv(self, chunks):
+ def readv(self, chunks, max_concurrent_prefetch_requests=None):
"""
Read a set of blocks from the file by (offset, length). This is more
efficient than doing a series of `.seek` and `.read` calls, since the
@@ -485,6 +489,10 @@ class SFTPFile(BufferedFile):
:param chunks:
a list of ``(offset, length)`` tuples indicating which sections of
the file to read
+ :param int max_concurrent_prefetch_requests:
+ The maximum number of concurrent read requests to prefetch.
+ When this is ``None`` (the default), do not limit the number of
+ concurrent prefetch requests.
:return: a list of blocks read, in the same order as in ``chunks``
.. versionadded:: 1.5.4
@@ -508,7 +516,7 @@ class SFTPFile(BufferedFile):
offset += chunk_size
size -= chunk_size
- self._start_prefetch(read_chunks)
+ self._start_prefetch(read_chunks, max_concurrent_prefetch_requests)
# now we can just devolve to a bunch of read()s :)
for x in chunks:
self.seek(x[0])
@@ -522,18 +530,30 @@ class SFTPFile(BufferedFile):
except:
return 0
- def _start_prefetch(self, chunks):
+ def _start_prefetch(self, chunks, max_concurrent_requests=None):
self._prefetching = True
self._prefetch_done = False
- t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
+ t = threading.Thread(
+ target=self._prefetch_thread,
+ args=(chunks, max_concurrent_requests),
+ )
t.daemon = True
t.start()
- def _prefetch_thread(self, chunks):
+ def _prefetch_thread(self, chunks, max_concurrent_requests):
# do these read requests in a temporary thread because there may be
# a lot of them, so it may block.
for offset, length in chunks:
+ # Busy-wait until the number of outstanding requests drops below the limit
+ if max_concurrent_requests is not None:
+ while True:
+ with self._prefetch_lock:
+ pf_len = len(self._prefetch_extents)
+ if pf_len < max_concurrent_requests:
+ break
+ time.sleep(io_sleep)
+
num = self.sftp._async_request(
self, CMD_READ, self.handle, int64(offset), int(length)
)
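The throttling pattern introduced above, reduced to a standalone sketch; the names (pending, lock, IO_SLEEP, send) are illustrative and are not paramiko's internals:

    import threading
    import time

    IO_SLEEP = 0.01  # in the same spirit as paramiko.common.io_sleep

    lock = threading.Lock()
    pending = set()  # request ids sent but not yet answered

    def send_throttled(request_id, send, max_concurrent):
        # Block until there is room for one more in-flight request.
        if max_concurrent is not None:
            while True:
                with lock:
                    if len(pending) < max_concurrent:
                        break
                time.sleep(IO_SLEEP)
        with lock:
            pending.add(request_id)
        send(request_id)

    def on_response(request_id):
        # Completion frees a slot, letting the sender loop proceed.
        with lock:
            pending.discard(request_id)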
diff --git a/sites/www/changelog.rst b/sites/www/changelog.rst
index c18890eb..9b903258 100644
--- a/sites/www/changelog.rst
+++ b/sites/www/changelog.rst
@@ -2,6 +2,12 @@
Changelog
=========
+- :feature:`2058` (solves :issue:`1587` and possibly others) Add an explicit
+  ``max_concurrent_prefetch_requests`` argument to `paramiko.sftp_client.SFTPClient.get`
+  and `paramiko.sftp_client.SFTPClient.getfo`, allowing users to limit the
+  number of concurrent requests used during prefetch. Patch by ``@kschoelhorn``,
+  with a test by ``@bwinston-sdp``.
+
- :release:`3.2.0 <2023-05-25>`
- :bug:`- major` Fixed a very sneaky bug found at the apparently
rarely-traveled intersection of ``RSA-SHA2`` keys, certificates, SSH agents,