summaryrefslogtreecommitdiffhomepage
path: root/channel.py
diff options
context:
space:
mode:
authorRobey Pointer <robey@lag.net>2003-11-04 08:34:24 +0000
committerRobey Pointer <robey@lag.net>2003-11-04 08:34:24 +0000
commit51607386c7609a483568ad935083c9668fe6241b (patch)
tree46b1083cfbd387fd181cc8fbef2ce77f837a3bd6 /channel.py
[project @ Arch-1:robey@lag.net--2003-public%secsh--dev--1.0--base-0]
initial import (automatically generated log message)
Diffstat (limited to 'channel.py')
-rw-r--r--channel.py608
1 files changed, 608 insertions, 0 deletions
diff --git a/channel.py b/channel.py
new file mode 100644
index 00000000..275c0a26
--- /dev/null
+++ b/channel.py
@@ -0,0 +1,608 @@
+from message import Message
+from secsh import SSHException
+from transport import MSG_CHANNEL_REQUEST, MSG_CHANNEL_CLOSE, MSG_CHANNEL_WINDOW_ADJUST, MSG_CHANNEL_DATA, \
+ MSG_CHANNEL_EOF
+
+import time, threading, logging, socket, os
+from logging import DEBUG
+
+
+# this is ugly, and won't work on windows
+def set_nonblocking(fd):
+ import fcntl
+ fcntl.fcntl(fd, fcntl.F_SETFL, os.O_NONBLOCK)
+
+
+class Channel(object):
+ """
+ Abstraction for a secsh channel.
+ """
+
+ def __init__(self, chanid, transport):
+ self.chanid = chanid
+ self.transport = transport
+ self.active = 0
+ self.eof_received = 0
+ self.eof_sent = 0
+ self.in_buffer = ''
+ self.timeout = None
+ self.closed = 0
+ self.lock = threading.Lock()
+ self.in_buffer_cv = threading.Condition(self.lock)
+ self.out_buffer_cv = threading.Condition(self.lock)
+ self.name = str(chanid)
+ self.logger = logging.getLogger('secsh.chan.' + str(chanid))
+ self.pipe_rfd = self.pipe_wfd = None
+
+ def __repr__(self):
+ out = '<secsh.Channel %d' % self.chanid
+ if self.closed:
+ out += ' (closed)'
+ elif self.active:
+ if self.eof_received:
+ out += ' (EOF received)'
+ if self.eof_sent:
+ out += ' (EOF sent)'
+ out += ' (open) window=%d' % (self.out_window_size)
+ if len(self.in_buffer) > 0:
+ out += ' in-buffer=%d' % (len(self.in_buffer),)
+ out += ' -> ' + repr(self.transport)
+ out += '>'
+ return out
+
+ def log(self, level, msg):
+ self.logger.log(level, msg)
+
+ def set_window(self, window_size, max_packet_size):
+ self.in_window_size = window_size
+ self.in_max_packet_size = max_packet_size
+ # threshold of bytes we receive before we bother to send a window update
+ self.in_window_threshold = window_size // 10
+ self.in_window_sofar = 0
+
+ def set_server_channel(self, chanid, window_size, max_packet_size):
+ self.server_chanid = chanid
+ self.out_window_size = window_size
+ self.out_max_packet_size = max_packet_size
+ self.active = 1
+
+ def request_success(self, m):
+ self.log(DEBUG, 'Sesch channel %d request ok' % self.chanid)
+ return
+
+ def request_failed(self, m):
+ self.close()
+
+ def feed(self, m):
+ s = m.get_string()
+ try:
+ self.lock.acquire()
+ self.log(DEBUG, 'fed %d bytes' % len(s))
+ if self.pipe_wfd != None:
+ self.feed_pipe(s)
+ else:
+ self.in_buffer += s
+ self.in_buffer_cv.notifyAll()
+ self.log(DEBUG, '(out from feed)')
+ finally:
+ self.lock.release()
+
+ def window_adjust(self, m):
+ nbytes = m.get_int()
+ try:
+ self.lock.acquire()
+ self.log(DEBUG, 'window up %d' % nbytes)
+ self.out_window_size += nbytes
+ self.out_buffer_cv.notifyAll()
+ finally:
+ self.lock.release()
+
+ def handle_request(self, m):
+ key = m.get_string()
+ if key == 'exit-status':
+ self.exit_status = m.get_int()
+ return
+ elif key == 'xon-xoff':
+ # ignore
+ return
+ else:
+ self.log(DEBUG, 'Unhandled channel request "%s"' % key)
+
+ def handle_eof(self, m):
+ self.eof_received = 1
+ try:
+ self.lock.acquire()
+ self.in_buffer_cv.notifyAll()
+ if self.pipe_wfd != None:
+ os.close(self.pipe_wfd)
+ self.pipe_wfd = None
+ finally:
+ self.lock.release()
+ self.log(DEBUG, 'EOF received')
+
+ def handle_close(self, m):
+ self.close()
+ try:
+ self.lock.acquire()
+ self.in_buffer_cv.notifyAll()
+ self.out_buffer_cv.notifyAll()
+ if self.pipe_wfd != None:
+ os.close(self.pipe_wfd)
+ self.pipe_wfd = None
+ finally:
+ self.lock.release()
+
+
+ # API for external use
+
+ def get_pty(self, term='vt100', width=80, height=24):
+ if self.closed or self.eof_received or self.eof_sent or not self.active:
+ raise SSHException('Channel is not open')
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_REQUEST))
+ m.add_int(self.server_chanid)
+ m.add_string('pty-req')
+ m.add_boolean(0)
+ m.add_string(term)
+ m.add_int(width)
+ m.add_int(height)
+ # pixel height, width (usually useless)
+ m.add_int(0).add_int(0)
+ m.add_string('')
+ self.transport.send_message(m)
+
+ def invoke_shell(self):
+ if self.closed or self.eof_received or self.eof_sent or not self.active:
+ raise SSHException('Channel is not open')
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_REQUEST))
+ m.add_int(self.server_chanid)
+ m.add_string('shell')
+ m.add_boolean(1)
+ self.transport.send_message(m)
+
+ def exec_command(self, command):
+ if self.closed or self.eof_received or self.eof_sent or not self.active:
+ raise SSHException('Channel is not open')
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_REQUEST))
+ m.add_int(self.server_chanid)
+ m.add_string('exec')
+ m.add_boolean(1)
+ m.add_string(command)
+ self.transport.send_message(m)
+
+ def invoke_subsystem(self, subsystem):
+ if self.closed or self.eof_received or self.eof_sent or not self.active:
+ raise SSHException('Channel is not open')
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_REQUEST))
+ m.add_int(self.server_chanid)
+ m.add_string('subsystem')
+ m.add_boolean(1)
+ m.add_string(subsystem)
+ self.transport.send_message(m)
+
+ def resize_pty(self, width=80, height=24):
+ if self.closed or self.eof_received or self.eof_sent or not self.active:
+ raise SSHException('Channel is not open')
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_REQUEST))
+ m.add_int(self.server_chanid)
+ m.add_string('window-change')
+ m.add_boolean(0)
+ m.add_int(width)
+ m.add_int(height)
+ m.add_int(0).add_int(0)
+ self.transport.send_message(m)
+
+ def get_transport(self):
+ return self.transport
+
+ def set_name(self, name):
+ self.name = name
+ self.logger = logging.getLogger('secsh.chan.' + name)
+
+ def get_name(self):
+ return self.name
+
+ def send_eof(self):
+ if self.eof_sent:
+ return
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_EOF))
+ m.add_int(self.server_chanid)
+ self.transport.send_message(m)
+ self.eof_sent = 1
+ self.log(DEBUG, 'EOF sent')
+ return
+
+
+ # socket equivalency methods...
+
+ def settimeout(self, timeout):
+ self.timeout = timeout
+
+ def gettimeout(self):
+ return self.timeout
+
+ def setblocking(self, blocking):
+ if blocking:
+ self.settimeout(None)
+ else:
+ self.settimeout(0.0)
+
+ def close(self):
+ if self.closed or not self.active:
+ return
+ self.send_eof()
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_CLOSE))
+ m.add_int(self.server_chanid)
+ self.transport.send_message(m)
+ self.closed = 1
+ self.transport.unlink_channel(self.chanid)
+
+ def recv_ready(self):
+ "doesn't work if you've called fileno()"
+ try:
+ self.lock.acquire()
+ if len(self.in_buffer) == 0:
+ return 0
+ return 1
+ finally:
+ self.lock.release()
+
+ def recv(self, nbytes):
+ out = ''
+ try:
+ self.lock.acquire()
+ if self.pipe_rfd != None:
+ # use the pipe
+ return self.read_pipe(nbytes)
+ if len(self.in_buffer) == 0:
+ if self.closed or self.eof_received:
+ return out
+ # should we block?
+ if self.timeout == 0.0:
+ raise socket.timeout()
+ # loop here in case we get woken up but a different thread has grabbed everything in the buffer
+ timeout = self.timeout
+ while (len(self.in_buffer) == 0) and not self.closed and not self.eof_received:
+ then = time.time()
+ self.in_buffer_cv.wait(timeout)
+ if timeout != None:
+ timeout -= time.time() - then
+ if timeout <= 0.0:
+ raise socket.timeout()
+ # something in the buffer and we have the lock
+ if len(self.in_buffer) <= nbytes:
+ out = self.in_buffer
+ self.in_buffer = ''
+ else:
+ out = self.in_buffer[:nbytes]
+ self.in_buffer = self.in_buffer[nbytes:]
+ self.check_add_window(len(out))
+ finally:
+ self.lock.release()
+ return out
+
+ def send(self, s):
+ size = 0
+ if self.closed or self.eof_sent:
+ return size
+ try:
+ self.lock.acquire()
+ if self.out_window_size == 0:
+ # should we block?
+ if self.timeout == 0.0:
+ raise socket.timeout()
+ # loop here in case we get woken up but a different thread has filled the buffer
+ timeout = self.timeout
+ while self.out_window_size == 0:
+ then = time.time()
+ self.out_buffer_cv.wait(timeout)
+ if timeout != None:
+ timeout -= time.time() - then
+ if timeout <= 0.0:
+ raise socket.timeout()
+ # we have some window to squeeze into
+ if self.closed:
+ return 0
+ size = len(s)
+ if self.out_window_size < size:
+ size = self.out_window_size
+ if self.out_max_packet_size < size:
+ size = self.out_max_packet_size
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_DATA))
+ m.add_int(self.server_chanid)
+ m.add_string(s[:size])
+ self.transport.send_message(m)
+ self.out_window_size -= size
+ finally:
+ self.lock.release()
+ return size
+
+ def sendall(self, s):
+ while s:
+ if self.closed:
+ # this doesn't seem useful, but it is the documented behavior of Socket
+ raise socket.error('Socket is closed')
+ sent = self.send(s)
+ s = s[sent:]
+ return None
+
+ def makefile(self, *params):
+ return ChannelFile(*([self] + list(params)))
+
+ def fileno(self):
+ """
+ returns an OS-level fd which can be used for polling and reading (but
+ NOT for writing). this is primarily to allow python's \"select\" module
+ to work. the first time this function is called, a pipe is created to
+ simulate real OS-level fd behavior. because of this, two actual fds are
+ created: one to return and one to feed. this may be inefficient if you
+ plan to use many fds.
+
+ the channel's receive window will be updated as data comes in, not as
+ you read it, so if you fail to poll the channel often enough, it may
+ block ALL channels across the transport.
+ """
+ try:
+ self.lock.acquire()
+ if self.pipe_rfd != None:
+ return self.pipe_rfd
+ # create the pipe and feed in any existing data
+ self.pipe_rfd, self.pipe_wfd = os.pipe()
+ set_nonblocking(self.pipe_wfd)
+ set_nonblocking(self.pipe_rfd)
+ if len(self.in_buffer) > 0:
+ x = self.in_buffer
+ self.in_buffer = ''
+ self.feed_pipe(x)
+ return self.pipe_rfd
+ finally:
+ self.lock.release()
+
+ def shutdown(self, how):
+ if (how == 0) or (how == 2):
+ # feign "read" shutdown
+ self.eof_received = 1
+ if (how == 1) or (how == 2):
+ self.send_eof()
+
+
+ # internal use...
+
+ def feed_pipe(self, data):
+ "you are already holding the lock"
+ if len(self.in_buffer) > 0:
+ self.in_buffer += data
+ return
+ try:
+ n = os.write(self.pipe_wfd, data)
+ if n < len(data):
+ # at least on linux, this will never happen, as the writes are
+ # considered atomic... but just in case.
+ self.in_buffer = data[n:]
+ self.check_add_window(n)
+ self.in_buffer_cv.notifyAll()
+ return
+ except OSError, e:
+ pass
+ if len(data) > 1:
+ # try writing just one byte then
+ x = data[0]
+ data = data[1:]
+ try:
+ os.write(self.pipe_wfd, x)
+ self.in_buffer = data
+ self.check_add_window(1)
+ self.in_buffer_cv.notifyAll()
+ return
+ except OSError, e:
+ data = x + data
+ # pipe is very full
+ self.in_buffer = data
+ self.in_buffer_cv.notifyAll()
+
+ def read_pipe(self, nbytes):
+ "you are already holding the lock"
+ try:
+ x = os.read(self.pipe_rfd, nbytes)
+ if len(x) > 0:
+ self.push_pipe(len(x))
+ return x
+ except OSError, e:
+ pass
+ # nothing in the pipe
+ if self.closed or self.eof_received:
+ return ''
+ # should we block?
+ if self.timeout == 0.0:
+ raise socket.timeout()
+ # loop here in case we get woken up but a different thread has grabbed everything in the buffer
+ timeout = self.timeout
+ while not self.closed and not self.eof_received:
+ then = time.time()
+ self.in_buffer_cv.wait(timeout)
+ if timeout != None:
+ timeout -= time.time() - then
+ if timeout <= 0.0:
+ raise socket.timeout()
+ try:
+ x = os.read(self.pipe_rfd, nbytes)
+ if len(x) > 0:
+ self.push_pipe(len(x))
+ return x
+ except OSError, e:
+ pass
+ pass
+
+ def push_pipe(self, nbytes):
+ # successfully read N bytes from the pipe, now re-feed the pipe if necessary
+ # (assumption: the pipe can hold as many bytes as were read out)
+ if len(self.in_buffer) == 0:
+ return
+ if len(self.in_buffer) <= nbytes:
+ os.write(self.pipe_wfd, self.in_buffer)
+ self.in_buffer = ''
+ return
+ x = self.in_buffer[:nbytes]
+ self.in_buffer = self.in_buffer[nbytes:]
+ os.write(self.pipd_wfd, x)
+
+ def unlink(self):
+ if self.closed or not self.active:
+ return
+ self.closed = 1
+ self.transport.unlink_channel(self.chanid)
+
+ def check_add_window(self, n):
+ # already holding the lock!
+ if self.closed or self.eof_received or not self.active:
+ return
+ self.log(DEBUG, 'addwindow %d' % n)
+ self.in_window_sofar += n
+ if self.in_window_sofar > self.in_window_threshold:
+ self.log(DEBUG, 'addwindow send %d' % self.in_window_sofar)
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_WINDOW_ADJUST))
+ m.add_int(self.server_chanid)
+ m.add_int(self.in_window_sofar)
+ self.transport.send_message(m)
+ self.in_window_sofar = 0
+
+
+class ChannelFile(object):
+ """
+ A file-like wrapper around Channel.
+ Doesn't have the non-portable side effect of Channel.fileno().
+ XXX Todo: the channel and its file-wrappers should be able to be closed or
+ garbage-collected independently, for compatibility with real sockets and
+ their file-wrappers. Currently, closing does nothing but flush the buffer.
+ XXX Todo: translation of the various forms of newline is not implemented,
+ let alone the universal newline. Line buffering (for writing) is
+ implemented, though, which makes little sense without text mode support.
+ """
+
+ def __init__(self, channel, mode = "r", buf_size = -1):
+ self.channel = channel
+ self.mode = mode
+ if buf_size < 0:
+ self.buf_size = 1024
+ self.line_buffered = 0
+ elif buf_size == 1:
+ self.buf_size = 1
+ self.line_buffered = 1
+ else:
+ self.buf_size = buf_size
+ self.line_buffered = 0
+ self.wbuffer = ""
+ self.rbuffer = ""
+ self.readable = ("r" in mode)
+ self.writable = ("w" in mode) or ("+" in mode) or ("a" in mode)
+ self.binary = ("b" in mode)
+ if not self.binary:
+ raise NotImplementedError("text mode not supported")
+ self.softspace = 0
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ line = self.readline()
+ if not line:
+ raise StopIteration
+ return line
+
+ def write(self, str):
+ if not self.writable:
+ raise IOError("file not open for writing")
+ if self.buf_size == 0 and not self.line_buffered:
+ self.channel.sendall(str)
+ return
+ self.wbuffer += str
+ if self.line_buffered:
+ last_newline_pos = self.wbuffer.rfind("\n")
+ if last_newline_pos >= 0:
+ self.channel.sendall(self.wbuffer[:last_newline_pos+1])
+ self.wbuffer = self.wbuffer[last_newline_pos+1:]
+ else:
+ if len(self.wbuffer) >= self.buf_size:
+ self.channel.sendall(self.wbuffer)
+ self.wbuffer = ""
+ return
+
+ def writelines(self, sequence):
+ for s in sequence:
+ self.write(s)
+ return
+
+ def flush(self):
+ self.channel.sendall(self.wbuffer)
+ self.wbuffer = ""
+ return
+
+ def read(self, size = None):
+ if not self.readable:
+ raise IOError("file not open for reading")
+ if size is None or size < 0:
+ result = self.rbuffer
+ self.rbuffer = ""
+ while not self.channel.eof_received:
+ new_data = self.channel.recv(65536)
+ if not new_data:
+ break
+ result += new_data
+ return result
+ if size <= len(self.rbuffer):
+ result = self.rbuffer[:size]
+ self.rbuffer = self.rbuffer[size:]
+ return result
+ while len(self.rbuffer) < size and not self.channel.eof_received:
+ new_data = self.channel.recv(max(self.buf_size, size-len(self.rbuffer)))
+ if not new_data:
+ break
+ self.rbuffer += new_data
+ result = self.rbuffer[:size]
+ self.rbuffer[size:]
+ return result
+
+ def readline(self, size = None):
+ line = ""
+ while "\n" not in line:
+ if size >= 0:
+ new_data = self.read(size - len(line))
+ else:
+ new_data = self.read(64)
+ if not new_data:
+ break
+ line += new_data
+ newline_pos = line.find("\n")
+ if newline_pos >= 0:
+ self.rbuffer = line[newline_pos+1:] + self.rbuffer
+ return line[:newline_pos+1]
+ elif len(line) > size:
+ self.rbuffer = line[size:] + self.rbuffer
+ return line[:size]
+ return line
+
+ def readlines(self, sizehint = None):
+ lines = []
+ while 1:
+ line = self.readline()
+ if not line:
+ break
+ lines.append(line)
+ return lines
+
+ def xreadlines(self):
+ return self
+
+ def close(self):
+ self.flush()
+ return
+
+# vim: set shiftwidth=4 expandtab :