From 51607386c7609a483568ad935083c9668fe6241b Mon Sep 17 00:00:00 2001 From: Robey Pointer Date: Tue, 4 Nov 2003 08:34:24 +0000 Subject: [project @ Arch-1:robey@lag.net--2003-public%secsh--dev--1.0--base-0] initial import (automatically generated log message) --- auth_transport.py | 224 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 224 insertions(+) create mode 100644 auth_transport.py (limited to 'auth_transport.py') diff --git a/auth_transport.py b/auth_transport.py new file mode 100644 index 00000000..1a06326d --- /dev/null +++ b/auth_transport.py @@ -0,0 +1,224 @@ +#!/usr/bin/python + +from transport import BaseTransport +from transport import MSG_SERVICE_REQUEST, MSG_SERVICE_ACCEPT, MSG_USERAUTH_REQUEST, MSG_USERAUTH_FAILURE, \ + MSG_USERAUTH_SUCCESS, MSG_USERAUTH_BANNER +from message import Message +from secsh import SSHException +from logging import DEBUG, INFO, WARNING, ERROR, CRITICAL + +DISCONNECT_SERVICE_NOT_AVAILABLE, DISCONNECT_AUTH_CANCELLED_BY_USER, \ + DISCONNECT_NO_MORE_AUTH_METHODS_AVAILABLE = 7, 13, 14 + +AUTH_SUCCESSFUL, AUTH_PARTIALLY_SUCCESSFUL, AUTH_FAILED = range(3) + + +class Transport(BaseTransport): + "BaseTransport with the auth framework hooked up" + def __init__(self, sock): + BaseTransport.__init__(self, sock) + self.auth_event = None + # for server mode: + self.auth_username = None + self.auth_fail_count = 0 + self.auth_complete = 0 + + def request_auth(self): + m = Message() + m.add_byte(chr(MSG_SERVICE_REQUEST)) + m.add_string('ssh-userauth') + self.send_message(m) + + def auth_key(self, username, key, event): + if (not self.active) or (not self.initial_kex_done): + # we should never try to send the password unless we're on a secure link + raise SSHException('No existing session') + try: + self.lock.acquire() + self.auth_event = event + self.auth_method = 'publickey' + self.username = username + self.private_key = key + self.request_auth() + finally: + self.lock.release() + + def auth_password(self, username, password, event): + 'authenticate using a password; event is triggered on success or fail' + if (not self.active) or (not self.initial_kex_done): + # we should never try to send the password unless we're on a secure link + raise SSHException('No existing session') + try: + self.lock.acquire() + self.auth_event = event + self.auth_method = 'password' + self.username = username + self.password = password + self.request_auth() + finally: + self.lock.release() + + def disconnect_service_not_available(self): + m = Message() + m.add_byte(chr(MSG_DISCONNECT)) + m.add_int(DISCONNECT_SERVICE_NOT_AVAILABLE) + m.add_string('Service not available') + m.add_string('en') + self.send_message(m) + self.close() + + def disconnect_no_more_auth(self): + m = Message() + m.add_byte(chr(MSG_DISCONNECT)) + m.add_int(DISCONNECT_NO_MORE_AUTH_METHODS_AVAILABLE) + m.add_string('No more auth methods available') + m.add_string('en') + self.send_message(m) + self.close() + + def parse_service_request(self, m): + service = m.get_string() + if self.server_mode and (service == 'ssh-userauth'): + # accepted + m = Message() + m.add_byte(chr(MSG_SERVICE_ACCEPT)) + m.add_string(service) + self.send_message(m) + return + # dunno this one + self.disconnect_service_not_available() + + def parse_service_accept(self, m): + service = m.get_string() + if service == 'ssh-userauth': + self.log(DEBUG, 'userauth is OK') + m = Message() + m.add_byte(chr(MSG_USERAUTH_REQUEST)) + m.add_string(self.username) + m.add_string('ssh-connection') + m.add_string(self.auth_method) + if self.auth_method == 'password': + m.add_boolean(0) + m.add_string(self.password.encode('UTF-8')) + elif self.auth_method == 'publickey': + m.add_boolean(1) + m.add_string(self.private_key.get_name()) + m.add_string(str(self.private_key)) + m.add_string(self.private_key.sign_ssh_session(self.randpool, self.H, self.username)) + else: + raise SSHException('Unknown auth method "%s"' % self.auth_method) + self.send_message(m) + else: + self.log(DEBUG, 'Service request "%s" accepted (?)' % service) + + def get_allowed_auths(self): + "override me!" + return 'password' + + def check_auth_none(self, username): + "override me! return tuple of (int, string) ==> (auth status, list of acceptable auth methods)" + return (AUTH_FAILED, self.get_allowed_auths()) + + def check_auth_password(self, username, password): + "override me! return tuple of (int, string) ==> (auth status, list of acceptable auth methods)" + return (AUTH_FAILED, self.get_allowed_auths()) + + def check_auth_publickey(self, username, key): + "override me! return tuple of (int, string) ==> (auth status, list of acceptable auth methods)" + return (AUTH_FAILED, self.get_allowed_auths()) + + def parse_userauth_request(self, m): + if not self.server_mode: + # er, uh... what? + m = Message() + m.add_byte(chr(MSG_USERAUTH_FAILURE)) + m.add_string('none') + m.add_boolean(0) + self.send_message(m) + return + if self.auth_complete: + # ignore + return + username = m.get_string() + service = m.get_string() + method = m.get_string() + if service != 'ssh-connection': + self.disconnect_service_not_available() + return + if (self.auth_username is not None) and (self.auth_username != username): + # trying to change username in mid-flight! + self.disconnect_no_more_auth() + return + if method == 'none': + result = self.check_auth_none(username) + elif method == 'password': + changereq = m.get_boolean() + password = m.get_string().decode('UTF-8') + if changereq: + # always treated as failure, since we don't support changing passwords, but collect + # the list of valid auth types from the callback anyway + newpassword = m.get_string().decode('UTF-8') + result = self.check_auth_password(username, password) + result = (AUTH_FAILED, result[1]) + else: + result = self.check_auth_password(username, password) + elif method == 'publickey': + # FIXME + result = self.check_auth_none(username) + result = (AUTH_FAILED, result[1]) + else: + result = self.check_auth_none(username) + result = (AUTH_FAILED, result[1]) + # okay, send result + m = Message() + if result[0] == AUTH_SUCCESSFUL: + m.add_byte(chr(MSG_USERAUTH_SUCCESSFUL)) + self.auth_complete = 1 + else: + m.add_byte(chr(MSG_USERAUTH_FAILURE)) + m.add_string(result[1]) + if result[0] == AUTH_PARTIALLY_SUCCESSFUL: + m.add_boolean(1) + else: + m.add_boolean(0) + self.auth_fail_count += 1 + self.send_message(m) + if self.auth_fail_count >= 10: + self.disconnect_no_more_auth() + + def parse_userauth_success(self, m): + self.log(INFO, 'Authentication successful!') + self.authenticated = 1 + if self.auth_event != None: + self.auth_event.set() + + def parse_userauth_failure(self, m): + authlist = m.get_list() + partial = m.get_boolean() + if partial: + self.log(INFO, 'Authentication continues...') + self.log(DEBUG, 'Methods: ' + str(partial)) + # FIXME - do something + pass + self.log(INFO, 'Authentication failed.') + self.authenticated = 0 + self.close() + if self.auth_event != None: + self.auth_event.set() + + def parse_userauth_banner(self, m): + banner = m.get_string() + lang = m.get_string() + self.log(INFO, 'Auth banner: ' + banner) + # who cares. + + handler_table = BaseTransport.handler_table.copy() + handler_table.update({ + MSG_SERVICE_REQUEST: parse_service_request, + MSG_SERVICE_ACCEPT: parse_service_accept, + MSG_USERAUTH_REQUEST: parse_userauth_request, + MSG_USERAUTH_SUCCESS: parse_userauth_success, + MSG_USERAUTH_FAILURE: parse_userauth_failure, + MSG_USERAUTH_BANNER: parse_userauth_banner, + }) + -- cgit v1.2.3