diff options
Diffstat (limited to 'paramiko/sftp_file.py')
-rw-r--r-- | paramiko/sftp_file.py | 216 |
1 files changed, 141 insertions, 75 deletions
diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py index d0a37da3..337cdbeb 100644 --- a/paramiko/sftp_file.py +++ b/paramiko/sftp_file.py @@ -31,8 +31,10 @@ from paramiko.common import DEBUG from paramiko.file import BufferedFile from paramiko.py3compat import long -from paramiko.sftp import CMD_CLOSE, CMD_READ, CMD_DATA, SFTPError, CMD_WRITE, \ - CMD_STATUS, CMD_FSTAT, CMD_ATTRS, CMD_FSETSTAT, CMD_EXTENDED +from paramiko.sftp import ( + CMD_CLOSE, CMD_READ, CMD_DATA, SFTPError, CMD_WRITE, CMD_STATUS, CMD_FSTAT, + CMD_ATTRS, CMD_FSETSTAT, CMD_EXTENDED, +) from paramiko.sftp_attr import SFTPAttributes @@ -64,13 +66,13 @@ class SFTPFile (BufferedFile): def __del__(self): self._close(async=True) - + def close(self): """ Close the file. """ self._close(async=False) - + def _close(self, async=False): # We allow double-close without signaling an error, because real # Python file objects do. However, we must protect against actually @@ -87,7 +89,8 @@ class SFTPFile (BufferedFile): BufferedFile.close(self) try: if async: - # GC'd file handle could be called from an arbitrary thread -- don't wait for a response + # GC'd file handle could be called from an arbitrary thread + # -- don't wait for a response self.sftp._async_request(type(None), CMD_CLOSE, self.handle) else: self.sftp._request(CMD_CLOSE, self.handle) @@ -99,7 +102,8 @@ class SFTPFile (BufferedFile): pass def _data_in_prefetch_requests(self, offset, size): - k = [x for x in list(self._prefetch_extents.values()) if x[0] <= offset] + k = [x for x in list(self._prefetch_extents.values()) + if x[0] <= offset] if len(k) == 0: return False k.sort(key=lambda x: x[0]) @@ -110,9 +114,12 @@ class SFTPFile (BufferedFile): if buf_offset + buf_size >= offset + size: # inclusive return True - # well, we have part of the request. see if another chunk has the rest. - return self._data_in_prefetch_requests(buf_offset + buf_size, offset + size - buf_offset - buf_size) - + # well, we have part of the request. see if another chunk has + # the rest. + return self._data_in_prefetch_requests( + buf_offset + buf_size, + offset + size - buf_offset - buf_size) + def _data_in_prefetch_buffers(self, offset): """ if a block of data is present in the prefetch buffers, at the given @@ -129,13 +136,14 @@ class SFTPFile (BufferedFile): # it's not here return None return index - + def _read_prefetch(self, size): """ read data out of the prefetch buffer, if possible. if the data isn't in the buffer, return None. otherwise, behaves like a normal read. """ - # while not closed, and haven't fetched past the current position, and haven't reached EOF... + # while not closed, and haven't fetched past the current position, + # and haven't reached EOF... while True: offset = self._data_in_prefetch_buffers(self._realpos) if offset is not None: @@ -149,7 +157,7 @@ class SFTPFile (BufferedFile): return None prefetch = self._prefetch_data[offset] del self._prefetch_data[offset] - + buf_offset = self._realpos - offset if buf_offset > 0: self._prefetch_data[offset] = prefetch[:buf_offset] @@ -158,14 +166,19 @@ class SFTPFile (BufferedFile): self._prefetch_data[self._realpos + size] = prefetch[size:] prefetch = prefetch[:size] return prefetch - + def _read(self, size): size = min(size, self.MAX_REQUEST_SIZE) if self._prefetching: data = self._read_prefetch(size) if data is not None: return data - t, msg = self.sftp._request(CMD_READ, self.handle, long(self._realpos), int(size)) + t, msg = self.sftp._request( + CMD_READ, + self.handle, + long(self._realpos), + int(size) + ) if t != CMD_DATA: raise SFTPError('Expected data') return msg.get_string() @@ -173,8 +186,18 @@ class SFTPFile (BufferedFile): def _write(self, data): # may write less than requested if it would exceed max packet size chunk = min(len(data), self.MAX_REQUEST_SIZE) - self._reqs.append(self.sftp._async_request(type(None), CMD_WRITE, self.handle, long(self._realpos), data[:chunk])) - if not self.pipelined or (len(self._reqs) > 100 and self.sftp.sock.recv_ready()): + sftp_async_request = self.sftp._async_request( + type(None), + CMD_WRITE, + self.handle, + long(self._realpos), + data[:chunk] + ) + self._reqs.append(sftp_async_request) + if ( + not self.pipelined or + (len(self._reqs) > 100 and self.sftp.sock.recv_ready()) + ): while len(self._reqs): req = self._reqs.popleft() t, msg = self.sftp._read_response(req) @@ -217,7 +240,22 @@ class SFTPFile (BufferedFile): """ self.sftp.sock.setblocking(blocking) + def seekable(self): + """ + Check if the file supports random access. + + :return: + `True` if the file supports random access. If `False`, + :meth:`seek` will raise an exception + """ + return True + def seek(self, offset, whence=0): + """ + Set the file's current position. + + See `file.seek` for details. + """ self.flush() if whence == self.SEEK_SET: self._realpos = self._pos = offset @@ -234,7 +272,8 @@ class SFTPFile (BufferedFile): exactly like `.SFTPClient.stat`, except that it operates on an already-open file. - :return: an `.SFTPAttributes` object containing attributes about this file. + :returns: + an `.SFTPAttributes` object containing attributes about this file. """ t, msg = self.sftp._request(CMD_FSTAT, self.handle) if t != CMD_ATTRS: @@ -253,7 +292,7 @@ class SFTPFile (BufferedFile): attr = SFTPAttributes() attr.st_mode = mode self.sftp._request(CMD_FSETSTAT, self.handle, attr) - + def chown(self, uid, gid): """ Change the owner (``uid``) and group (``gid``) of this file. As with @@ -264,7 +303,9 @@ class SFTPFile (BufferedFile): :param int uid: new owner's uid :param int gid: new group id """ - self.sftp._log(DEBUG, 'chown(%s, %r, %r)' % (hexlify(self.handle), uid, gid)) + self.sftp._log( + DEBUG, + 'chown(%s, %r, %r)' % (hexlify(self.handle), uid, gid)) attr = SFTPAttributes() attr.st_uid, attr.st_gid = uid, gid self.sftp._request(CMD_FSETSTAT, self.handle, attr) @@ -272,11 +313,11 @@ class SFTPFile (BufferedFile): def utime(self, times): """ Set the access and modified times of this file. If - ``times`` is ``None``, then the file's access and modified times are set - to the current time. Otherwise, ``times`` must be a 2-tuple of numbers, - of the form ``(atime, mtime)``, which is used to set the access and - modified times, respectively. This bizarre API is mimicked from Python - for the sake of consistency -- I apologize. + ``times`` is ``None``, then the file's access and modified times are + set to the current time. Otherwise, ``times`` must be a 2-tuple of + numbers, of the form ``(atime, mtime)``, which is used to set the + access and modified times, respectively. This bizarre API is mimicked + from Python for the sake of consistency -- I apologize. :param tuple times: ``None`` or a tuple of (access time, modified time) in standard @@ -294,25 +335,26 @@ class SFTPFile (BufferedFile): Change the size of this file. This usually extends or shrinks the size of the file, just like the ``truncate()`` method on Python file objects. - + :param size: the new size of the file - :type size: int or long """ - self.sftp._log(DEBUG, 'truncate(%s, %r)' % (hexlify(self.handle), size)) + self.sftp._log( + DEBUG, + 'truncate(%s, %r)' % (hexlify(self.handle), size)) attr = SFTPAttributes() attr.st_size = size self.sftp._request(CMD_FSETSTAT, self.handle, attr) - + def check(self, hash_algorithm, offset=0, length=0, block_size=0): """ Ask the server for a hash of a section of this file. This can be used to verify a successful upload or download, or for various rsync-like operations. - - The file is hashed from ``offset``, for ``length`` bytes. If ``length`` - is 0, the remainder of the file is hashed. Thus, if both ``offset`` - and ``length`` are zero, the entire file is hashed. - + + The file is hashed from ``offset``, for ``length`` bytes. + If ``length`` is 0, the remainder of the file is hashed. Thus, if both + ``offset`` and ``length`` are zero, the entire file is hashed. + Normally, ``block_size`` will be 0 (the default), and this method will return a byte string representing the requested hash (for example, a string of length 16 for MD5, or 20 for SHA-1). If a non-zero @@ -320,45 +362,43 @@ class SFTPFile (BufferedFile): ``offset + length``) of ``block_size`` bytes is computed as a separate hash. The hash results are all concatenated and returned as a single string. - + For example, ``check('sha1', 0, 1024, 512)`` will return a string of length 40. The first 20 bytes will be the SHA-1 of the first 512 bytes of the file, and the last 20 bytes will be the SHA-1 of the next 512 bytes. - + :param str hash_algorithm: the name of the hash algorithm to use (normally ``"sha1"`` or ``"md5"``) :param offset: offset into the file to begin hashing (0 means to start from the beginning) - :type offset: int or long :param length: number of bytes to hash (0 means continue to the end of the file) - :type length: int or long :param int block_size: number of bytes to hash per result (must not be less than 256; 0 means to compute only one hash of the entire segment) - :type block_size: int :return: `str` of bytes representing the hash of each block, concatenated together - - :raises IOError: if the server doesn't support the "check-file" - extension, or possibly doesn't support the hash algorithm - requested - + + :raises: + ``IOError`` -- if the server doesn't support the "check-file" + extension, or possibly doesn't support the hash algorithm requested + .. note:: Many (most?) servers don't support this extension yet. - + .. versionadded:: 1.4 """ - t, msg = self.sftp._request(CMD_EXTENDED, 'check-file', self.handle, - hash_algorithm, long(offset), long(length), block_size) - ext = msg.get_text() - alg = msg.get_text() + t, msg = self.sftp._request( + CMD_EXTENDED, 'check-file', self.handle, + hash_algorithm, long(offset), long(length), block_size) + msg.get_text() # ext + msg.get_text() # alg data = msg.get_remainder() return data - + def set_pipelined(self, pipelined=True): """ Turn on/off the pipelining of write operations to this file. When @@ -368,55 +408,69 @@ class SFTPFile (BufferedFile): server responses are collected. This means that if there was an error with one of your later writes, an exception might be thrown from within `.close` instead of `.write`. - + By default, files are not pipelined. - + :param bool pipelined: ``True`` if pipelining should be turned on for this file; ``False`` otherwise - + .. versionadded:: 1.5 """ self.pipelined = pipelined - - def prefetch(self): + + def prefetch(self, file_size=None): """ Pre-fetch the remaining contents of this file in anticipation of future `.read` calls. If reading the entire file, pre-fetching can dramatically improve the download speed by avoiding roundtrip latency. The file's contents are incrementally buffered in a background thread. - + The prefetched data is stored in a buffer until read via the `.read` method. Once data has been read, it's removed from the buffer. The data may be read in a random order (using `.seek`); chunks of the buffer that haven't been read will continue to be buffered. + :param int file_size: + When this is ``None`` (the default), this method calls `stat` to + determine the remote file size. In some situations, doing so can + cause exceptions or hangs (see `#562 + <https://github.com/paramiko/paramiko/pull/562>`_); as a + workaround, one may call `stat` explicitly and pass its value in + via this parameter. + .. versionadded:: 1.5.1 + .. versionchanged:: 1.16.0 + The ``file_size`` parameter was added (with no default value). + .. versionchanged:: 1.16.1 + The ``file_size`` parameter was made optional for backwards + compatibility. """ - size = self.stat().st_size + if file_size is None: + file_size = self.stat().st_size + # queue up async reads for the rest of the file chunks = [] n = self._realpos - while n < size: - chunk = min(self.MAX_REQUEST_SIZE, size - n) + while n < file_size: + chunk = min(self.MAX_REQUEST_SIZE, file_size - n) chunks.append((n, chunk)) n += chunk if len(chunks) > 0: self._start_prefetch(chunks) - + def readv(self, chunks): """ Read a set of blocks from the file by (offset, length). This is more efficient than doing a series of `.seek` and `.read` calls, since the prefetch machinery is used to retrieve all the requested blocks at once. - + :param chunks: - a list of (offset, length) tuples indicating which sections of the - file to read - :type chunks: list(tuple(long, int)) + a list of ``(offset, length)`` tuples indicating which sections of + the file to read :return: a list of blocks read, in the same order as in ``chunks`` - + .. versionadded:: 1.5.4 """ self.sftp._log(DEBUG, 'readv(%s, %r)' % (hexlify(self.handle), chunks)) @@ -424,7 +478,10 @@ class SFTPFile (BufferedFile): read_chunks = [] for offset, size in chunks: # don't fetch data that's already in the prefetch buffer - if self._data_in_prefetch_buffers(offset) or self._data_in_prefetch_requests(offset, size): + if ( + self._data_in_prefetch_buffers(offset) or + self._data_in_prefetch_requests(offset, size) + ): continue # break up anything larger than the max read size @@ -440,7 +497,7 @@ class SFTPFile (BufferedFile): self.seek(x[0]) yield self.read(x[1]) - ### internals... + # ...internals... def _get_size(self): try: @@ -455,13 +512,18 @@ class SFTPFile (BufferedFile): t = threading.Thread(target=self._prefetch_thread, args=(chunks,)) t.setDaemon(True) t.start() - + def _prefetch_thread(self, chunks): # do these read requests in a temporary thread because there may be # a lot of them, so it may block. for offset, length in chunks: + num = self.sftp._async_request( + self, + CMD_READ, + self.handle, + long(offset), + int(length)) with self._prefetch_lock: - num = self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length)) self._prefetch_extents[num] = (offset, length) def _async_response(self, t, msg, num): @@ -475,13 +537,17 @@ class SFTPFile (BufferedFile): if t != CMD_DATA: raise SFTPError('Expected data') data = msg.get_string() - with self._prefetch_lock: - offset, length = self._prefetch_extents[num] - self._prefetch_data[offset] = data - del self._prefetch_extents[num] - if len(self._prefetch_extents) == 0: - self._prefetch_done = True - + while True: + with self._prefetch_lock: + # spin if in race with _prefetch_thread + if num in self._prefetch_extents: + offset, length = self._prefetch_extents[num] + self._prefetch_data[offset] = data + del self._prefetch_extents[num] + if len(self._prefetch_extents) == 0: + self._prefetch_done = True + break + def _check_exception(self): """if there's a saved exception, raise & clear it""" if self._saved_exception is not None: |