summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--paramiko/auth_handler.py16
-rw-r--r--paramiko/transport.py4
-rw-r--r--tests/test_auth.py109
3 files changed, 71 insertions, 58 deletions
diff --git a/paramiko/auth_handler.py b/paramiko/auth_handler.py
index 6cf6565f..2b336e07 100644
--- a/paramiko/auth_handler.py
+++ b/paramiko/auth_handler.py
@@ -200,7 +200,7 @@ class AuthHandler (object):
password = self.password
if isinstance(password, unicode):
password = password.encode('UTF-8')
- m.add_string(self.password.encode('UTF-8'))
+ m.add_string(password)
elif self.auth_method == 'publickey':
m.add_boolean(True)
m.add_string(self.private_key.get_name())
@@ -283,12 +283,22 @@ class AuthHandler (object):
result = self.transport.server_object.check_auth_none(username)
elif method == 'password':
changereq = m.get_boolean()
- password = m.get_string().decode('UTF-8', 'replace')
+ password = m.get_string()
+ try:
+ password = password.decode('UTF-8')
+ except UnicodeError:
+ # some clients/servers expect non-utf-8 passwords!
+ # in this case, just return the raw byte string.
+ pass
if changereq:
# always treated as failure, since we don't support changing passwords, but collect
# the list of valid auth types from the callback anyway
self.transport._log(DEBUG, 'Auth request to change passwords (rejected)')
- newpassword = m.get_string().decode('UTF-8', 'replace')
+ newpassword = m.get_string()
+ try:
+ newpassword = newpassword.decode('UTF-8', 'replace')
+ except UnicodeError:
+ pass
result = AUTH_FAILED
else:
result = self.transport.server_object.check_auth_password(username, password)
diff --git a/paramiko/transport.py b/paramiko/transport.py
index 32120b0f..68c6d936 100644
--- a/paramiko/transport.py
+++ b/paramiko/transport.py
@@ -1071,9 +1071,9 @@ class Transport (threading.Thread):
step. Otherwise, in the normal case, an empty list is returned.
@param username: the username to authenticate as
- @type username: string
+ @type username: str
@param password: the password to authenticate with
- @type password: string
+ @type password: str or unicode
@param event: an event to trigger when the authentication attempt is
complete (whether it was successful or not)
@type event: threading.Event
diff --git a/tests/test_auth.py b/tests/test_auth.py
index cc35ee9b..a18c57d0 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -48,6 +48,10 @@ class NullServer (ServerInterface):
return 'password'
if username == 'commie':
return 'keyboard-interactive'
+ if username == 'utf8':
+ return 'password'
+ if username == 'non-utf8':
+ return 'password'
return 'publickey'
def check_auth_password(self, username, password):
@@ -59,7 +63,9 @@ class NullServer (ServerInterface):
if self.paranoid_did_public_key:
return AUTH_SUCCESSFUL
return AUTH_PARTIALLY_SUCCESSFUL
- if (username == 'utf8') and (password == u'\u2022'.encode('utf-8')):
+ if (username == 'utf8') and (password == u'\u2022'):
+ return AUTH_SUCCESSFUL
+ if (username == 'non-utf8') and (password == '\xff'):
return AUTH_SUCCESSFUL
return AUTH_FAILED
@@ -99,21 +105,29 @@ class AuthTest (unittest.TestCase):
self.ts.close()
self.socks.close()
self.sockc.close()
+
+ def start_server(self):
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ self.public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ self.event = threading.Event()
+ self.server = NullServer()
+ self.assert_(not self.event.isSet())
+ self.ts.start_server(self.event, self.server)
+
+ def verify_finished(self):
+ self.event.wait(1.0)
+ self.assert_(self.event.isSet())
+ self.assert_(self.ts.is_active())
def test_1_bad_auth_type(self):
"""
verify that we get the right exception when an unsupported auth
type is requested.
"""
- host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
- public_host_key = RSAKey(data=str(host_key))
- self.ts.add_server_key(host_key)
- event = threading.Event()
- server = NullServer()
- self.assert_(not event.isSet())
- self.ts.start_server(event, server)
+ self.start_server()
try:
- self.tc.connect(hostkey=public_host_key,
+ self.tc.connect(hostkey=self.public_host_key,
username='unknown', password='error')
self.assert_(False)
except:
@@ -126,14 +140,8 @@ class AuthTest (unittest.TestCase):
verify that a bad password gets the right exception, and that a retry
with the right password works.
"""
- host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
- public_host_key = RSAKey(data=str(host_key))
- self.ts.add_server_key(host_key)
- event = threading.Event()
- server = NullServer()
- self.assert_(not event.isSet())
- self.ts.start_server(event, server)
- self.tc.connect(hostkey=public_host_key)
+ self.start_server()
+ self.tc.connect(hostkey=self.public_host_key)
try:
self.tc.auth_password(username='slowdive', password='error')
self.assert_(False)
@@ -141,43 +149,27 @@ class AuthTest (unittest.TestCase):
etype, evalue, etb = sys.exc_info()
self.assert_(issubclass(etype, SSHException))
self.tc.auth_password(username='slowdive', password='pygmalion')
- event.wait(1.0)
- self.assert_(event.isSet())
- self.assert_(self.ts.is_active())
+ self.verify_finished()
def test_3_multipart_auth(self):
"""
verify that multipart auth works.
"""
- host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
- public_host_key = RSAKey(data=str(host_key))
- self.ts.add_server_key(host_key)
- event = threading.Event()
- server = NullServer()
- self.assert_(not event.isSet())
- self.ts.start_server(event, server)
- self.tc.connect(hostkey=public_host_key)
+ self.start_server()
+ self.tc.connect(hostkey=self.public_host_key)
remain = self.tc.auth_password(username='paranoid', password='paranoid')
self.assertEquals(['publickey'], remain)
key = DSSKey.from_private_key_file('tests/test_dss.key')
remain = self.tc.auth_publickey(username='paranoid', key=key)
self.assertEquals([], remain)
- event.wait(1.0)
- self.assert_(event.isSet())
- self.assert_(self.ts.is_active())
+ self.verify_finished()
def test_4_interactive_auth(self):
"""
verify keyboard-interactive auth works.
"""
- host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
- public_host_key = RSAKey(data=str(host_key))
- self.ts.add_server_key(host_key)
- event = threading.Event()
- server = NullServer()
- self.assert_(not event.isSet())
- self.ts.start_server(event, server)
- self.tc.connect(hostkey=public_host_key)
+ self.start_server()
+ self.tc.connect(hostkey=self.public_host_key)
def handler(title, instructions, prompts):
self.got_title = title
@@ -188,25 +180,36 @@ class AuthTest (unittest.TestCase):
self.assertEquals(self.got_title, 'password')
self.assertEquals(self.got_prompts, [('Password', False)])
self.assertEquals([], remain)
- event.wait(1.0)
- self.assert_(event.isSet())
- self.assert_(self.ts.is_active())
+ self.verify_finished()
def test_5_interactive_auth_fallback(self):
"""
verify that a password auth attempt will fallback to "interactive"
if password auth isn't supported but interactive is.
"""
- host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
- public_host_key = RSAKey(data=str(host_key))
- self.ts.add_server_key(host_key)
- event = threading.Event()
- server = NullServer()
- self.assert_(not event.isSet())
- self.ts.start_server(event, server)
- self.tc.connect(hostkey=public_host_key)
+ self.start_server()
+ self.tc.connect(hostkey=self.public_host_key)
remain = self.tc.auth_password('commie', 'cat')
self.assertEquals([], remain)
- event.wait(1.0)
- self.assert_(event.isSet())
- self.assert_(self.ts.is_active())
+ self.verify_finished()
+
+ def test_6_auth_utf8(self):
+ """
+ verify that utf-8 encoding happens in authentication.
+ """
+ self.start_server()
+ self.tc.connect(hostkey=self.public_host_key)
+ remain = self.tc.auth_password('utf8', u'\u2022')
+ self.assertEquals([], remain)
+ self.verify_finished()
+
+ def test_7_auth_non_utf8(self):
+ """
+ verify that non-utf-8 encoded passwords can be used for broken
+ servers.
+ """
+ self.start_server()
+ self.tc.connect(hostkey=self.public_host_key)
+ remain = self.tc.auth_password('non-utf8', '\xff')
+ self.assertEquals([], remain)
+ self.verify_finished()