Diffstat (limited to 'paramiko/sftp_file.py')
 paramiko/sftp_file.py | 40 ++++++++++++++++++++++++++++--------
 1 file changed, 32 insertions(+), 8 deletions(-)
diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py
index 9a0a6b34..c74695e0 100644
--- a/paramiko/sftp_file.py
+++ b/paramiko/sftp_file.py
@@ -26,7 +26,7 @@ from collections import deque
import socket
import threading
import time
-from paramiko.common import DEBUG
+from paramiko.common import DEBUG, io_sleep
from paramiko.file import BufferedFile
from paramiko.util import u
@@ -435,7 +435,7 @@ class SFTPFile(BufferedFile):
"""
self.pipelined = pipelined
- def prefetch(self, file_size=None):
+ def prefetch(self, file_size=None, max_concurrent_requests=None):
"""
Pre-fetch the remaining contents of this file in anticipation of future
`.read` calls. If reading the entire file, pre-fetching can
@@ -454,6 +454,10 @@ class SFTPFile(BufferedFile):
<https://github.com/paramiko/paramiko/pull/562>`_); as a
workaround, one may call `stat` explicitly and pass its value in
via this parameter.
+ :param int max_concurrent_requests:
+ The maximum number of concurrent read requests to prefetch. See
+ `.SFTPClient.get` (its ``max_concurrent_prefetch_requests`` param)
+ for details.
.. versionadded:: 1.5.1
.. versionchanged:: 1.16.0
@@ -461,6 +465,8 @@ class SFTPFile(BufferedFile):
.. versionchanged:: 1.16.1
The ``file_size`` parameter was made optional for backwards
compatibility.
+ .. versionchanged:: 3.3
+ Added ``max_concurrent_requests``.
"""
if file_size is None:
file_size = self.stat().st_size
@@ -473,9 +479,9 @@ class SFTPFile(BufferedFile):
chunks.append((n, chunk))
n += chunk
if len(chunks) > 0:
- self._start_prefetch(chunks)
+ self._start_prefetch(chunks, max_concurrent_requests)
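Usage note: a minimal sketch of the extended prefetch() in use. The host,
credentials, and remote path are hypothetical placeholders, not part of this
change.

    import paramiko

    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect("example.com", username="user", password="secret")

    sftp = client.open_sftp()
    with sftp.open("/remote/large.bin", "rb") as f:
        size = f.stat().st_size
        # Cap the window of outstanding read requests at 64; without the
        # cap, prefetch() issues every request up front.
        f.prefetch(file_size=size, max_concurrent_requests=64)
        data = f.read()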
- def readv(self, chunks):
+ def readv(self, chunks, max_concurrent_prefetch_requests=None):
"""
Read a set of blocks from the file by (offset, length). This is more
efficient than doing a series of `.seek` and `.read` calls, since the
@@ -485,9 +491,15 @@ class SFTPFile(BufferedFile):
:param chunks:
a list of ``(offset, length)`` tuples indicating which sections of
the file to read
+ :param int max_concurrent_prefetch_requests:
+ The maximum number of concurrent read requests to prefetch. See
+ `.SFTPClient.get` (its ``max_concurrent_prefetch_requests`` param)
+ for details.
:return: a list of blocks read, in the same order as in ``chunks``
.. versionadded:: 1.5.4
+ .. versionchanged:: 3.3
+ Added ``max_concurrent_prefetch_requests``.
"""
self.sftp._log(
DEBUG, "readv({}, {!r})".format(hexlify(self.handle), chunks)
@@ -508,7 +520,7 @@ class SFTPFile(BufferedFile):
offset += chunk_size
size -= chunk_size
- self._start_prefetch(read_chunks)
+ self._start_prefetch(read_chunks, max_concurrent_prefetch_requests)
# now we can just devolve to a bunch of read()s :)
for x in chunks:
self.seek(x[0])
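Usage note: a sketch of readv() with the new throttle parameter; the path and
chunk list are illustrative.

    # Read two disjoint regions of a remote file, capping the prefetch
    # requests issued on our behalf at 16 in flight.
    with sftp.open("/remote/large.bin", "rb") as f:
        chunks = [(0, 4096), (1048576, 4096)]  # (offset, length) pairs
        blocks = f.readv(chunks, max_concurrent_prefetch_requests=16)
        for (offset, _), block in zip(chunks, blocks):
            print(offset, len(block))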
@@ -522,18 +534,30 @@ class SFTPFile(BufferedFile):
except:
return 0
- def _start_prefetch(self, chunks):
+ def _start_prefetch(self, chunks, max_concurrent_requests=None):
self._prefetching = True
self._prefetch_done = False
- t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
+ t = threading.Thread(
+ target=self._prefetch_thread,
+ args=(chunks, max_concurrent_requests),
+ )
t.daemon = True
t.start()
- def _prefetch_thread(self, chunks):
+ def _prefetch_thread(self, chunks, max_concurrent_requests):
# do these read requests in a temporary thread because there may be
# a lot of them, so it may block.
for offset, length in chunks:
+ # Limit the number of concurrent requests in a busy-loop
+ if max_concurrent_requests is not None:
+ while True:
+ with self._prefetch_lock:
+ pf_len = len(self._prefetch_extents)
+ if pf_len < max_concurrent_requests:
+ break
+ time.sleep(io_sleep)
+
num = self.sftp._async_request(
self, CMD_READ, self.handle, int64(offset), int(length)
)
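Implementation note: the throttle in _prefetch_thread is a simple polling
pattern: read a shared counter under a lock and sleep for io_sleep while it
sits at the cap; the loop advances once responses drain _prefetch_extents.
A self-contained sketch of the same idea, with illustrative names (only
io_sleep is taken from paramiko.common):

    import threading
    import time

    from paramiko.common import io_sleep

    lock = threading.Lock()  # stands in for self._prefetch_lock
    in_flight = set()        # stands in for self._prefetch_extents

    def wait_for_slot(cap):
        # Poll until the number of outstanding requests drops below cap.
        while True:
            with lock:
                if len(in_flight) < cap:
                    return
            time.sleep(io_sleep)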