summaryrefslogtreecommitdiffhomepage
path: root/paramiko/sftp_file.py
diff options
context:
space:
mode:
Diffstat (limited to 'paramiko/sftp_file.py')
-rw-r--r--paramiko/sftp_file.py79
1 files changed, 76 insertions, 3 deletions
diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py
index 3770f2d4..4ecf9c41 100644
--- a/paramiko/sftp_file.py
+++ b/paramiko/sftp_file.py
@@ -20,6 +20,7 @@
L{SFTPFile}
"""
+import threading
from paramiko.common import *
from paramiko.sftp import *
from paramiko.file import BufferedFile
@@ -41,6 +42,8 @@ class SFTPFile (BufferedFile):
self.handle = handle
BufferedFile._set_mode(self, mode, bufsize)
self.pipelined = False
+ self._prefetching = False
+ self._saved_exception = None
def __del__(self):
self.close()
@@ -56,7 +59,7 @@ class SFTPFile (BufferedFile):
if self._closed:
return
if self.pipelined:
- self.sftp._finish_responses()
+ self.sftp._finish_responses(self)
BufferedFile.close(self)
try:
self.sftp._request(CMD_CLOSE, self.handle)
@@ -67,8 +70,29 @@ class SFTPFile (BufferedFile):
# may have outlived the Transport connection
pass
+ def _read_prefetch(self, size):
+ while (self._prefetch_so_far <= self._realpos) and \
+ (self._prefetch_so_far < self._prefetch_size) and not self._closed:
+ self.sftp._read_response()
+ self._check_exception()
+ k = self._prefetch_data.keys()
+ k.sort()
+ while (len(k) > 0) and (k[0] + len(self._prefetch_data[k[0]]) <= self._realpos):
+ # done with that block
+ del self._prefetch_data[k[0]]
+ k.pop(0)
+ if len(k) == 0:
+ self._prefetching = False
+ return ''
+ assert k[0] <= self._realpos
+ buf_offset = self._realpos - k[0]
+ buf_length = len(self._prefetch_data[k[0]]) - buf_offset
+ return self._prefetch_data[k[0]][buf_offset : buf_offset + buf_length]
+
def _read(self, size):
size = min(size, self.MAX_REQUEST_SIZE)
+ if self._prefetching:
+ return self._read_prefetch(size)
t, msg = self.sftp._request(CMD_READ, self.handle, long(self._realpos), int(size))
if t != CMD_DATA:
raise SFTPError('Expected data')
@@ -77,9 +101,10 @@ class SFTPFile (BufferedFile):
def _write(self, data):
# may write less than requested if it would exceed max packet size
chunk = min(len(data), self.MAX_REQUEST_SIZE)
- self.sftp._async_request(CMD_WRITE, self.handle, long(self._realpos), str(data[:chunk]))
+ req = self.sftp._async_request(type(None), CMD_WRITE, self.handle, long(self._realpos),
+ str(data[:chunk]))
if not self.pipelined or self.sftp.sock.recv_ready():
- t, msg = self.sftp._read_response()
+ t, msg = self.sftp._read_response(req)
if t != CMD_STATUS:
raise SFTPError('Expected status')
self.sftp._convert_status(msg)
@@ -217,6 +242,33 @@ class SFTPFile (BufferedFile):
@since: 1.5
"""
self.pipelined = pipelined
+
+ def prefetch(self):
+ """
+ Pre-fetch the remaining contents of this file in anticipation of
+ future L{read} calls. If reading the entire file, pre-fetching can
+ dramatically improve the download speed by avoiding roundtrip latency.
+ The file's contents are incrementally buffered in a background thread.
+
+ @since: 1.5.1
+ """
+ size = self.stat().st_size
+ # queue up async reads for the rest of the file
+ self._prefetching = True
+ self._prefetch_so_far = self._realpos
+ self._prefetch_size = size
+ self._prefetch_data = {}
+ t = threading.Thread(target=self._prefetch)
+ t.setDaemon(True)
+ t.start()
+
+ def _prefetch(self):
+ n = self._realpos
+ size = self._prefetch_size
+ while n < size:
+ chunk = min(self.MAX_REQUEST_SIZE, size - n)
+ self.sftp._async_request(self, CMD_READ, self.handle, long(n), int(chunk))
+ n += chunk
### internals...
@@ -227,3 +279,24 @@ class SFTPFile (BufferedFile):
return self.stat().st_size
except:
return 0
+
+ def _async_response(self, t, msg):
+ if t == CMD_STATUS:
+ # save exception and re-raise it on next file operation
+ try:
+ self.sftp._convert_status(msg)
+ except Exception, x:
+ self._saved_exception = x
+ return
+ if t != CMD_DATA:
+ raise SFTPError('Expected data')
+ data = msg.get_string()
+ self._prefetch_data[self._prefetch_so_far] = data
+ self._prefetch_so_far += len(data)
+
+ def _check_exception(self):
+ "if there's a saved exception, raise & clear it"
+ if self._saved_exception is not None:
+ x = self._saved_exception
+ self._saved_exception = None
+ raise x