diff options
author | Robey Pointer <robey@lag.net> | 2003-11-04 08:34:24 +0000 |
---|---|---|
committer | Robey Pointer <robey@lag.net> | 2003-11-04 08:34:24 +0000 |
commit | 51607386c7609a483568ad935083c9668fe6241b (patch) | |
tree | 46b1083cfbd387fd181cc8fbef2ce77f837a3bd6 /channel.py |
[project @ Arch-1:robey@lag.net--2003-public%secsh--dev--1.0--base-0]
initial import
(automatically generated log message)
Diffstat (limited to 'channel.py')
-rw-r--r-- | channel.py | 608 |
1 files changed, 608 insertions, 0 deletions
diff --git a/channel.py b/channel.py new file mode 100644 index 00000000..275c0a26 --- /dev/null +++ b/channel.py @@ -0,0 +1,608 @@ +from message import Message +from secsh import SSHException +from transport import MSG_CHANNEL_REQUEST, MSG_CHANNEL_CLOSE, MSG_CHANNEL_WINDOW_ADJUST, MSG_CHANNEL_DATA, \ + MSG_CHANNEL_EOF + +import time, threading, logging, socket, os +from logging import DEBUG + + +# this is ugly, and won't work on windows +def set_nonblocking(fd): + import fcntl + fcntl.fcntl(fd, fcntl.F_SETFL, os.O_NONBLOCK) + + +class Channel(object): + """ + Abstraction for a secsh channel. + """ + + def __init__(self, chanid, transport): + self.chanid = chanid + self.transport = transport + self.active = 0 + self.eof_received = 0 + self.eof_sent = 0 + self.in_buffer = '' + self.timeout = None + self.closed = 0 + self.lock = threading.Lock() + self.in_buffer_cv = threading.Condition(self.lock) + self.out_buffer_cv = threading.Condition(self.lock) + self.name = str(chanid) + self.logger = logging.getLogger('secsh.chan.' + str(chanid)) + self.pipe_rfd = self.pipe_wfd = None + + def __repr__(self): + out = '<secsh.Channel %d' % self.chanid + if self.closed: + out += ' (closed)' + elif self.active: + if self.eof_received: + out += ' (EOF received)' + if self.eof_sent: + out += ' (EOF sent)' + out += ' (open) window=%d' % (self.out_window_size) + if len(self.in_buffer) > 0: + out += ' in-buffer=%d' % (len(self.in_buffer),) + out += ' -> ' + repr(self.transport) + out += '>' + return out + + def log(self, level, msg): + self.logger.log(level, msg) + + def set_window(self, window_size, max_packet_size): + self.in_window_size = window_size + self.in_max_packet_size = max_packet_size + # threshold of bytes we receive before we bother to send a window update + self.in_window_threshold = window_size // 10 + self.in_window_sofar = 0 + + def set_server_channel(self, chanid, window_size, max_packet_size): + self.server_chanid = chanid + self.out_window_size = window_size + self.out_max_packet_size = max_packet_size + self.active = 1 + + def request_success(self, m): + self.log(DEBUG, 'Sesch channel %d request ok' % self.chanid) + return + + def request_failed(self, m): + self.close() + + def feed(self, m): + s = m.get_string() + try: + self.lock.acquire() + self.log(DEBUG, 'fed %d bytes' % len(s)) + if self.pipe_wfd != None: + self.feed_pipe(s) + else: + self.in_buffer += s + self.in_buffer_cv.notifyAll() + self.log(DEBUG, '(out from feed)') + finally: + self.lock.release() + + def window_adjust(self, m): + nbytes = m.get_int() + try: + self.lock.acquire() + self.log(DEBUG, 'window up %d' % nbytes) + self.out_window_size += nbytes + self.out_buffer_cv.notifyAll() + finally: + self.lock.release() + + def handle_request(self, m): + key = m.get_string() + if key == 'exit-status': + self.exit_status = m.get_int() + return + elif key == 'xon-xoff': + # ignore + return + else: + self.log(DEBUG, 'Unhandled channel request "%s"' % key) + + def handle_eof(self, m): + self.eof_received = 1 + try: + self.lock.acquire() + self.in_buffer_cv.notifyAll() + if self.pipe_wfd != None: + os.close(self.pipe_wfd) + self.pipe_wfd = None + finally: + self.lock.release() + self.log(DEBUG, 'EOF received') + + def handle_close(self, m): + self.close() + try: + self.lock.acquire() + self.in_buffer_cv.notifyAll() + self.out_buffer_cv.notifyAll() + if self.pipe_wfd != None: + os.close(self.pipe_wfd) + self.pipe_wfd = None + finally: + self.lock.release() + + + # API for external use + + def get_pty(self, term='vt100', width=80, height=24): + if self.closed or self.eof_received or self.eof_sent or not self.active: + raise SSHException('Channel is not open') + m = Message() + m.add_byte(chr(MSG_CHANNEL_REQUEST)) + m.add_int(self.server_chanid) + m.add_string('pty-req') + m.add_boolean(0) + m.add_string(term) + m.add_int(width) + m.add_int(height) + # pixel height, width (usually useless) + m.add_int(0).add_int(0) + m.add_string('') + self.transport.send_message(m) + + def invoke_shell(self): + if self.closed or self.eof_received or self.eof_sent or not self.active: + raise SSHException('Channel is not open') + m = Message() + m.add_byte(chr(MSG_CHANNEL_REQUEST)) + m.add_int(self.server_chanid) + m.add_string('shell') + m.add_boolean(1) + self.transport.send_message(m) + + def exec_command(self, command): + if self.closed or self.eof_received or self.eof_sent or not self.active: + raise SSHException('Channel is not open') + m = Message() + m.add_byte(chr(MSG_CHANNEL_REQUEST)) + m.add_int(self.server_chanid) + m.add_string('exec') + m.add_boolean(1) + m.add_string(command) + self.transport.send_message(m) + + def invoke_subsystem(self, subsystem): + if self.closed or self.eof_received or self.eof_sent or not self.active: + raise SSHException('Channel is not open') + m = Message() + m.add_byte(chr(MSG_CHANNEL_REQUEST)) + m.add_int(self.server_chanid) + m.add_string('subsystem') + m.add_boolean(1) + m.add_string(subsystem) + self.transport.send_message(m) + + def resize_pty(self, width=80, height=24): + if self.closed or self.eof_received or self.eof_sent or not self.active: + raise SSHException('Channel is not open') + m = Message() + m.add_byte(chr(MSG_CHANNEL_REQUEST)) + m.add_int(self.server_chanid) + m.add_string('window-change') + m.add_boolean(0) + m.add_int(width) + m.add_int(height) + m.add_int(0).add_int(0) + self.transport.send_message(m) + + def get_transport(self): + return self.transport + + def set_name(self, name): + self.name = name + self.logger = logging.getLogger('secsh.chan.' + name) + + def get_name(self): + return self.name + + def send_eof(self): + if self.eof_sent: + return + m = Message() + m.add_byte(chr(MSG_CHANNEL_EOF)) + m.add_int(self.server_chanid) + self.transport.send_message(m) + self.eof_sent = 1 + self.log(DEBUG, 'EOF sent') + return + + + # socket equivalency methods... + + def settimeout(self, timeout): + self.timeout = timeout + + def gettimeout(self): + return self.timeout + + def setblocking(self, blocking): + if blocking: + self.settimeout(None) + else: + self.settimeout(0.0) + + def close(self): + if self.closed or not self.active: + return + self.send_eof() + m = Message() + m.add_byte(chr(MSG_CHANNEL_CLOSE)) + m.add_int(self.server_chanid) + self.transport.send_message(m) + self.closed = 1 + self.transport.unlink_channel(self.chanid) + + def recv_ready(self): + "doesn't work if you've called fileno()" + try: + self.lock.acquire() + if len(self.in_buffer) == 0: + return 0 + return 1 + finally: + self.lock.release() + + def recv(self, nbytes): + out = '' + try: + self.lock.acquire() + if self.pipe_rfd != None: + # use the pipe + return self.read_pipe(nbytes) + if len(self.in_buffer) == 0: + if self.closed or self.eof_received: + return out + # should we block? + if self.timeout == 0.0: + raise socket.timeout() + # loop here in case we get woken up but a different thread has grabbed everything in the buffer + timeout = self.timeout + while (len(self.in_buffer) == 0) and not self.closed and not self.eof_received: + then = time.time() + self.in_buffer_cv.wait(timeout) + if timeout != None: + timeout -= time.time() - then + if timeout <= 0.0: + raise socket.timeout() + # something in the buffer and we have the lock + if len(self.in_buffer) <= nbytes: + out = self.in_buffer + self.in_buffer = '' + else: + out = self.in_buffer[:nbytes] + self.in_buffer = self.in_buffer[nbytes:] + self.check_add_window(len(out)) + finally: + self.lock.release() + return out + + def send(self, s): + size = 0 + if self.closed or self.eof_sent: + return size + try: + self.lock.acquire() + if self.out_window_size == 0: + # should we block? + if self.timeout == 0.0: + raise socket.timeout() + # loop here in case we get woken up but a different thread has filled the buffer + timeout = self.timeout + while self.out_window_size == 0: + then = time.time() + self.out_buffer_cv.wait(timeout) + if timeout != None: + timeout -= time.time() - then + if timeout <= 0.0: + raise socket.timeout() + # we have some window to squeeze into + if self.closed: + return 0 + size = len(s) + if self.out_window_size < size: + size = self.out_window_size + if self.out_max_packet_size < size: + size = self.out_max_packet_size + m = Message() + m.add_byte(chr(MSG_CHANNEL_DATA)) + m.add_int(self.server_chanid) + m.add_string(s[:size]) + self.transport.send_message(m) + self.out_window_size -= size + finally: + self.lock.release() + return size + + def sendall(self, s): + while s: + if self.closed: + # this doesn't seem useful, but it is the documented behavior of Socket + raise socket.error('Socket is closed') + sent = self.send(s) + s = s[sent:] + return None + + def makefile(self, *params): + return ChannelFile(*([self] + list(params))) + + def fileno(self): + """ + returns an OS-level fd which can be used for polling and reading (but + NOT for writing). this is primarily to allow python's \"select\" module + to work. the first time this function is called, a pipe is created to + simulate real OS-level fd behavior. because of this, two actual fds are + created: one to return and one to feed. this may be inefficient if you + plan to use many fds. + + the channel's receive window will be updated as data comes in, not as + you read it, so if you fail to poll the channel often enough, it may + block ALL channels across the transport. + """ + try: + self.lock.acquire() + if self.pipe_rfd != None: + return self.pipe_rfd + # create the pipe and feed in any existing data + self.pipe_rfd, self.pipe_wfd = os.pipe() + set_nonblocking(self.pipe_wfd) + set_nonblocking(self.pipe_rfd) + if len(self.in_buffer) > 0: + x = self.in_buffer + self.in_buffer = '' + self.feed_pipe(x) + return self.pipe_rfd + finally: + self.lock.release() + + def shutdown(self, how): + if (how == 0) or (how == 2): + # feign "read" shutdown + self.eof_received = 1 + if (how == 1) or (how == 2): + self.send_eof() + + + # internal use... + + def feed_pipe(self, data): + "you are already holding the lock" + if len(self.in_buffer) > 0: + self.in_buffer += data + return + try: + n = os.write(self.pipe_wfd, data) + if n < len(data): + # at least on linux, this will never happen, as the writes are + # considered atomic... but just in case. + self.in_buffer = data[n:] + self.check_add_window(n) + self.in_buffer_cv.notifyAll() + return + except OSError, e: + pass + if len(data) > 1: + # try writing just one byte then + x = data[0] + data = data[1:] + try: + os.write(self.pipe_wfd, x) + self.in_buffer = data + self.check_add_window(1) + self.in_buffer_cv.notifyAll() + return + except OSError, e: + data = x + data + # pipe is very full + self.in_buffer = data + self.in_buffer_cv.notifyAll() + + def read_pipe(self, nbytes): + "you are already holding the lock" + try: + x = os.read(self.pipe_rfd, nbytes) + if len(x) > 0: + self.push_pipe(len(x)) + return x + except OSError, e: + pass + # nothing in the pipe + if self.closed or self.eof_received: + return '' + # should we block? + if self.timeout == 0.0: + raise socket.timeout() + # loop here in case we get woken up but a different thread has grabbed everything in the buffer + timeout = self.timeout + while not self.closed and not self.eof_received: + then = time.time() + self.in_buffer_cv.wait(timeout) + if timeout != None: + timeout -= time.time() - then + if timeout <= 0.0: + raise socket.timeout() + try: + x = os.read(self.pipe_rfd, nbytes) + if len(x) > 0: + self.push_pipe(len(x)) + return x + except OSError, e: + pass + pass + + def push_pipe(self, nbytes): + # successfully read N bytes from the pipe, now re-feed the pipe if necessary + # (assumption: the pipe can hold as many bytes as were read out) + if len(self.in_buffer) == 0: + return + if len(self.in_buffer) <= nbytes: + os.write(self.pipe_wfd, self.in_buffer) + self.in_buffer = '' + return + x = self.in_buffer[:nbytes] + self.in_buffer = self.in_buffer[nbytes:] + os.write(self.pipd_wfd, x) + + def unlink(self): + if self.closed or not self.active: + return + self.closed = 1 + self.transport.unlink_channel(self.chanid) + + def check_add_window(self, n): + # already holding the lock! + if self.closed or self.eof_received or not self.active: + return + self.log(DEBUG, 'addwindow %d' % n) + self.in_window_sofar += n + if self.in_window_sofar > self.in_window_threshold: + self.log(DEBUG, 'addwindow send %d' % self.in_window_sofar) + m = Message() + m.add_byte(chr(MSG_CHANNEL_WINDOW_ADJUST)) + m.add_int(self.server_chanid) + m.add_int(self.in_window_sofar) + self.transport.send_message(m) + self.in_window_sofar = 0 + + +class ChannelFile(object): + """ + A file-like wrapper around Channel. + Doesn't have the non-portable side effect of Channel.fileno(). + XXX Todo: the channel and its file-wrappers should be able to be closed or + garbage-collected independently, for compatibility with real sockets and + their file-wrappers. Currently, closing does nothing but flush the buffer. + XXX Todo: translation of the various forms of newline is not implemented, + let alone the universal newline. Line buffering (for writing) is + implemented, though, which makes little sense without text mode support. + """ + + def __init__(self, channel, mode = "r", buf_size = -1): + self.channel = channel + self.mode = mode + if buf_size < 0: + self.buf_size = 1024 + self.line_buffered = 0 + elif buf_size == 1: + self.buf_size = 1 + self.line_buffered = 1 + else: + self.buf_size = buf_size + self.line_buffered = 0 + self.wbuffer = "" + self.rbuffer = "" + self.readable = ("r" in mode) + self.writable = ("w" in mode) or ("+" in mode) or ("a" in mode) + self.binary = ("b" in mode) + if not self.binary: + raise NotImplementedError("text mode not supported") + self.softspace = 0 + + def __iter__(self): + return self + + def next(self): + line = self.readline() + if not line: + raise StopIteration + return line + + def write(self, str): + if not self.writable: + raise IOError("file not open for writing") + if self.buf_size == 0 and not self.line_buffered: + self.channel.sendall(str) + return + self.wbuffer += str + if self.line_buffered: + last_newline_pos = self.wbuffer.rfind("\n") + if last_newline_pos >= 0: + self.channel.sendall(self.wbuffer[:last_newline_pos+1]) + self.wbuffer = self.wbuffer[last_newline_pos+1:] + else: + if len(self.wbuffer) >= self.buf_size: + self.channel.sendall(self.wbuffer) + self.wbuffer = "" + return + + def writelines(self, sequence): + for s in sequence: + self.write(s) + return + + def flush(self): + self.channel.sendall(self.wbuffer) + self.wbuffer = "" + return + + def read(self, size = None): + if not self.readable: + raise IOError("file not open for reading") + if size is None or size < 0: + result = self.rbuffer + self.rbuffer = "" + while not self.channel.eof_received: + new_data = self.channel.recv(65536) + if not new_data: + break + result += new_data + return result + if size <= len(self.rbuffer): + result = self.rbuffer[:size] + self.rbuffer = self.rbuffer[size:] + return result + while len(self.rbuffer) < size and not self.channel.eof_received: + new_data = self.channel.recv(max(self.buf_size, size-len(self.rbuffer))) + if not new_data: + break + self.rbuffer += new_data + result = self.rbuffer[:size] + self.rbuffer[size:] + return result + + def readline(self, size = None): + line = "" + while "\n" not in line: + if size >= 0: + new_data = self.read(size - len(line)) + else: + new_data = self.read(64) + if not new_data: + break + line += new_data + newline_pos = line.find("\n") + if newline_pos >= 0: + self.rbuffer = line[newline_pos+1:] + self.rbuffer + return line[:newline_pos+1] + elif len(line) > size: + self.rbuffer = line[size:] + self.rbuffer + return line[:size] + return line + + def readlines(self, sizehint = None): + lines = [] + while 1: + line = self.readline() + if not line: + break + lines.append(line) + return lines + + def xreadlines(self): + return self + + def close(self): + self.flush() + return + +# vim: set shiftwidth=4 expandtab : |