summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorRobey Pointer <robey@lag.net>2007-10-28 20:03:44 -0700
committerRobey Pointer <robey@lag.net>2007-10-28 20:03:44 -0700
commite3d9b90ea1f25792e652a0809b37b52635243932 (patch)
treec6b99f4940f2ceec7941500839d860119d0609fb
parent80b9e289cef4fc0939141ccf15751017a58a1637 (diff)
[project @ robey@lag.net-20071029030344-9adfzb9ulfodtepu]
bug 157205: select() doesn't notify incoming stderr data, because stderr's pipe isn't hooked up to the fileno() BufferedPipe. to fix, i added an "or" pipe-event that can be triggered by either stdout or stderr, and hooked them both up to fileno(). added a unit test for the bug and one for the "or" pipe.
-rw-r--r--paramiko/channel.py10
-rw-r--r--paramiko/pipe.py38
-rw-r--r--tests/test_buffered_pipe.py15
-rw-r--r--tests/test_transport.py38
4 files changed, 95 insertions, 6 deletions
diff --git a/paramiko/channel.py b/paramiko/channel.py
index 34624655..052e487e 100644
--- a/paramiko/channel.py
+++ b/paramiko/channel.py
@@ -782,14 +782,14 @@ class Channel (object):
def fileno(self):
"""
Returns an OS-level file descriptor which can be used for polling, but
- but I{not} for reading or writing). This is primaily to allow python's
+ but I{not} for reading or writing. This is primaily to allow python's
C{select} module to work.
The first time C{fileno} is called on a channel, a pipe is created to
simulate real OS-level file descriptor (FD) behavior. Because of this,
two OS-level FDs are created, which will use up FDs faster than normal.
- You won't notice this effect unless you open hundreds or thousands of
- channels simultaneously, but it's still notable.
+ (You won't notice this effect unless you have hundreds of channels
+ open at the same time.)
@return: an OS-level file descriptor
@rtype: int
@@ -803,7 +803,9 @@ class Channel (object):
return self._pipe.fileno()
# create the pipe and feed in any existing data
self._pipe = pipe.make_pipe()
- self.in_buffer.set_event(self._pipe)
+ p1, p2 = pipe.make_or_pipe(self._pipe)
+ self.in_buffer.set_event(p1)
+ self.in_stderr_buffer.set_event(p2)
return self._pipe.fileno()
finally:
self.lock.release()
diff --git a/paramiko/pipe.py b/paramiko/pipe.py
index d71ca167..1cfed2d0 100644
--- a/paramiko/pipe.py
+++ b/paramiko/pipe.py
@@ -19,6 +19,9 @@
"""
Abstraction of a one-way pipe where the read end can be used in select().
Normally this is trivial, but Windows makes it nearly impossible.
+
+The pipe acts like an Event, which can be set or cleared. When set, the pipe
+will trigger as readable in select().
"""
import sys
@@ -57,7 +60,7 @@ class PosixPipe (object):
self._set = False
def set (self):
- if self._set:
+ if self._set or self._closed:
return
self._set = True
os.write(self._wfd, '*')
@@ -103,7 +106,7 @@ class WindowsPipe (object):
self._set = False
def set (self):
- if self._set:
+ if self._set or self._closed:
return
self._set = True
self._wsock.send('*')
@@ -111,3 +114,34 @@ class WindowsPipe (object):
def set_forever (self):
self._forever = True
self.set()
+
+
+class OrPipe (object):
+ def __init__(self, pipe):
+ self._set = False
+ self._partner = None
+ self._pipe = pipe
+
+ def set(self):
+ self._set = True
+ if not self._partner._set:
+ self._pipe.set()
+
+ def clear(self):
+ self._set = False
+ if not self._partner._set:
+ self._pipe.clear()
+
+
+def make_or_pipe(pipe):
+ """
+ wraps a pipe into two pipe-like objects which are "or"d together to
+ affect the real pipe. if either returned pipe is set, the wrapped pipe
+ is set. when both are cleared, the wrapped pipe is cleared.
+ """
+ p1 = OrPipe(pipe)
+ p2 = OrPipe(pipe)
+ p1._partner = p2
+ p2._partner = p1
+ return p1, p2
+
diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py
index ac123527..bef8fb8a 100644
--- a/tests/test_buffered_pipe.py
+++ b/tests/test_buffered_pipe.py
@@ -24,6 +24,7 @@ import threading
import time
import unittest
from paramiko.buffered_pipe import BufferedPipe, PipeTimeout
+from paramiko import pipe
def delay_thread(pipe):
@@ -75,3 +76,17 @@ class BufferedPipeTest (unittest.TestCase):
threading.Thread(target=close_thread, args=(p,)).start()
data = p.read(1, 1.0)
self.assertEquals('', data)
+
+ def test_4_or_pipe(self):
+ p = pipe.make_pipe()
+ p1, p2 = pipe.make_or_pipe(p)
+ self.assertFalse(p._set)
+ p1.set()
+ self.assertTrue(p._set)
+ p2.set()
+ self.assertTrue(p._set)
+ p1.clear()
+ self.assertTrue(p._set)
+ p2.clear()
+ self.assertFalse(p._set)
+
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 53e69690..6aaf7386 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -639,3 +639,41 @@ class TransportTest (unittest.TestCase):
self.tc.cancel_port_forward('', port)
self.assertTrue(self.server._listen is None)
+ def test_K_stderr_select(self):
+ """
+ verify that select() on a channel works even if only stderr is
+ receiving data.
+ """
+ self.setup_test_server()
+ chan = self.tc.open_session()
+ chan.invoke_shell()
+ schan = self.ts.accept(1.0)
+
+ # nothing should be ready
+ r, w, e = select.select([chan], [], [], 0.1)
+ self.assertEquals([], r)
+ self.assertEquals([], w)
+ self.assertEquals([], e)
+
+ schan.send_stderr('hello\n')
+
+ # something should be ready now (give it 1 second to appear)
+ for i in range(10):
+ r, w, e = select.select([chan], [], [], 0.1)
+ if chan in r:
+ break
+ time.sleep(0.1)
+ self.assertEquals([chan], r)
+ self.assertEquals([], w)
+ self.assertEquals([], e)
+
+ self.assertEquals('hello\n', chan.recv_stderr(6))
+
+ # and, should be dead again now
+ r, w, e = select.select([chan], [], [], 0.1)
+ self.assertEquals([], r)
+ self.assertEquals([], w)
+ self.assertEquals([], e)
+
+ schan.close()
+ chan.close()