diff options
Diffstat (limited to 'paramiko/sftp_file.py')
-rw-r--r-- | paramiko/sftp_file.py | 79 |
1 files changed, 76 insertions, 3 deletions
diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py index 3770f2d4..4ecf9c41 100644 --- a/paramiko/sftp_file.py +++ b/paramiko/sftp_file.py @@ -20,6 +20,7 @@ L{SFTPFile} """ +import threading from paramiko.common import * from paramiko.sftp import * from paramiko.file import BufferedFile @@ -41,6 +42,8 @@ class SFTPFile (BufferedFile): self.handle = handle BufferedFile._set_mode(self, mode, bufsize) self.pipelined = False + self._prefetching = False + self._saved_exception = None def __del__(self): self.close() @@ -56,7 +59,7 @@ class SFTPFile (BufferedFile): if self._closed: return if self.pipelined: - self.sftp._finish_responses() + self.sftp._finish_responses(self) BufferedFile.close(self) try: self.sftp._request(CMD_CLOSE, self.handle) @@ -67,8 +70,29 @@ class SFTPFile (BufferedFile): # may have outlived the Transport connection pass + def _read_prefetch(self, size): + while (self._prefetch_so_far <= self._realpos) and \ + (self._prefetch_so_far < self._prefetch_size) and not self._closed: + self.sftp._read_response() + self._check_exception() + k = self._prefetch_data.keys() + k.sort() + while (len(k) > 0) and (k[0] + len(self._prefetch_data[k[0]]) <= self._realpos): + # done with that block + del self._prefetch_data[k[0]] + k.pop(0) + if len(k) == 0: + self._prefetching = False + return '' + assert k[0] <= self._realpos + buf_offset = self._realpos - k[0] + buf_length = len(self._prefetch_data[k[0]]) - buf_offset + return self._prefetch_data[k[0]][buf_offset : buf_offset + buf_length] + def _read(self, size): size = min(size, self.MAX_REQUEST_SIZE) + if self._prefetching: + return self._read_prefetch(size) t, msg = self.sftp._request(CMD_READ, self.handle, long(self._realpos), int(size)) if t != CMD_DATA: raise SFTPError('Expected data') @@ -77,9 +101,10 @@ class SFTPFile (BufferedFile): def _write(self, data): # may write less than requested if it would exceed max packet size chunk = min(len(data), self.MAX_REQUEST_SIZE) - self.sftp._async_request(CMD_WRITE, self.handle, long(self._realpos), str(data[:chunk])) + req = self.sftp._async_request(type(None), CMD_WRITE, self.handle, long(self._realpos), + str(data[:chunk])) if not self.pipelined or self.sftp.sock.recv_ready(): - t, msg = self.sftp._read_response() + t, msg = self.sftp._read_response(req) if t != CMD_STATUS: raise SFTPError('Expected status') self.sftp._convert_status(msg) @@ -217,6 +242,33 @@ class SFTPFile (BufferedFile): @since: 1.5 """ self.pipelined = pipelined + + def prefetch(self): + """ + Pre-fetch the remaining contents of this file in anticipation of + future L{read} calls. If reading the entire file, pre-fetching can + dramatically improve the download speed by avoiding roundtrip latency. + The file's contents are incrementally buffered in a background thread. + + @since: 1.5.1 + """ + size = self.stat().st_size + # queue up async reads for the rest of the file + self._prefetching = True + self._prefetch_so_far = self._realpos + self._prefetch_size = size + self._prefetch_data = {} + t = threading.Thread(target=self._prefetch) + t.setDaemon(True) + t.start() + + def _prefetch(self): + n = self._realpos + size = self._prefetch_size + while n < size: + chunk = min(self.MAX_REQUEST_SIZE, size - n) + self.sftp._async_request(self, CMD_READ, self.handle, long(n), int(chunk)) + n += chunk ### internals... @@ -227,3 +279,24 @@ class SFTPFile (BufferedFile): return self.stat().st_size except: return 0 + + def _async_response(self, t, msg): + if t == CMD_STATUS: + # save exception and re-raise it on next file operation + try: + self.sftp._convert_status(msg) + except Exception, x: + self._saved_exception = x + return + if t != CMD_DATA: + raise SFTPError('Expected data') + data = msg.get_string() + self._prefetch_data[self._prefetch_so_far] = data + self._prefetch_so_far += len(data) + + def _check_exception(self): + "if there's a saved exception, raise & clear it" + if self._saved_exception is not None: + x = self._saved_exception + self._saved_exception = None + raise x |