Diffstat (limited to 'paramiko/sftp_file.py')
-rw-r--r-- | paramiko/sftp_file.py | 248
1 files changed, 134 insertions, 114 deletions
diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py
index 8c5c7aca..03d67b33 100644
--- a/paramiko/sftp_file.py
+++ b/paramiko/sftp_file.py
@@ -7,7 +7,7 @@
 # Software Foundation; either version 2.1 of the License, or (at your option)
 # any later version.
 #
-# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
 # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
 # A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
 # details.
@@ -17,23 +17,31 @@
 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.

 """
-L{SFTPFile}
+SFTP file object
 """

+from __future__ import with_statement
+
 from binascii import hexlify
+from collections import deque
 import socket
 import threading
 import time
+from paramiko.common import DEBUG

-from paramiko.common import *
-from paramiko.sftp import *
 from paramiko.file import BufferedFile
+from paramiko.py3compat import long
+from paramiko.sftp import CMD_CLOSE, CMD_READ, CMD_DATA, SFTPError, CMD_WRITE, \
+    CMD_STATUS, CMD_FSTAT, CMD_ATTRS, CMD_FSETSTAT, CMD_EXTENDED
 from paramiko.sftp_attr import SFTPAttributes


 class SFTPFile (BufferedFile):
     """
     Proxy object for a file on the remote server, in client mode SFTP.
+
+    Instances of this class may be used as context managers in the same way
+    that built-in Python file objects are.
     """

     # Some sftp servers will choke if you send read/write requests larger than
@@ -49,13 +57,18 @@ class SFTPFile (BufferedFile):
         self._prefetching = False
         self._prefetch_done = False
         self._prefetch_data = {}
-        self._prefetch_reads = []
+        self._prefetch_extents = {}
+        self._prefetch_lock = threading.Lock()
         self._saved_exception = None
+        self._reqs = deque()

     def __del__(self):
         self._close(async=True)

     def close(self):
+        """
+        Close the file.
+        """
         self._close(async=False)

     def _close(self, async=False):
@@ -86,10 +99,10 @@ class SFTPFile (BufferedFile):
             pass

     def _data_in_prefetch_requests(self, offset, size):
-        k = [i for i in self._prefetch_reads if i[0] <= offset]
+        k = [x for x in list(self._prefetch_extents.values()) if x[0] <= offset]
         if len(k) == 0:
             return False
-        k.sort(lambda x, y: cmp(x[0], y[0]))
+        k.sort(key=lambda x: x[0])
         buf_offset, buf_size = k[-1]
         if buf_offset + buf_size <= offset:
             # prefetch request ends before this one begins
@@ -160,45 +173,47 @@ class SFTPFile (BufferedFile):
     def _write(self, data):
         # may write less than requested if it would exceed max packet size
         chunk = min(len(data), self.MAX_REQUEST_SIZE)
-        req = self.sftp._async_request(type(None), CMD_WRITE, self.handle, long(self._realpos), str(data[:chunk]))
-        if not self.pipelined or self.sftp.sock.recv_ready():
-            t, msg = self.sftp._read_response(req)
-            if t != CMD_STATUS:
-                raise SFTPError('Expected status')
-            # convert_status already called
+        self._reqs.append(self.sftp._async_request(type(None), CMD_WRITE, self.handle, long(self._realpos), data[:chunk]))
+        if not self.pipelined or (len(self._reqs) > 100 and self.sftp.sock.recv_ready()):
+            while len(self._reqs):
+                req = self._reqs.popleft()
+                t, msg = self.sftp._read_response(req)
+                if t != CMD_STATUS:
+                    raise SFTPError('Expected status')
+                # convert_status already called
         return chunk

     def settimeout(self, timeout):
         """
         Set a timeout on read/write operations on the underlying socket or
-        ssh L{Channel}.
+        ssh `.Channel`.
+
+        :param float timeout:
+            seconds to wait for a pending read/write operation before raising
+            ``socket.timeout``, or ``None`` for no timeout

-        @see: L{Channel.settimeout}
-        @param timeout: seconds to wait for a pending read/write operation
-            before raising C{socket.timeout}, or C{None} for no timeout
-        @type timeout: float
+        .. seealso:: `.Channel.settimeout`
         """
         self.sftp.sock.settimeout(timeout)

     def gettimeout(self):
         """
-        Returns the timeout in seconds (as a float) associated with the socket
-        or ssh L{Channel} used for this file.
+        Returns the timeout in seconds (as a `float`) associated with the
+        socket or ssh `.Channel` used for this file.

-        @see: L{Channel.gettimeout}
-        @rtype: float
+        .. seealso:: `.Channel.gettimeout`
         """
         return self.sftp.sock.gettimeout()

     def setblocking(self, blocking):
         """
         Set blocking or non-blocking mode on the underiying socket or ssh
-        L{Channel}.
+        `.Channel`.
+
+        :param int blocking:
+            0 to set non-blocking mode; non-0 to set blocking mode.

-        @see: L{Channel.setblocking}
-        @param blocking: 0 to set non-blocking mode; non-0 to set blocking
-            mode.
-        @type blocking: int
+        .. seealso:: `.Channel.setblocking`
         """
         self.sftp.sock.setblocking(blocking)

@@ -211,16 +226,15 @@ class SFTPFile (BufferedFile):
             self._realpos = self._pos
         else:
             self._realpos = self._pos = self._get_size() + offset
-        self._rbuffer = ''
+        self._rbuffer = bytes()

     def stat(self):
         """
         Retrieve information about this file from the remote system. This is
-        exactly like L{SFTP.stat}, except that it operates on an already-open
-        file.
+        exactly like `.SFTPClient.stat`, except that it operates on an
+        already-open file.

-        @return: an object containing attributes about this file.
-        @rtype: SFTPAttributes
+        :return: an `.SFTPAttributes` object containing attributes about this file.
         """
         t, msg = self.sftp._request(CMD_FSTAT, self.handle)
         if t != CMD_ATTRS:
@@ -230,11 +244,10 @@ class SFTPFile (BufferedFile):
     def chmod(self, mode):
         """
         Change the mode (permissions) of this file. The permissions are
-        unix-style and identical to those used by python's C{os.chmod}
+        unix-style and identical to those used by Python's `os.chmod`
         function.

-        @param mode: new permissions
-        @type mode: int
+        :param int mode: new permissions
         """
         self.sftp._log(DEBUG, 'chmod(%s, %r)' % (hexlify(self.handle), mode))
         attr = SFTPAttributes()
@@ -243,15 +256,13 @@ class SFTPFile (BufferedFile):

     def chown(self, uid, gid):
         """
-        Change the owner (C{uid}) and group (C{gid}) of this file. As with
-        python's C{os.chown} function, you must pass both arguments, so if you
-        only want to change one, use L{stat} first to retrieve the current
+        Change the owner (``uid``) and group (``gid``) of this file. As with
+        Python's `os.chown` function, you must pass both arguments, so if you
+        only want to change one, use `stat` first to retrieve the current
         owner and group.

-        @param uid: new owner's uid
-        @type uid: int
-        @param gid: new group id
-        @type gid: int
+        :param int uid: new owner's uid
+        :param int gid: new group id
         """
         self.sftp._log(DEBUG, 'chown(%s, %r, %r)' % (hexlify(self.handle), uid, gid))
         attr = SFTPAttributes()
@@ -261,15 +272,15 @@ class SFTPFile (BufferedFile):
     def utime(self, times):
         """
         Set the access and modified times of this file. If
-        C{times} is C{None}, then the file's access and modified times are set
-        to the current time. Otherwise, C{times} must be a 2-tuple of numbers,
-        of the form C{(atime, mtime)}, which is used to set the access and
-        modified times, respectively. This bizarre API is mimicked from python
+        ``times`` is ``None``, then the file's access and modified times are set
+        to the current time. Otherwise, ``times`` must be a 2-tuple of numbers,
+        of the form ``(atime, mtime)``, which is used to set the access and
+        modified times, respectively. This bizarre API is mimicked from Python
         for the sake of consistency -- I apologize.

-        @param times: C{None} or a tuple of (access time, modified time) in
-            standard internet epoch time (seconds since 01 January 1970 GMT)
-        @type times: tuple(int)
+        :param tuple times:
+            ``None`` or a tuple of (access time, modified time) in standard
+            internet epoch time (seconds since 01 January 1970 GMT)
         """
         if times is None:
             times = (time.time(), time.time())
@@ -281,11 +292,11 @@ class SFTPFile (BufferedFile):
     def truncate(self, size):
         """
         Change the size of this file. This usually extends
-        or shrinks the size of the file, just like the C{truncate()} method on
-        python file objects.
+        or shrinks the size of the file, just like the ``truncate()`` method on
+        Python file objects.

-        @param size: the new size of the file
-        @type size: int or long
+        :param size: the new size of the file
+        :type size: int or long
         """
         self.sftp._log(DEBUG, 'truncate(%s, %r)' % (hexlify(self.handle), size))
         attr = SFTPAttributes()
@@ -298,51 +309,53 @@ class SFTPFile (BufferedFile):
         to verify a successful upload or download, or for various rsync-like
         operations.

-        The file is hashed from C{offset}, for C{length} bytes. If C{length}
-        is 0, the remainder of the file is hashed. Thus, if both C{offset}
-        and C{length} are zero, the entire file is hashed.
+        The file is hashed from ``offset``, for ``length`` bytes. If ``length``
+        is 0, the remainder of the file is hashed. Thus, if both ``offset``
+        and ``length`` are zero, the entire file is hashed.

-        Normally, C{block_size} will be 0 (the default), and this method will
+        Normally, ``block_size`` will be 0 (the default), and this method will
         return a byte string representing the requested hash (for example, a
         string of length 16 for MD5, or 20 for SHA-1). If a non-zero
-        C{block_size} is given, each chunk of the file (from C{offset} to
-        C{offset + length}) of C{block_size} bytes is computed as a separate
+        ``block_size`` is given, each chunk of the file (from ``offset`` to
+        ``offset + length``) of ``block_size`` bytes is computed as a separate
         hash. The hash results are all concatenated and returned as a single
         string.

-        For example, C{check('sha1', 0, 1024, 512)} will return a string of
+        For example, ``check('sha1', 0, 1024, 512)`` will return a string of
         length 40. The first 20 bytes will be the SHA-1 of the first 512
         bytes of the file, and the last 20 bytes will be the SHA-1 of the next
         512 bytes.

-        @param hash_algorithm: the name of the hash algorithm to use (normally
-            C{"sha1"} or C{"md5"})
-        @type hash_algorithm: str
-        @param offset: offset into the file to begin hashing (0 means to start
-            from the beginning)
-        @type offset: int or long
-        @param length: number of bytes to hash (0 means continue to the end of
-            the file)
-        @type length: int or long
-        @param block_size: number of bytes to hash per result (must not be less
-            than 256; 0 means to compute only one hash of the entire segment)
-        @type block_size: int
-        @return: string of bytes representing the hash of each block,
-            concatenated together
-        @rtype: str
+        :param str hash_algorithm:
+            the name of the hash algorithm to use (normally ``"sha1"`` or
+            ``"md5"``)
+        :param offset:
+            offset into the file to begin hashing (0 means to start from the
+            beginning)
+        :type offset: int or long
+        :param length:
+            number of bytes to hash (0 means continue to the end of the file)
+        :type length: int or long
+        :param int block_size:
+            number of bytes to hash per result (must not be less than 256; 0
+            means to compute only one hash of the entire segment)
+        :type block_size: int
+        :return:
+            `str` of bytes representing the hash of each block, concatenated
+            together

-        @note: Many (most?) servers don't support this extension yet.
-
-        @raise IOError: if the server doesn't support the "check-file"
+        :raises IOError: if the server doesn't support the "check-file"
            extension, or possibly doesn't support the hash algorithm
            requested

-        @since: 1.4
+        .. note:: Many (most?) servers don't support this extension yet.
+
+        .. versionadded:: 1.4
         """
         t, msg = self.sftp._request(CMD_EXTENDED, 'check-file', self.handle,
                                    hash_algorithm, long(offset), long(length), block_size)
-        ext = msg.get_string()
-        alg = msg.get_string()
+        ext = msg.get_text()
+        alg = msg.get_text()
         data = msg.get_remainder()
         return data

@@ -350,35 +363,35 @@ class SFTPFile (BufferedFile):
         """
         Turn on/off the pipelining of write operations to this file. When
         pipelining is on, paramiko won't wait for the server response after
-        each write operation. Instead, they're collected as they come in.
-        At the first non-write operation (including L{close}), all remaining
+        each write operation. Instead, they're collected as they come in. At
+        the first non-write operation (including `.close`), all remaining
         server responses are collected. This means that if there was an error
-        with one of your later writes, an exception might be thrown from
-        within L{close} instead of L{write}.
+        with one of your later writes, an exception might be thrown from within
+        `.close` instead of `.write`.

-        By default, files are I{not} pipelined.
+        By default, files are not pipelined.

-        @param pipelined: C{True} if pipelining should be turned on for this
-            file; C{False} otherwise
-        @type pipelined: bool
+        :param bool pipelined:
+            ``True`` if pipelining should be turned on for this file; ``False``
+            otherwise

-        @since: 1.5
+        .. versionadded:: 1.5
         """
         self.pipelined = pipelined

     def prefetch(self):
         """
-        Pre-fetch the remaining contents of this file in anticipation of
-        future L{read} calls. If reading the entire file, pre-fetching can
+        Pre-fetch the remaining contents of this file in anticipation of future
+        `.read` calls. If reading the entire file, pre-fetching can
         dramatically improve the download speed by avoiding roundtrip latency.
         The file's contents are incrementally buffered in a background thread.

-        The prefetched data is stored in a buffer until read via the L{read}
+        The prefetched data is stored in a buffer until read via the `.read`
         method. Once data has been read, it's removed from the buffer. The
-        data may be read in a random order (using L{seek}); chunks of the
+        data may be read in a random order (using `.seek`); chunks of the
         buffer that haven't been read will continue to be buffered.

-        @since: 1.5.1
+        .. versionadded:: 1.5.1
         """
         size = self.stat().st_size
         # queue up async reads for the rest of the file
@@ -394,17 +407,17 @@ class SFTPFile (BufferedFile):
     def readv(self, chunks):
         """
         Read a set of blocks from the file by (offset, length). This is more
-        efficient than doing a series of L{seek} and L{read} calls, since the
+        efficient than doing a series of `.seek` and `.read` calls, since the
         prefetch machinery is used to retrieve all the requested blocks at
         once.

-        @param chunks: a list of (offset, length) tuples indicating which
-            sections of the file to read
-        @type chunks: list(tuple(long, int))
-        @return: a list of blocks read, in the same order as in C{chunks}
-        @rtype: list(str)
+        :param chunks:
+            a list of (offset, length) tuples indicating which sections of the
+            file to read
+        :type chunks: list(tuple(long, int))
+        :return: a list of blocks read, in the same order as in ``chunks``

-        @since: 1.5.4
+        .. versionadded:: 1.5.4
         """
         self.sftp._log(DEBUG, 'readv(%s, %r)' % (hexlify(self.handle), chunks))

@@ -426,11 +439,9 @@ class SFTPFile (BufferedFile):
         for x in chunks:
             self.seek(x[0])
             yield self.read(x[1])

-
     ### internals...

-
     def _get_size(self):
         try:
             return self.stat().st_size
@@ -440,7 +451,6 @@ class SFTPFile (BufferedFile):
     def _start_prefetch(self, chunks):
         self._prefetching = True
         self._prefetch_done = False
-        self._prefetch_reads.extend(chunks)

         t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
         t.setDaemon(True)
@@ -450,27 +460,37 @@ class SFTPFile (BufferedFile):
         # do these read requests in a temporary thread because there may be
         # a lot of them, so it may block.
         for offset, length in chunks:
-            self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length))
+            with self._prefetch_lock:
+                num = self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length))
+                self._prefetch_extents[num] = (offset, length)

-    def _async_response(self, t, msg):
+    def _async_response(self, t, msg, num):
         if t == CMD_STATUS:
             # save exception and re-raise it on next file operation
             try:
                 self.sftp._convert_status(msg)
-            except Exception, x:
-                self._saved_exception = x
+            except Exception as e:
+                self._saved_exception = e
             return
         if t != CMD_DATA:
             raise SFTPError('Expected data')
         data = msg.get_string()
-        offset, length = self._prefetch_reads.pop(0)
-        self._prefetch_data[offset] = data
-        if len(self._prefetch_reads) == 0:
-            self._prefetch_done = True
+        with self._prefetch_lock:
+            offset, length = self._prefetch_extents[num]
+            self._prefetch_data[offset] = data
+            del self._prefetch_extents[num]
+            if len(self._prefetch_extents) == 0:
+                self._prefetch_done = True

     def _check_exception(self):
-        "if there's a saved exception, raise & clear it"
+        """if there's a saved exception, raise & clear it"""
         if self._saved_exception is not None:
             x = self._saved_exception
             self._saved_exception = None
             raise x
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type, value, traceback):
+        self.close()
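For illustration, a minimal usage sketch of the context-manager support, prefetch(), and readv() behaviour documented above, through paramiko's public client API. The host, credentials, and remote path are placeholder assumptions, and error handling is omitted.

import paramiko

# Placeholder connection details -- substitute a real host, user, and password/key.
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('example.com', username='user', password='secret')
sftp = client.open_sftp()

# SFTPFile objects can now be used as context managers, so the remote
# handle is closed automatically when the block exits.
with sftp.open('/tmp/remote.log', 'r') as f:
    # prefetch() queues CMD_READ requests for the whole file in a background
    # thread; subsequent read() calls are served from the prefetch buffer
    # instead of paying one round trip per chunk.
    f.prefetch()
    data = f.read()

# readv() fetches a specific set of (offset, length) extents through the same
# prefetch machinery and yields the blocks in the order requested.
with sftp.open('/tmp/remote.log', 'r') as f:
    blocks = list(f.readv([(0, 64), (1024, 64)]))

client.close()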
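A similar sketch for pipelined writes via set_pipelined(), reflecting the queued write acknowledgements described above; the local and remote file names are placeholders, and an error from a late write may not surface until the file is closed.

# Continuing with the same `sftp` client as in the previous sketch.
with sftp.open('/tmp/remote.bin', 'wb') as f:
    # With pipelining enabled, write() queues the server's CMD_WRITE
    # acknowledgements instead of blocking on each one; they are drained
    # at the first non-write operation, such as close().
    f.set_pipelined(True)
    with open('local.bin', 'rb') as src:  # placeholder local file
        while True:
            chunk = src.read(32768)
            if not chunk:
                break
            f.write(chunk)
# __exit__ calls close(), which collects any outstanding write responses, so a
# failed write may raise here rather than at the write() call itself.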