summary | refs | log | tree | commit | diff | homepage
diff options
context:
space:
mode:
-rw-r--r-- paramiko/sftp_file.py 64
-rw-r--r-- tests/test_sftp_big.py 74
2 files changed, 111 insertions, 27 deletions
diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py
index 05c10458..3673e865 100644
--- a/paramiko/sftp_file.py
+++ b/paramiko/sftp_file.py
@@ -46,7 +46,6 @@ class SFTPFile (BufferedFile):
self.pipelined = False
self._prefetching = False
self._prefetch_done = False
- self._prefetch_so_far = 0
self._prefetch_data = {}
self._prefetch_reads = []
self._saved_exception = None
@@ -84,37 +83,45 @@ class SFTPFile (BufferedFile):
# may have outlived the Transport connection
pass
+ def _data_in_prefetch_buffers(self, offset):
+ """
+ if a block of data is present in the prefetch buffers, at the given
+ offset, return the offset of the relevant prefetch buffer. otherwise,
+ return None. this guarantees nothing about the number of bytes
+ collected in the prefetch buffer so far.
+ """
+ k = [i for i in self._prefetch_data.keys() if i <= offset]
+ if len(k) == 0:
+ return None
+ index = max(k)
+ buf_offset = offset - index
+ if buf_offset >= len(self._prefetch_data[index]):
+ # it's not here
+ return None
+ return index
+
def _read_prefetch(self, size):
"""
read data out of the prefetch buffer, if possible. if the data isn't
in the buffer, return None. otherwise, behaves like a normal read.
"""
# while not closed, and haven't fetched past the current position, and haven't reached EOF...
- while (self._prefetch_so_far <= self._realpos) and not self._closed:
- if self._prefetch_done:
- return None
+ while not self._prefetch_done and not self._closed:
+ offset = self._data_in_prefetch_buffers(self._realpos)
+ if offset is not None:
+ break
self.sftp._read_response()
self._check_exception()
- k = self._prefetch_data.keys()
- if len(k) == 0:
+ if offset is None:
self._prefetching = False
- return ''
-
- # find largest offset < realpos
- pos_list = [i for i in k if i <= self._realpos]
- if len(pos_list) == 0:
return None
- index = max(pos_list)
- prefetch = self._prefetch_data[index]
- del self._prefetch_data[index]
+ prefetch = self._prefetch_data[offset]
+ del self._prefetch_data[offset]
- buf_offset = self._realpos - index
+ buf_offset = self._realpos - offset
if buf_offset > 0:
- self._prefetch_data[index] = prefetch[:buf_offset]
+ self._prefetch_data[offset] = prefetch[:buf_offset]
prefetch = prefetch[buf_offset:]
- if buf_offset >= len(prefetch):
- # it's not here.
- return None
if size < len(prefetch):
self._prefetch_data[self._realpos + size] = prefetch[size:]
prefetch = prefetch[:size]
@@ -384,10 +391,22 @@ class SFTPFile (BufferedFile):
# put the offsets in order, since we depend on that for determining
# when the reads have finished.
self.sftp._log(DEBUG, 'readv(%s, %r)' % (util.hexify(self.handle), chunks))
- # FIXME: if prefetch() was already called (not readv), don't prefetch.
ordered_chunks = list(chunks)
ordered_chunks.sort(lambda x, y: cmp(x[0], y[0]))
- self._start_prefetch(ordered_chunks)
+
+ # break up anything larger than the max read size
+ if len([size for offset, size in ordered_chunks if size > self.MAX_REQUEST_SIZE]) > 0:
+ read_chunks = []
+ for offset, size in ordered_chunks:
+ while size > 0:
+ chunk_size = min(size, self.MAX_REQUEST_SIZE)
+ read_chunks.append((offset, chunk_size))
+ offset += chunk_size
+ size -= chunk_size
+ else:
+ read_chunks = ordered_chunks
+
+ self._start_prefetch(read_chunks)
# now we can just devolve to a bunch of read()s :)
for x in chunks:
self.seek(x[0])
@@ -406,8 +425,6 @@ class SFTPFile (BufferedFile):
def _start_prefetch(self, chunks):
self._prefetching = True
self._prefetch_done = False
- self._prefetch_so_far = chunks[0][0]
- self._prefetch_data = {}
self._prefetch_reads.extend(chunks)
t = threading.Thread(target=self._prefetch_thread, args=(chunks,))
@@ -434,7 +451,6 @@ class SFTPFile (BufferedFile):
offset, length = self._prefetch_reads.pop(0)
assert length == len(data)
self._prefetch_data[offset] = data
- self._prefetch_so_far = offset + length
if len(self._prefetch_reads) == 0:
self._prefetch_done = True
diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py
index 94edab02..b3f214cb 100644
--- a/tests/test_sftp_big.py
+++ b/tests/test_sftp_big.py
@@ -272,8 +272,76 @@ class BigSFTPTest (unittest.TestCase):
sys.stderr.write(' ')
finally:
sftp.remove('%s/hongry.txt' % FOLDER)
-
- def test_7_big_file_big_buffer(self):
+
+ def test_7_prefetch_readv(self):
+ """
+ verify that prefetch and readv don't conflict with each other.
+ """
+ sftp = get_sftp()
+ kblob = ''.join([struct.pack('>H', n) for n in xrange(512)])
+ try:
+ f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
+ f.set_pipelined(True)
+ for n in range(1024):
+ f.write(kblob)
+ if n % 128 == 0:
+ sys.stderr.write('.')
+ f.close()
+ sys.stderr.write(' ')
+
+ self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+
+ f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
+ f.prefetch()
+ data = f.read(1024)
+ self.assertEqual(data, kblob)
+
+ chunk_size = 793
+ base_offset = 512 * 1024
+ k2blob = kblob + kblob
+ chunks = [(base_offset + (chunk_size * i), chunk_size) for i in range(20)]
+ for data in f.readv(chunks):
+ offset = base_offset % 1024
+ self.assertEqual(chunk_size, len(data))
+ self.assertEqual(k2blob[offset:offset + chunk_size], data)
+ base_offset += chunk_size
+
+ f.close()
+ sys.stderr.write(' ')
+ finally:
+ sftp.remove('%s/hongry.txt' % FOLDER)
+
+ def test_8_large_readv(self):
+ """
+ verify that a very large readv is broken up correctly and still
+ returned as a single blob.
+ """
+ sftp = get_sftp()
+ kblob = ''.join([struct.pack('>H', n) for n in xrange(512)])
+ try:
+ f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
+ f.set_pipelined(True)
+ for n in range(1024):
+ f.write(kblob)
+ if n % 128 == 0:
+ sys.stderr.write('.')
+ f.close()
+ sys.stderr.write(' ')
+
+ self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+
+ f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
+ data = list(f.readv([(23 * 1024, 128 * 1024)]))
+ self.assertEqual(1, len(data))
+ data = data[0]
+ self.assertEqual(128 * 1024, len(data))
+
+ f.close()
+ sys.stderr.write(' ')
+ finally:
+ sftp.remove('%s/hongry.txt' % FOLDER)
+
+ def test_9_big_file_big_buffer(self):
"""
write a 1MB file, with no linefeeds, and a big buffer.
"""
@@ -288,7 +356,7 @@ class BigSFTPTest (unittest.TestCase):
finally:
sftp.remove('%s/hongry.txt' % FOLDER)
- def test_8_big_file_renegotiate(self):
+ def test_A_big_file_renegotiate(self):
"""
write a 1MB file, forcing key renegotiation in the middle.
"""