summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--paramiko/buffered_pipe.py168
-rw-r--r--tests/test_buffered_pipe.py71
2 files changed, 239 insertions, 0 deletions
diff --git a/paramiko/buffered_pipe.py b/paramiko/buffered_pipe.py
new file mode 100644
index 00000000..8e1aacd0
--- /dev/null
+++ b/paramiko/buffered_pipe.py
@@ -0,0 +1,168 @@
+# Copyright (C) 2006 Robey Pointer <robey@lag.net>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+Attempt to generalize the "feeder" part of a Channel: an object which can be
+read from and closed, but is reading from a buffer fed by another thread. The
+read operations are blocking and can have a timeout set.
+"""
+
+import array
+import threading
+import time
+
+
+class PipeTimeout (IOError):
+ """
+ Indicates that a timeout was reached on a read from a L{BufferedPipe}.
+ """
+ pass
+
+
+class BufferedPipe (object):
+ """
+ A buffer that obeys normal read (with timeout) & close semantics for a
+ file or socket, but is fed data from another thread. This is used by
+ L{Channel}.
+ """
+
+ def __init__(self):
+ self._lock = threading.Lock()
+ self._cv = threading.Condition(self._lock)
+ self._event = None
+ self._buffer = array.array('B')
+ self._closed = False
+
+ def feed(self, data):
+ """
+ Feed new data into this pipe. This method is assumed to be called
+ from a separate thread, so synchronization is done.
+
+ @param data: the data to add
+ @type data: str
+ """
+ self._lock.acquire()
+ try:
+ if self._event is not None:
+ self._event.set()
+ self._buffer.fromstring(data)
+ self._cv.notifyAll()
+ finally:
+ self._lock.release()
+
+ def read_ready(self):
+ """
+ Returns true if data is buffered and ready to be read from this
+ feeder. A C{False} result does not mean that the feeder has closed;
+ it means you may need to wait before more data arrives.
+
+ @return: C{True} if a L{read} call would immediately return at least
+ one byte; C{False} otherwise.
+ @rtype: bool
+ """
+ self._lock.acquire()
+ try:
+ if len(self._buffer) == 0:
+ return False
+ return True
+ finally:
+ self._lock.release()
+
+ def read(self, nbytes, timeout=None):
+ """
+ Read data from the pipe. The return value is a string representing
+ the data received. The maximum amount of data to be received at once
+ is specified by C{nbytes}. If a string of length zero is returned,
+ the pipe has been closed.
+
+ The optional C{timeout} argument can be a nonnegative float expressing
+ seconds, or C{None} for no timeout. If a float is given, a
+ C{PipeTimeout} will be raised if the timeout period value has
+ elapsed before any data arrives.
+
+ @param nbytes: maximum number of bytes to read
+ @type nbytes: int
+ @param timeout: maximum seconds to wait (or C{None}, the default, to
+ wait forever)
+ @type timeout: float
+ @return: data
+ @rtype: str
+
+ @raise PipeTimeout: if a timeout was set via L{settimeout} and no
+ data was ready before that timeout
+ """
+ out = ''
+ self._lock.acquire()
+ try:
+ if len(self._buffer) == 0:
+ if self._closed:
+ return out
+ # should we block?
+ if timeout == 0.0:
+ raise PipeTimeout()
+ # loop here in case we get woken up but a different thread has
+ # grabbed everything in the buffer.
+ while (len(self._buffer) == 0) and not self._closed:
+ then = time.time()
+ self._cv.wait(timeout)
+ if timeout is not None:
+ timeout -= time.time() - then
+ if timeout <= 0.0:
+ raise PipeTimeout()
+
+ # something's in the buffer and we have the lock!
+ if len(self._buffer) <= nbytes:
+ out = self._buffer.tostring()
+ del self._buffer[:]
+ if (self._event is not None) and not self._closed:
+ self._event.clear()
+ else:
+ out = self._buffer[:nbytes].tostring()
+ del self.in_buffer[:nbytes]
+ finally:
+ self._lock.release()
+
+ return out
+
+ def close(self):
+ """
+ Close this pipe object. Future calls to L{read} after the buffer
+ has been emptied will return immediately with an empty string.
+ """
+ self._lock.acquire()
+ try:
+ self._closed = True
+ self._cv.notifyAll()
+ if self._event is not None:
+ self._event.set()
+ finally:
+ self._lock.release()
+
+ def __len__(self):
+ """
+ Return the number of bytes buffered.
+
+ @return: number of bytes bufferes
+ @rtype: int
+ """
+ self._lock.acquire()
+ try:
+ return len(self._buffer)
+ finally:
+ self._lock.release()
+
diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py
new file mode 100644
index 00000000..8e4a4282
--- /dev/null
+++ b/tests/test_buffered_pipe.py
@@ -0,0 +1,71 @@
+# Copyright (C) 2006 Robey Pointer <robey@lag.net>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+Some unit tests for BufferedPipe.
+"""
+
+import threading
+import time
+import unittest
+from paramiko.buffered_pipe import BufferedPipe, PipeTimeout
+
+
+def delay_thread(pipe):
+ pipe.feed('a')
+ time.sleep(0.5)
+ pipe.feed('b')
+ pipe.close()
+
+
+def close_thread(pipe):
+ time.sleep(0.2)
+ pipe.close()
+
+
+class BufferedPipeTest (unittest.TestCase):
+
+ def test_1_buffered_pipe(self):
+ p = BufferedPipe()
+ self.assert_(not p.read_ready())
+ p.feed('hello.')
+ self.assert_(p.read_ready())
+ data = p.read(6)
+ self.assertEquals('hello.', data)
+ p.close()
+ self.assert_(not p.read_ready())
+ self.assertEquals('', p.read(1))
+
+ def test_2_delay(self):
+ p = BufferedPipe()
+ self.assert_(not p.read_ready())
+ threading.Thread(target=delay_thread, args=(p,)).start()
+ self.assertEquals('a', p.read(1, 0.1))
+ try:
+ p.read(1, 0.1)
+ self.assert_(False)
+ except PipeTimeout:
+ pass
+ self.assertEquals('b', p.read(1, 0.5))
+ self.assertEquals('', p.read(1))
+
+ def test_3_close_while_reading(self):
+ p = BufferedPipe()
+ threading.Thread(target=close_thread, args=(p,)).start()
+ data = p.read(1, 1.0)
+ self.assertEquals('', data)