summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorRobey Pointer <robey@lag.net>2006-03-09 18:22:34 -0800
committerRobey Pointer <robey@lag.net>2006-03-09 18:22:34 -0800
commit862e1f48e32f874ee489002b067f175b3a851644 (patch)
treedba0642244ed31051fc1c3f47d71aa3b34436b3c
parenta98c5cf627071292de92d6a0b3e61ab916308c21 (diff)
[project @ robey@lag.net-20060310022234-c183ad0b3f31eb28]
generalize the prefetching a bit so that it can be reused for a readv method
-rw-r--r--paramiko/sftp_file.py64
1 files changed, 45 insertions, 19 deletions
diff --git a/paramiko/sftp_file.py b/paramiko/sftp_file.py
index 7d9b8f47..f1321018 100644
--- a/paramiko/sftp_file.py
+++ b/paramiko/sftp_file.py
@@ -46,7 +46,6 @@ class SFTPFile (BufferedFile):
self._prefetching = False
self._prefetch_done = False
self._prefetch_so_far = 0
- self._prefetch_size = 0
self._prefetch_data = {}
self._saved_exception = None
@@ -89,8 +88,7 @@ class SFTPFile (BufferedFile):
in the buffer, return None. otherwise, behaves like a normal read.
"""
# while not closed, and haven't fetched past the current position, and haven't reached EOF...
- while (self._prefetch_so_far <= self._realpos) and \
- (self._prefetch_so_far < self._prefetch_size) and not self._closed:
+ while (self._prefetch_so_far <= self._realpos) and not self._closed:
if self._prefetch_done:
return None
self.sftp._read_response()
@@ -348,27 +346,36 @@ class SFTPFile (BufferedFile):
dramatically improve the download speed by avoiding roundtrip latency.
The file's contents are incrementally buffered in a background thread.
+ The prefetched data is stored in a buffer until read via the L{read}
+ method. Once data has been read, it's removed from the buffer. The
+ data may be read in a random order (using L{seek}); chunks of the
+ buffer that haven't been read will continue to be buffered.
+
@since: 1.5.1
"""
size = self.stat().st_size
# queue up async reads for the rest of the file
- self._prefetching = True
- self._prefetch_done = False
- self._prefetch_so_far = self._realpos
- self._prefetch_size = size
- self._prefetch_data = {}
- t = threading.Thread(target=self._prefetch)
- t.setDaemon(True)
- t.start()
-
- def _prefetch(self):
+ chunks = []
n = self._realpos
- size = self._prefetch_size
while n < size:
chunk = min(self.MAX_REQUEST_SIZE, size - n)
- self.sftp._async_request(self, CMD_READ, self.handle, long(n), int(chunk))
+ chunks.append((n, chunk))
n += chunk
-
+ self._start_prefetch(chunks)
+
+ def readv(self, chunks):
+ # put the offsets in order, since we depend on that for determining
+ # when the reads have finished.
+ ordered_chunks = chunks[:]
+ ordered_chunks.sort(lambda x, y: cmp(x[0], y[0]))
+ self._start_prefetch(ordered_chunks)
+ # now we can just devolve to a bunch of read()s :)
+ out = []
+ for x in chunks:
+ self.seek(x[0])
+ out.append(self.read(x[1]))
+ return out
+
### internals...
@@ -379,6 +386,23 @@ class SFTPFile (BufferedFile):
except:
return 0
+ def _start_prefetch(self, chunks):
+ self._prefetching = True
+ self._prefetch_done = False
+ self._prefetch_so_far = chunks[0][0]
+ self._prefetch_data = {}
+ self._prefetch_reads = chunks
+
+ t = threading.Thread(target=self._prefetch_thread)
+ t.setDaemon(True)
+ t.start()
+
+ def _prefetch_thread(self):
+ # do these read requests in a temporary thread because there may be
+ # a lot of them, so it may block.
+ for offset, length in self._prefetch_reads:
+ self.sftp._async_request(self, CMD_READ, self.handle, long(offset), int(length))
+
def _async_response(self, t, msg):
if t == CMD_STATUS:
# save exception and re-raise it on next file operation
@@ -390,9 +414,11 @@ class SFTPFile (BufferedFile):
if t != CMD_DATA:
raise SFTPError('Expected data')
data = msg.get_string()
- self._prefetch_data[self._prefetch_so_far] = data
- self._prefetch_so_far += len(data)
- if self._prefetch_so_far == self._prefetch_size:
+ offset, length = self._prefetch_reads.pop(0)
+ assert length == len(data)
+ self._prefetch_data[offset] = data
+ self._prefetch_so_far = offset + length
+ if len(self._prefetch_reads) == 0:
self._prefetch_done = True
def _check_exception(self):