summary refs log tree commit diff homepage
path: root/paramiko/sftp_file.py
diff options
context:
space:
mode:
Diffstat (limited to 'paramiko/sftp_file.py')
-rw-r--r-- paramiko/sftp_file.py 216
1 files changed, 141 insertions, 75 deletions
diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py
index d0a37da3..337cdbeb 100644
--- a/paramiko/sftp_file.py
+++ b/paramiko/sftp_file.py
@@ -31,8 +31,10 @@ from paramiko.common import DEBUG
from paramiko.file import BufferedFile
from paramiko.py3compat import long
-from paramiko.sftp import CMD_CLOSE, CMD_READ, CMD_DATA, SFTPError, CMD_WRITE, \
- CMD_STATUS, CMD_FSTAT, CMD_ATTRS, CMD_FSETSTAT, CMD_EXTENDED
+from paramiko.sftp import (
+ CMD_CLOSE, CMD_READ, CMD_DATA, SFTPError, CMD_WRITE, CMD_STATUS, CMD_FSTAT,
+ CMD_ATTRS, CMD_FSETSTAT, CMD_EXTENDED,
+)
from paramiko.sftp_attr import SFTPAttributes
@@ -64,13 +66,13 @@ class SFTPFile (BufferedFile):
def __del__(self):
self._close(async=True)
-
+
def close(self):
"""
Close the file.
"""
self._close(async=False)
-
+
def _close(self, async=False):
# We allow double-close without signaling an error, because real
# Python file objects do. However, we must protect against actually
@@ -87,7 +89,8 @@ class SFTPFile (BufferedFile):
BufferedFile.close(self)
try:
if async:
- # GC'd file handle could be called from an arbitrary thread -- don't wait for a response
+ # GC'd file handle could be called from an arbitrary thread
+ # -- don't wait for a response
self.sftp._async_request(type(None), CMD_CLOSE, self.handle)
else:
self.sftp._request(CMD_CLOSE, self.handle)
@@ -99,7 +102,8 @@ class SFTPFile (BufferedFile):
pass
def _data_in_prefetch_requests(self, offset, size):
- k = [x for x in list(self._prefetch_extents.values()) if x[0] <= offset]
+ k = [x for x in list(self._prefetch_extents.values())
+ if x[0] <= offset]
if len(k) == 0:
return False
k.sort(key=lambda x: x[0])
@@ -110,9 +114,12 @@ class SFTPFile (BufferedFile):
if buf_offset + buf_size >= offset + size:
# inclusive
return True
- # well, we have part of the request. see if another chunk has the rest.
- return self._data_in_prefetch_requests(buf_offset + buf_size, offset + size - buf_offset - buf_size)
-
+ # well, we have part of the request. see if another chunk has
+ # the rest.
+ return self._data_in_prefetch_requests(
+ buf_offset + buf_size,
+ offset + size - buf_offset - buf_size)
+
def _data_in_prefetch_buffers(self, offset):
"""
if a block of data is present in the prefetch buffers, at the given
@@ -129,13 +136,14 @@ class SFTPFile (BufferedFile):
# it's not here
return None
return index
-
+
def _read_prefetch(self, size):
"""
read data out of the prefetch buffer, if possible. if the data isn't
in the buffer, return None. otherwise, behaves like a normal read.
"""
- # while not closed, and haven't fetched past the current position, and haven't reached EOF...
+ # while not closed, and haven't fetched past the current position,
+ # and haven't reached EOF...
while True:
offset = self._data_in_prefetch_buffers(self._realpos)
if offset is not None:
@@ -149,7 +157,7 @@ class SFTPFile (BufferedFile):
return None
prefetch = self._prefetch_data[offset]
del self._prefetch_data[offset]
-
+
buf_offset = self._realpos - offset
if buf_offset > 0:
self._prefetch_data[offset] = prefetch[:buf_offset]
@@ -158,14 +166,19 @@ class SFTPFile (BufferedFile):
self._prefetch_data[self._realpos + size] = prefetch[size:]
prefetch = prefetch[:size]
return prefetch
-
+
def _read(self, size):
size = min(size, self.MAX_REQUEST_SIZE)
if self._prefetching:
data = self._read_prefetch(size)
if data is not None:
return data
- t, msg = self.sftp._request(CMD_READ, self.handle, long(self._realpos), int(size))
+ t, msg = self.sftp._request(
+ CMD_READ,
+ self.handle,
+ long(self._realpos),
+ int(size)
+ )
if t != CMD_DATA:
raise SFTPError('Expected data')
return msg.get_string()
@@ -173,8 +186,18 @@ class SFTPFile (BufferedFile):
def _write(self, data):
# may write less than requested if it would exceed max packet size
chunk = min(len(data), self.MAX_REQUEST_SIZE)
- self._reqs.append(self.sftp._async_request(type(None), CMD_WRITE, self.handle, long(self._realpos), data[:chunk]))
- if not self.pipelined or (len(self._reqs) > 100 and self.sftp.sock.recv_ready()):
+ sftp_async_request = self.sftp._async_request(
+ type(None),
+ CMD_WRITE,
+ self.handle,
+ long(self._realpos),
+ data[:chunk]
+ )
+ self._reqs.append(sftp_async_request)
+ if (
+ not self.pipelined or
+ (len(self._reqs) > 100 and self.sftp.sock.recv_ready())
+ ):
while len(self._reqs):
req = self._reqs.popleft()
t, msg = self.sftp._read_response(req)
@@ -217,7 +240,22 @@ class SFTPFile (BufferedFile):
"""
self.sftp.sock.setblocking(blocking)
+ def seekable(self):
+ """
+ Check if the file supports random access.
+
+ :return:
+ `True` if the file supports random access. If `False`,
+ :meth:`seek` will raise an exception
+ """
+ return True
+
def seek(self, offset, whence=0):
+ """
+ Set the file's current position.
+
+ See `file.seek` for details.
+ """
self.flush()
if whence == self.SEEK_SET:
self._realpos = self._pos = offset
@@ -234,7 +272,8 @@ class SFTPFile (BufferedFile):
exactly like `.SFTPClient.stat`, except that it operates on an
already-open file.
- :return: an `.SFTPAttributes` object containing attributes about this file.
+ :returns:
+ an `.SFTPAttributes` object containing attributes about this file.
"""
t, msg = self.sftp._request(CMD_FSTAT, self.handle)
if t != CMD_ATTRS:
@@ -253,7 +292,7 @@ class SFTPFile (BufferedFile):
attr = SFTPAttributes()
attr.st_mode = mode
self.sftp._request(CMD_FSETSTAT, self.handle, attr)
-
+
def chown(self, uid, gid):
"""
Change the owner (``uid``) and group (``gid``) of this file. As with
@@ -264,7 +303,9 @@ class SFTPFile (BufferedFile):
:param int uid: new owner's uid
:param int gid: new group id
"""
- self.sftp._log(DEBUG, 'chown(%s, %r, %r)' % (hexlify(self.handle), uid, gid))
+ self.sftp._log(
+ DEBUG,
+ 'chown(%s, %r, %r)' % (hexlify(self.handle), uid, gid))
attr = SFTPAttributes()
attr.st_uid, attr.st_gid = uid, gid
self.sftp._request(CMD_FSETSTAT, self.handle, attr)
@@ -272,11 +313,11 @@ class SFTPFile (BufferedFile):
def utime(self, times):
"""
Set the access and modified times of this file. If
- ``times`` is ``None``, then the file's access and modified times are set
- to the current time. Otherwise, ``times`` must be a 2-tuple of numbers,
- of the form ``(atime, mtime)``, which is used to set the access and
- modified times, respectively. This bizarre API is mimicked from Python
- for the sake of consistency -- I apologize.
+ ``times`` is ``None``, then the file's access and modified times are
+ set to the current time. Otherwise, ``times`` must be a 2-tuple of
+ numbers, of the form ``(atime, mtime)``, which is used to set the
+ access and modified times, respectively. This bizarre API is mimicked
+ from Python for the sake of consistency -- I apologize.
:param tuple times:
``None`` or a tuple of (access time, modified time) in standard
@@ -294,25 +335,26 @@ class SFTPFile (BufferedFile):
Change the size of this file. This usually extends
or shrinks the size of the file, just like the ``truncate()`` method on
Python file objects.
-
+
:param size: the new size of the file
- :type size: int or long
"""
- self.sftp._log(DEBUG, 'truncate(%s, %r)' % (hexlify(self.handle), size))
+ self.sftp._log(
+ DEBUG,
+ 'truncate(%s, %r)' % (hexlify(self.handle), size))
attr = SFTPAttributes()
attr.st_size = size
self.sftp._request(CMD_FSETSTAT, self.handle, attr)
-
+
def check(self, hash_algorithm, offset=0, length=0, block_size=0):
"""
Ask the server for a hash of a section of this file. This can be used
to verify a successful upload or download, or for various rsync-like
operations.
-
- The file is hashed from ``offset``, for ``length`` bytes. If ``length``
- is 0, the remainder of the file is hashed. Thus, if both ``offset``
- and ``length`` are zero, the entire file is hashed.
-
+
+ The file is hashed from ``offset``, for ``length`` bytes.
+ If ``length`` is 0, the remainder of the file is hashed. Thus, if both
+ ``offset`` and ``length`` are zero, the entire file is hashed.
+
Normally, ``block_size`` will be 0 (the default), and this method will
return a byte string representing the requested hash (for example, a
string of length 16 for MD5, or 20 for SHA-1). If a non-zero
@@ -320,45 +362,43 @@ class SFTPFile (BufferedFile):
``offset + length``) of ``block_size`` bytes is computed as a separate
hash. The hash results are all concatenated and returned as a single
string.
-
+
For example, ``check('sha1', 0, 1024, 512)`` will return a string of
length 40. The first 20 bytes will be the SHA-1 of the first 512 bytes
of the file, and the last 20 bytes will be the SHA-1 of the next 512
bytes.
-
+
:param str hash_algorithm:
the name of the hash algorithm to use (normally ``"sha1"`` or
``"md5"``)
:param offset:
offset into the file to begin hashing (0 means to start from the
beginning)
- :type offset: int or long
:param length:
number of bytes to hash (0 means continue to the end of the file)
- :type length: int or long
:param int block_size:
number of bytes to hash per result (must not be less than 256; 0
means to compute only one hash of the entire segment)
- :type block_size: int
:return:
`str` of bytes representing the hash of each block, concatenated
together
-
- :raises IOError: if the server doesn't support the "check-file"
- extension, or possibly doesn't support the hash algorithm
- requested
-
+
+ :raises:
+ ``IOError`` -- if the server doesn't support the "check-file"
+ extension, or possibly doesn't support the hash algorithm requested
+
.. note:: Many (most?) servers don't support this extension yet.
-
+
.. versionadded:: 1.4
"""
- t, msg = self.sftp._request(CMD_EXTENDED, 'check-file', self.handle,
- hash_algorithm, long(offset), long(length), block_size)
- ext = msg.get_text()
- alg = msg.get_text()
+ t, msg = self.sftp._request(
+ CMD_EXTENDED, 'check-file', self.handle,
+ hash_algorithm, long(offset), long(length), block_size)
+ msg.get_text() # ext
+ msg.get_text() # alg
data = msg.get_remainder()
return data
-
+
def set_pipelined(self, pipelined=True):
"""
Turn on/off the pipelining of write operations to this file. When
@@ -368,55 +408,69 @@ class SFTPFile (BufferedFile):
server responses are collected. This means that if there was an error
with one of your later writes, an exception might be thrown from within
`.close` instead of `.write`.
-
+
By default, files are not pipelined.
-
+
:param bool pipelined:
``True`` if pipelining should be turned on for this file; ``False``
otherwise
-
+
.. versionadded:: 1.5
"""
self.pipelined = pipelined
-
- def prefetch(self):
+
+ def prefetch(self, file_size=None):
"""
Pre-fetch the remaining contents of this file in anticipation of future
`.read` calls. If reading the entire file, pre-fetching can
dramatically improve the download speed by avoiding roundtrip latency.
The file's contents are incrementally buffered in a background thread.
-
+
The prefetched data is stored in a buffer until read via the `.read`
method. Once data has been read, it's removed from the buffer. The
data may be read in a random order (using `.seek`); chunks of the
buffer that haven't been read will continue to be buffered.
+ :param int file_size:
+ When this is ``None`` (the default), this method calls `stat` to
+ determine the remote file size. In some situations, doing so can
+ cause exceptions or hangs (see `#562
+ <https://github.com/paramiko/paramiko/pull/562>`_); as a
+ workaround, one may call `stat` explicitly and pass its value in
+ via this parameter.
+
.. versionadded:: 1.5.1
+ .. versionchanged:: 1.16.0
+ The ``file_size`` parameter was added (with no default value).
+ .. versionchanged:: 1.16.1
+ The ``file_size`` parameter was made optional for backwards
+ compatibility.
"""
- size = self.stat().st_size
+ if file_size is None:
+ file_size = self.stat().st_size
+
# queue up async reads for the rest of the file
chunks = []
n = self._realpos
- while n < size:
- chunk = min(self.MAX_REQUEST_SIZE, size - n)
+ while n < file_size:
+ chunk = min(self.MAX_REQUEST_SIZE, file_size - n)
chunks.append((n, chunk))
n += chunk
if len(chunks) > 0:
self._start_prefetch(chunks)
-
+
def readv(self, chunks):
"""
Read a set of blocks from the file by (offset, length). This is more
efficient than doing a series of `.seek` and `.read` calls, since the
prefetch machinery is used to retrieve all the requested blocks at
once.
-
+
:param chunks:
- a list of (offset, length) tuples indicating which sections of the
- file to read
- :type chunks: list(tuple(long, int))
+ a list of ``(offset, length)`` tuples indicating which sections of
+ the file to read
:return: a list of blocks read, in the same order as in ``chunks``
-
+
.. versionadded:: 1.5.4
"""
self.sftp._log(DEBUG, 'readv(%s, %r)' % (hexlify(self.handle), chunks))
@@ -424,7 +478,10 @@ class SFTPFile (BufferedFile):
read_chunks = []
for offset, size in chunks:
# don't fetch data that's already in the prefetch buffer
- if self._data_in_prefetch_buffers(offset) or self._data_in_prefetch_requests(offset, size):
+ if (
+ self._data_in_prefetch_buffers(offset) or
+ self._data_in_prefetch_requests(offset, size)
+ ):
continue
# break up anything larger than the max read size
@@ -440,7 +497,7 @@ class SFTPFile (BufferedFile):
self.seek(x[0])
yield self.read(x[1])
- ### internals...
+ # ...internals...
def _get_size(self):
try:
@@ -455,13 +512,18 @@ class SFTPFile (BufferedFile):
t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
t.setDaemon(True)
t.start()
-
+
def _prefetch_thread(self, chunks):
# do these read requests in a temporary thread because there may be
# a lot of them, so it may block.
for offset, length in chunks:
+ num = self.sftp._async_request(
+ self,
+ CMD_READ,
+ self.handle,
+ long(offset),
+ int(length))
with self._prefetch_lock:
- num = self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length))
self._prefetch_extents[num] = (offset, length)
def _async_response(self, t, msg, num):
@@ -475,13 +537,17 @@ class SFTPFile (BufferedFile):
if t != CMD_DATA:
raise SFTPError('Expected data')
data = msg.get_string()
- with self._prefetch_lock:
- offset, length = self._prefetch_extents[num]
- self._prefetch_data[offset] = data
- del self._prefetch_extents[num]
- if len(self._prefetch_extents) == 0:
- self._prefetch_done = True
-
+ while True:
+ with self._prefetch_lock:
+ # spin if in race with _prefetch_thread
+ if num in self._prefetch_extents:
+ offset, length = self._prefetch_extents[num]
+ self._prefetch_data[offset] = data
+ del self._prefetch_extents[num]
+ if len(self._prefetch_extents) == 0:
+ self._prefetch_done = True
+ break
+
def _check_exception(self):
"""if there's a saved exception, raise & clear it"""
if self._saved_exception is not None: