summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--paramiko/sftp_client.py36
-rw-r--r--paramiko/sftp_file.py36
-rw-r--r--sites/www/changelog.rst6
3 files changed, 66 insertions, 12 deletions
diff --git a/paramiko/sftp_client.py b/paramiko/sftp_client.py
index 31ac1292..a0d38bb9 100644
--- a/paramiko/sftp_client.py
+++ b/paramiko/sftp_client.py
@@ -758,7 +758,14 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
with open(localpath, "rb") as fl:
return self.putfo(fl, remotepath, file_size, callback, confirm)
- def getfo(self, remotepath, fl, callback=None, prefetch=True):
+ def getfo(
+ self,
+ remotepath,
+ fl,
+ callback=None,
+ prefetch=True,
+ max_concurrent_prefetch_requests=None,
+ ):
"""
Copy a remote file (``remotepath``) from the SFTP server and write to
an open file or file-like object, ``fl``. Any exception raised by
@@ -773,6 +780,10 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
the bytes transferred so far and the total bytes to be transferred
:param bool prefetch:
controls whether prefetching is performed (default: True)
+ :param int max_concurrent_prefetch_requests:
+ The maximum number of concurrent read requests to prefetch.
+ When this is ``None`` (the default), do not limit the number of
+ concurrent prefetch requests.
:return: the `number <int>` of bytes written to the opened file object
.. versionadded:: 1.10
@@ -782,12 +793,19 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
file_size = self.stat(remotepath).st_size
with self.open(remotepath, "rb") as fr:
if prefetch:
- fr.prefetch(file_size)
+ fr.prefetch(file_size, max_concurrent_prefetch_requests)
return self._transfer_with_callback(
reader=fr, writer=fl, file_size=file_size, callback=callback
)
- def get(self, remotepath, localpath, callback=None, prefetch=True):
+ def get(
+ self,
+ remotepath,
+ localpath,
+ callback=None,
+ prefetch=True,
+ max_concurrent_prefetch_requests=None,
+ ):
"""
Copy a remote file (``remotepath``) from the SFTP server to the local
host as ``localpath``. Any exception raised by operations will be
@@ -800,6 +818,10 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
the bytes transferred so far and the total bytes to be transferred
:param bool prefetch:
controls whether prefetching is performed (default: True)
+ :param int max_concurrent_prefetch_requests:
+ The maximum number of concurrent read requests to prefetch.
+ When this is ``None`` (the default), do not limit the number of
+ concurrent prefetch requests.
.. versionadded:: 1.4
.. versionchanged:: 1.7.4
@@ -808,7 +830,13 @@ class SFTPClient(BaseSFTP, ClosingContextManager):
Added the ``prefetch`` keyword argument.
"""
with open(localpath, "wb") as fl:
- size = self.getfo(remotepath, fl, callback, prefetch)
+ size = self.getfo(
+ remotepath,
+ fl,
+ callback,
+ prefetch,
+ max_concurrent_prefetch_requests,
+ )
s = os.stat(localpath)
if s.st_size != size:
raise IOError(
diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py
index 9a0a6b34..12861df7 100644
--- a/paramiko/sftp_file.py
+++ b/paramiko/sftp_file.py
@@ -26,7 +26,7 @@ from collections import deque
import socket
import threading
import time
-from paramiko.common import DEBUG
+from paramiko.common import DEBUG, io_sleep
from paramiko.file import BufferedFile
from paramiko.util import u
@@ -435,7 +435,7 @@ class SFTPFile(BufferedFile):
"""
self.pipelined = pipelined
- def prefetch(self, file_size=None):
+ def prefetch(self, file_size=None, max_concurrent_requests=None):
"""
Pre-fetch the remaining contents of this file in anticipation of future
`.read` calls. If reading the entire file, pre-fetching can
@@ -454,6 +454,10 @@ class SFTPFile(BufferedFile):
<https://github.com/paramiko/paramiko/pull/562>`_); as a
workaround, one may call `stat` explicitly and pass its value in
via this parameter.
+ :param int max_concurrent_requests:
+ The maximum number of concurrent read requests to prefetch.
+ When this is ``None`` (the default), do not limit the number of
+ concurrent prefetch requests.
.. versionadded:: 1.5.1
.. versionchanged:: 1.16.0
@@ -473,9 +477,9 @@ class SFTPFile(BufferedFile):
chunks.append((n, chunk))
n += chunk
if len(chunks) > 0:
- self._start_prefetch(chunks)
+ self._start_prefetch(chunks, max_concurrent_requests)
- def readv(self, chunks):
+ def readv(self, chunks, max_concurrent_prefetch_requests=None):
"""
Read a set of blocks from the file by (offset, length). This is more
efficient than doing a series of `.seek` and `.read` calls, since the
@@ -485,6 +489,10 @@ class SFTPFile(BufferedFile):
:param chunks:
a list of ``(offset, length)`` tuples indicating which sections of
the file to read
+ :param int max_concurrent_prefetch_requests:
+ The maximum number of concurrent read requests to prefetch.
+ When this is ``None`` (the default), do not limit the number of
+ concurrent prefetch requests.
:return: a list of blocks read, in the same order as in ``chunks``
.. versionadded:: 1.5.4
@@ -508,7 +516,7 @@ class SFTPFile(BufferedFile):
offset += chunk_size
size -= chunk_size
- self._start_prefetch(read_chunks)
+ self._start_prefetch(read_chunks, max_concurrent_prefetch_requests)
# now we can just devolve to a bunch of read()s :)
for x in chunks:
self.seek(x[0])
@@ -522,18 +530,30 @@ class SFTPFile(BufferedFile):
except:
return 0
- def _start_prefetch(self, chunks):
+ def _start_prefetch(self, chunks, max_concurrent_requests=None):
self._prefetching = True
self._prefetch_done = False
- t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
+ t = threading.Thread(
+ target=self._prefetch_thread,
+ args=(chunks, max_concurrent_requests),
+ )
t.daemon = True
t.start()
- def _prefetch_thread(self, chunks):
+ def _prefetch_thread(self, chunks, max_concurrent_requests):
# do these read requests in a temporary thread because there may be
# a lot of them, so it may block.
for offset, length in chunks:
+ # Limit the number of concurrent requests in a busy-loop
+ if max_concurrent_requests is not None:
+ while True:
+ with self._prefetch_lock:
+ pf_len = len(self._prefetch_extents)
+ if pf_len < max_concurrent_requests:
+ break
+ time.sleep(io_sleep)
+
num = self.sftp._async_request(
self, CMD_READ, self.handle, int64(offset), int(length)
)
diff --git a/sites/www/changelog.rst b/sites/www/changelog.rst
index c18890eb..9b903258 100644
--- a/sites/www/changelog.rst
+++ b/sites/www/changelog.rst
@@ -2,6 +2,12 @@
Changelog
=========
+- :feature:`2058` (solves :issue:`1587` and possibly others) Add an explicit
``max_concurrent_prefetch_requests`` argument to `paramiko.sftp_client.SFTPClient.get`
and `paramiko.sftp_client.SFTPClient.getfo`, allowing users to limit the number
+ of concurrent requests used during prefetch. Patch by ``@kschoelhorn``, with
+ a test by ``@bwinston-sdp``.
+
- :release:`3.2.0 <2023-05-25>`
- :bug:`- major` Fixed a very sneaky bug found at the apparently
rarely-traveled intersection of ``RSA-SHA2`` keys, certificates, SSH agents,