diff options
author | Jeff Forcier <jeff@bitprophet.org> | 2017-06-06 15:15:40 -0700 |
---|---|---|
committer | Jeff Forcier <jeff@bitprophet.org> | 2017-06-06 15:15:40 -0700 |
commit | 996fb6fd8ffb6df4f56c81e2ff199b9a600ecfc6 (patch) | |
tree | 4571659cb5f9320275cfedccc3ae897b0f425343 /tests | |
parent | 57394f5199ff75abc87b0373e18be2102540d50d (diff) | |
parent | ddb277d4e4989e914b67ff26c14c7c298e7fab9f (diff) |
Merge branch 'master' into 471-int
Diffstat (limited to 'tests')
-rw-r--r-- | tests/loop.py | 2 | ||||
-rw-r--r-- | tests/stub_sftp.py | 8 | ||||
-rw-r--r-- | tests/test_auth.py | 44 | ||||
-rw-r--r-- | tests/test_client.py | 178 | ||||
-rw-r--r-- | tests/test_ecdsa_256.key (renamed from tests/test_ecdsa.key) | 0 | ||||
-rw-r--r-- | tests/test_ecdsa_384.key | 6 | ||||
-rw-r--r-- | tests/test_ecdsa_521.key | 7 | ||||
-rw-r--r-- | tests/test_ecdsa_password_256.key (renamed from tests/test_ecdsa_password.key) | 0 | ||||
-rw-r--r-- | tests/test_ecdsa_password_384.key | 9 | ||||
-rw-r--r-- | tests/test_ecdsa_password_521.key | 10 | ||||
-rw-r--r-- | tests/test_ed25519.key | 8 | ||||
-rw-r--r-- | tests/test_ed25519_password.key | 8 | ||||
-rwxr-xr-x | tests/test_file.py | 27 | ||||
-rw-r--r-- | tests/test_gssapi.py | 8 | ||||
-rw-r--r-- | tests/test_hostkeys.py | 13 | ||||
-rw-r--r-- | tests/test_kex.py | 177 | ||||
-rw-r--r-- | tests/test_message.py | 8 | ||||
-rw-r--r-- | tests/test_packetizer.py | 41 | ||||
-rw-r--r-- | tests/test_pkey.py | 262 | ||||
-rwxr-xr-x | tests/test_sftp.py | 15 | ||||
-rw-r--r-- | tests/test_sftp_big.py | 18 | ||||
-rw-r--r-- | tests/test_ssh_exception.py | 31 | ||||
-rw-r--r-- | tests/test_ssh_gss.py | 4 | ||||
-rw-r--r-- | tests/test_transport.py | 82 | ||||
-rw-r--r-- | tests/test_util.py | 89 |
25 files changed, 896 insertions, 159 deletions
diff --git a/tests/loop.py b/tests/loop.py index 4f5dc163..e805ad96 100644 --- a/tests/loop.py +++ b/tests/loop.py @@ -37,9 +37,11 @@ class LoopSocket (object): self.__cv = threading.Condition(self.__lock) self.__timeout = None self.__mate = None + self._closed = False def close(self): self.__unlink() + self._closed = True try: self.__lock.acquire() self.__in_buffer = bytes() diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py index a894c2ba..0d673091 100644 --- a/tests/stub_sftp.py +++ b/tests/stub_sftp.py @@ -22,8 +22,10 @@ A stub SFTP server for loopback SFTP testing. import os import sys -from paramiko import ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes, \ - SFTPHandle, SFTP_OK, SFTP_FAILURE, AUTH_SUCCESSFUL, OPEN_SUCCEEDED +from paramiko import ( + ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes, + SFTPHandle, SFTP_OK, SFTP_FAILURE, AUTH_SUCCESSFUL, OPEN_SUCCEEDED, +) from paramiko.common import o666 @@ -55,7 +57,7 @@ class StubSFTPHandle (SFTPHandle): class StubSFTPServer (SFTPServerInterface): # assume current folder is a fine root - # (the tests always create and eventualy delete a subfolder, so there shouldn't be any mess) + # (the tests always create and eventually delete a subfolder, so there shouldn't be any mess) ROOT = os.getcwd() def _realpath(self, path): diff --git a/tests/test_auth.py b/tests/test_auth.py index ec78e3ce..58b2f44f 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -23,10 +23,12 @@ Some unit tests for authenticating over a Transport. import sys import threading import unittest +from time import sleep -from paramiko import Transport, ServerInterface, RSAKey, DSSKey, \ - BadAuthenticationType, InteractiveQuery, \ - AuthenticationException +from paramiko import ( + Transport, ServerInterface, RSAKey, DSSKey, BadAuthenticationType, + InteractiveQuery, AuthenticationException, +) from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL from paramiko.py3compat import u from tests.loop import LoopSocket @@ -73,6 +75,9 @@ class NullServer (ServerInterface): return AUTH_SUCCESSFUL if username == 'bad-server': raise Exception("Ack!") + if username == 'unresponsive-server': + sleep(5) + return AUTH_SUCCESSFUL return AUTH_FAILED def check_auth_publickey(self, username, key): @@ -83,13 +88,13 @@ class NullServer (ServerInterface): return AUTH_SUCCESSFUL return AUTH_PARTIALLY_SUCCESSFUL return AUTH_FAILED - + def check_auth_interactive(self, username, submethods): if username == 'commie': self.username = username return InteractiveQuery('password', 'Please enter a password.', ('Password', False)) return AUTH_FAILED - + def check_auth_interactive_response(self, responses): if self.username == 'commie': if (len(responses) == 1) and (responses[0] == 'cat'): @@ -111,7 +116,7 @@ class AuthTest (unittest.TestCase): self.ts.close() self.socks.close() self.sockc.close() - + def start_server(self): host_key = RSAKey.from_private_key_file(test_path('test_rsa.key')) self.public_host_key = RSAKey(data=host_key.asbytes()) @@ -120,7 +125,7 @@ class AuthTest (unittest.TestCase): self.server = NullServer() self.assertTrue(not self.event.is_set()) self.ts.start_server(self.event, self.server) - + def verify_finished(self): self.event.wait(1.0) self.assertTrue(self.event.is_set()) @@ -156,7 +161,7 @@ class AuthTest (unittest.TestCase): self.assertTrue(issubclass(etype, AuthenticationException)) self.tc.auth_password(username='slowdive', password='pygmalion') self.verify_finished() - + def test_3_multipart_auth(self): """ verify that multipart auth works. @@ -187,7 +192,7 @@ class AuthTest (unittest.TestCase): self.assertEqual(self.got_prompts, [('Password', False)]) self.assertEqual([], remain) self.verify_finished() - + def test_5_interactive_auth_fallback(self): """ verify that a password auth attempt will fallback to "interactive" @@ -232,3 +237,24 @@ class AuthTest (unittest.TestCase): except: etype, evalue, etb = sys.exc_info() self.assertTrue(issubclass(etype, AuthenticationException)) + + def test_9_auth_non_responsive(self): + """ + verify that authentication times out if server takes to long to + respond (or never responds). + """ + auth_timeout = self.tc.auth_timeout + self.tc.auth_timeout = 2 # Reduce to 2 seconds to speed up test + + try: + self.start_server() + self.tc.connect() + try: + remain = self.tc.auth_password('unresponsive-server', 'hello') + except: + etype, evalue, etb = sys.exc_info() + self.assertTrue(issubclass(etype, AuthenticationException)) + self.assertTrue('Authentication timeout' in str(evalue)) + finally: + # Restore value + self.tc.auth_timeout = auth_timeout diff --git a/tests/test_client.py b/tests/test_client.py index 3d2e75c9..aa3ff59b 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -22,6 +22,8 @@ Some unit tests for SSHClient. from __future__ import with_statement +import gc +import platform import socket from tempfile import mkstemp import threading @@ -31,8 +33,9 @@ import warnings import os import time from tests.util import test_path + import paramiko -from paramiko.common import PY2, b +from paramiko.common import PY2 from paramiko.ssh_exception import SSHException @@ -40,6 +43,7 @@ FINGERPRINTS = { 'ssh-dss': b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c', 'ssh-rsa': b'\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5', 'ecdsa-sha2-nistp256': b'\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60', + 'ssh-ed25519': b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80', } @@ -57,6 +61,9 @@ class NullServer (paramiko.ServerInterface): def check_auth_password(self, username, password): if (username == 'slowdive') and (password == 'pygmalion'): return paramiko.AUTH_SUCCESSFUL + if (username == 'slowdive') and (password == 'unresponsive-server'): + time.sleep(5) + return paramiko.AUTH_SUCCESSFUL return paramiko.AUTH_FAILED def check_auth_publickey(self, username, key): @@ -75,10 +82,20 @@ class NullServer (paramiko.ServerInterface): return paramiko.OPEN_SUCCEEDED def check_channel_exec_request(self, channel, command): - if command != 'yes': + if command != b'yes': return False return True + def check_channel_env_request(self, channel, name, value): + if name == 'INVALID_ENV': + return False + + if not hasattr(channel, 'env'): + setattr(channel, 'env', {}) + + channel.env[name] = value + return True + class SSHClientTest (unittest.TestCase): @@ -87,6 +104,12 @@ class SSHClientTest (unittest.TestCase): self.sockl.bind(('localhost', 0)) self.sockl.listen(1) self.addr, self.port = self.sockl.getsockname() + self.connect_kwargs = dict( + hostname=self.addr, + port=self.port, + username='slowdive', + look_for_keys=False, + ) self.event = threading.Event() def tearDown(self): @@ -124,7 +147,7 @@ class SSHClientTest (unittest.TestCase): self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key) # Actual connection - self.tc.connect(self.addr, self.port, username='slowdive', **kwargs) + self.tc.connect(**dict(self.connect_kwargs, **kwargs)) # Authentication successful? self.event.wait(1.0) @@ -173,7 +196,10 @@ class SSHClientTest (unittest.TestCase): """ verify that SSHClient works with an ECDSA key. """ - self._test_connection(key_filename=test_path('test_ecdsa.key')) + self._test_connection(key_filename=test_path('test_ecdsa_256.key')) + + def test_client_ed25519(self): + self._test_connection(key_filename=test_path('test_ed25519.key')) def test_3_multiple_key_files(self): """ @@ -190,15 +216,21 @@ class SSHClientTest (unittest.TestCase): for attempt, accept in ( (['rsa', 'dss'], ['dss']), # Original test #3 (['dss', 'rsa'], ['dss']), # Ordering matters sometimes, sadly - (['dss', 'rsa', 'ecdsa'], ['dss']), # Try ECDSA but fail - (['rsa', 'ecdsa'], ['ecdsa']), # ECDSA success + (['dss', 'rsa', 'ecdsa_256'], ['dss']), # Try ECDSA but fail + (['rsa', 'ecdsa_256'], ['ecdsa']), # ECDSA success ): - self._test_connection( - key_filename=[ - test_path('test_{0}.key'.format(x)) for x in attempt - ], - allowed_keys=[types_[x] for x in accept], - ) + try: + self._test_connection( + key_filename=[ + test_path('test_{0}.key'.format(x)) for x in attempt + ], + allowed_keys=[types_[x] for x in accept], + ) + finally: + # Clean up to avoid occasional gc-related deadlocks. + # TODO: use nose test generators after nose port + self.tearDown() + self.setUp() def test_multiple_key_files_failure(self): """ @@ -223,7 +255,7 @@ class SSHClientTest (unittest.TestCase): self.tc = paramiko.SSHClient() self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.assertEqual(0, len(self.tc.get_host_keys())) - self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion') + self.tc.connect(password='pygmalion', **self.connect_kwargs) self.event.wait(1.0) self.assertTrue(self.event.is_set()) @@ -266,19 +298,18 @@ class SSHClientTest (unittest.TestCase): transport's packetizer) is closed. """ # Unclear why this is borked on Py3, but it is, and does not seem worth - # pursuing at the moment. + # pursuing at the moment. Skipped on PyPy because it fails on travis + # for unknown reasons, works fine locally. # XXX: It's the release of the references to e.g packetizer that fails # in py3... - if not PY2: + if not PY2 or platform.python_implementation() == "PyPy": return threading.Thread(target=self._run).start() - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) - public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = paramiko.SSHClient() self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.assertEqual(0, len(self.tc.get_host_keys())) - self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion') + self.tc.connect(**dict(self.connect_kwargs, password='pygmalion')) self.event.wait(1.0) self.assertTrue(self.event.is_set()) @@ -289,14 +320,10 @@ class SSHClientTest (unittest.TestCase): self.tc.close() del self.tc - # hrm, sometimes p isn't cleared right away. why is that? - #st = time.time() - #while (time.time() - st < 5.0) and (p() is not None): - # time.sleep(0.1) - - # instead of dumbly waiting for the GC to collect, force a collection - # to see whether the SSHClient object is deallocated correctly - import gc + # force a collection to see whether the SSHClient object is deallocated + # correctly. 2 GCs are needed to make sure it's really collected on + # PyPy + gc.collect() gc.collect() self.assertTrue(p() is None) @@ -306,14 +333,12 @@ class SSHClientTest (unittest.TestCase): verify that an SSHClient can be used a context manager """ threading.Thread(target=self._run).start() - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) - public_host_key = paramiko.RSAKey(data=host_key.asbytes()) with paramiko.SSHClient() as tc: self.tc = tc self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.assertEquals(0, len(self.tc.get_host_keys())) - self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion') + self.tc.connect(**dict(self.connect_kwargs, password='pygmalion')) self.event.wait(1.0) self.assertTrue(self.event.is_set()) @@ -335,12 +360,99 @@ class SSHClientTest (unittest.TestCase): self.tc = paramiko.SSHClient() self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key) # Connect with a half second banner timeout. + kwargs = dict(self.connect_kwargs, banner_timeout=0.5) self.assertRaises( paramiko.SSHException, self.tc.connect, - self.addr, - self.port, - username='slowdive', + **kwargs + ) + + def test_8_auth_trickledown(self): + """ + Failed key auth doesn't prevent subsequent pw auth from succeeding + """ + # NOTE: re #387, re #394 + # If pkey module used within Client._auth isn't correctly handling auth + # errors (e.g. if it allows things like ValueError to bubble up as per + # midway through #394) client.connect() will fail (at key load step) + # instead of succeeding (at password step) + kwargs = dict( + # Password-protected key whose passphrase is not 'pygmalion' (it's + # 'television' as per tests/test_pkey.py). NOTE: must use + # key_filename, loading the actual key here with PKey will except + # immediately; we're testing the try/except crap within Client. + key_filename=[test_path('test_rsa_password.key')], + # Actual password for default 'slowdive' user password='pygmalion', - banner_timeout=0.5 ) + self._test_connection(**kwargs) + + def test_9_auth_timeout(self): + """ + verify that the SSHClient has a configurable auth timeout + """ + threading.Thread(target=self._run).start() + host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) + public_host_key = paramiko.RSAKey(data=host_key.asbytes()) + + self.tc = paramiko.SSHClient() + self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key) + # Connect with a half second auth timeout + kwargs = dict(self.connect_kwargs, password='unresponsive-server', auth_timeout=0.5) + self.assertRaises( + paramiko.AuthenticationException, + self.tc.connect, + **kwargs + ) + + def test_update_environment(self): + """ + Verify that environment variables can be set by the client. + """ + threading.Thread(target=self._run).start() + + self.tc = paramiko.SSHClient() + self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + self.assertEqual(0, len(self.tc.get_host_keys())) + self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion') + + self.event.wait(1.0) + self.assertTrue(self.event.isSet()) + self.assertTrue(self.ts.is_active()) + + target_env = {b'A': b'B', b'C': b'd'} + + self.tc.exec_command('yes', environment=target_env) + schan = self.ts.accept(1.0) + self.assertEqual(target_env, getattr(schan, 'env', {})) + schan.close() + + # Cannot use assertRaises in context manager mode as it is not supported + # in Python 2.6. + try: + # Verify that a rejection by the server can be detected + self.tc.exec_command('yes', environment={b'INVALID_ENV': b''}) + except SSHException as e: + self.assertTrue('INVALID_ENV' in str(e), + 'Expected variable name in error message') + self.assertTrue(isinstance(e.args[1], SSHException), + 'Expected original SSHException in exception') + else: + self.assertFalse(False, 'SSHException was not thrown.') + + + def test_missing_key_policy_accepts_classes_or_instances(self): + """ + Client.missing_host_key_policy() can take classes or instances. + """ + # AN ACTUAL UNIT TEST?! GOOD LORD + # (But then we have to test a private API...meh.) + client = paramiko.SSHClient() + # Default + assert isinstance(client._policy, paramiko.RejectPolicy) + # Hand in an instance (classic behavior) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + assert isinstance(client._policy, paramiko.AutoAddPolicy) + # Hand in just the class (new behavior) + client.set_missing_host_key_policy(paramiko.AutoAddPolicy) + assert isinstance(client._policy, paramiko.AutoAddPolicy) diff --git a/tests/test_ecdsa.key b/tests/test_ecdsa_256.key index 42d44734..42d44734 100644 --- a/tests/test_ecdsa.key +++ b/tests/test_ecdsa_256.key diff --git a/tests/test_ecdsa_384.key b/tests/test_ecdsa_384.key new file mode 100644 index 00000000..796bf417 --- /dev/null +++ b/tests/test_ecdsa_384.key @@ -0,0 +1,6 @@ +-----BEGIN EC PRIVATE KEY----- +MIGkAgEBBDBDdO8IXvlLJgM7+sNtPl7tI7FM5kzuEUEEPRjXIPQM7mISciwJPBt+ +y43EuG8nL4mgBwYFK4EEACKhZANiAAQWxom0C1vQAGYhjdoREMVmGKBWlisDdzyk +mgyUjKpiJ9WfbIEVLsPGP8OdNjhr1y/8BZNIts+dJd6VmYw+4HzB+4F+U1Igs8K0 +JEvh59VNkvWheViadDXCM2MV8Nq+DNg= +-----END EC PRIVATE KEY----- diff --git a/tests/test_ecdsa_521.key b/tests/test_ecdsa_521.key new file mode 100644 index 00000000..b87dc90f --- /dev/null +++ b/tests/test_ecdsa_521.key @@ -0,0 +1,7 @@ +-----BEGIN EC PRIVATE KEY----- +MIHcAgEBBEIAprQtAS3OF6iVUkT8IowTHWicHzShGgk86EtuEXvfQnhZFKsWm6Jo +iqAr1yEaiuI9LfB3Xs8cjuhgEEfbduYr/f6gBwYFK4EEACOhgYkDgYYABACaOaFL +ZGuxa5AW16qj6VLypFbLrEWrt9AZUloCMefxO8bNLjK/O5g0rAVasar1TnyHE9qj +4NwzANZASWjQNbc4MAG8vzqezFwLIn/kNyNTsXNfqEko9OgHZknlj2Z79dwTJcRA +L4QLcT5aND0EHZLB2fAUDXiWIb2j4rg1mwPlBMiBXA== +-----END EC PRIVATE KEY----- diff --git a/tests/test_ecdsa_password.key b/tests/test_ecdsa_password_256.key index eb7910ed..eb7910ed 100644 --- a/tests/test_ecdsa_password.key +++ b/tests/test_ecdsa_password_256.key diff --git a/tests/test_ecdsa_password_384.key b/tests/test_ecdsa_password_384.key new file mode 100644 index 00000000..eba33c14 --- /dev/null +++ b/tests/test_ecdsa_password_384.key @@ -0,0 +1,9 @@ +-----BEGIN EC PRIVATE KEY----- +Proc-Type: 4,ENCRYPTED +DEK-Info: AES-128-CBC,7F7B5DBE4CE040D822441AFE7A023A1D + +y/d6tGonAXYgJniQoFCdto+CuT1y1s41qzwNLN9YdNq/+R/dtQvZAaOuGtHJRFE6 +wWabhY1bSjavVPT2z1Zw1jhDJX5HGrf9LDoyORKtUWtUJoUvGdYLHbcg8Q+//WRf +R0A01YuSw1SJX0a225S1aRcsDAk1k5F8EMb8QzSSDgjAOI8ldQF35JI+ofNSGjgS +BPOlorQXTJxDOGmokw/Wql6MbhajXKPO39H2Z53W88U= +-----END EC PRIVATE KEY----- diff --git a/tests/test_ecdsa_password_521.key b/tests/test_ecdsa_password_521.key new file mode 100644 index 00000000..5986b930 --- /dev/null +++ b/tests/test_ecdsa_password_521.key @@ -0,0 +1,10 @@ +-----BEGIN EC PRIVATE KEY----- +Proc-Type: 4,ENCRYPTED +DEK-Info: AES-128-CBC,AEB2DE62C65D1A88C4940A3476B2F10A + +5kNk/FFPbHa0402QTrgpIT28uirJ4Amvb2/ryOEyOCe0NPbTLCqlQekj2RFYH2Un +pgCLUDkelKQv4pyuK8qWS7R+cFjE/gHHCPUWkK3djZUC8DKuA9lUKeQIE+V1vBHc +L5G+MpoYrPgaydcGx/Uqnc/kVuZx1DXLwrGGtgwNROVBtmjXC9EdfeXHLL1y0wvH +paNgacJpUtgqJEmiehf7eL/eiReegG553rZK3jjfboGkREUaKR5XOgamiKUtgKoc +sMpImVYCsRKd/9RI+VOqErZaEvy/9j0Ye3iH32wGOaA= +-----END EC PRIVATE KEY----- diff --git a/tests/test_ed25519.key b/tests/test_ed25519.key new file mode 100644 index 00000000..eb9f94c2 --- /dev/null +++ b/tests/test_ed25519.key @@ -0,0 +1,8 @@ +-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW +QyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXwAAAKhjwAdrY8AH +awAAAAtzc2gtZWQyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXw +AAAEA9tGQi2IrprbOSbDCF+RmAHd6meNSXBUQ2ekKXm4/8xnr1K9komH/1WBIvQbbtvnFV +hryd62EfcgRFuLRiokNfAAAAI2FsZXhfZ2F5bm9yQEFsZXhzLU1hY0Jvb2stQWlyLmxvY2 +FsAQI= +-----END OPENSSH PRIVATE KEY----- diff --git a/tests/test_ed25519_password.key b/tests/test_ed25519_password.key new file mode 100644 index 00000000..d178aaae --- /dev/null +++ b/tests/test_ed25519_password.key @@ -0,0 +1,8 @@ +-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAACmFlczI1Ni1jYmMAAAAGYmNyeXB0AAAAGAAAABDaKD4ac7 +kieb+UfXaLaw68AAAAEAAAAAEAAAAzAAAAC3NzaC1lZDI1NTE5AAAAIOQn7fjND5ozMSV3 +CvbEtIdT73hWCMRjzS/lRdUDw50xAAAAsE8kLGyYBnl9ihJNqv378y6mO3SkzrDbWXOnK6 +ij0vnuTAvcqvWHAnyu6qBbplu/W2m55ZFeAItgaEcV2/V76sh/sAKlERqrLFyXylN0xoOW +NU5+zU08aTlbSKGmeNUU2xE/xfJq12U9XClIRuVUkUpYANxNPbmTRpVrbD3fgXMhK97Jrb +DEn8ca1IqMPiYmd/hpe5+tq3OxyRljXjCUFWTnqkp9VvUdzSTdSGZHsW9i +-----END OPENSSH PRIVATE KEY----- diff --git a/tests/test_file.py b/tests/test_file.py index a6ff69e9..7fab6985 100755 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -70,9 +70,9 @@ class BufferedFileTest (unittest.TestCase): def test_2_readline(self): f = LoopbackFile('r+U') - f.write(b'First line.\nSecond line.\r\nThird line.\n' + + f.write(b'First line.\nSecond line.\r\nThird line.\n' + b'Fourth line.\nFinal line non-terminated.') - + self.assertEqual(f.readline(), 'First line.\n') # universal newline mode should convert this linefeed: self.assertEqual(f.readline(), 'Second line.\n') @@ -165,7 +165,28 @@ class BufferedFileTest (unittest.TestCase): f.write(buffer(b'Too small.')) f.close() + def test_9_readable(self): + f = LoopbackFile('r') + self.assertTrue(f.readable()) + self.assertFalse(f.writable()) + self.assertFalse(f.seekable()) + f.close() + + def test_A_writable(self): + f = LoopbackFile('w') + self.assertTrue(f.writable()) + self.assertFalse(f.readable()) + self.assertFalse(f.seekable()) + f.close() + + def test_B_readinto(self): + data = bytearray(5) + f = LoopbackFile('r+') + f._write(b"hello") + f.readinto(data) + self.assertEqual(data, b'hello') + f.close() + if __name__ == '__main__': from unittest import main main() - diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py index 96c268d9..bc220108 100644 --- a/tests/test_gssapi.py +++ b/tests/test_gssapi.py @@ -104,9 +104,11 @@ class GSSAPITest(unittest.TestCase): status = gss_srv_ctxt.verify_mic(mic_msg, mic_token) self.assertEquals(0, status) else: - gss_flags = sspicon.ISC_REQ_INTEGRITY |\ - sspicon.ISC_REQ_MUTUAL_AUTH |\ - sspicon.ISC_REQ_DELEGATE + gss_flags = ( + sspicon.ISC_REQ_INTEGRITY | + sspicon.ISC_REQ_MUTUAL_AUTH | + sspicon.ISC_REQ_DELEGATE + ) # Initialize a GSS-API context. target_name = "host/" + socket.getfqdn(targ_name) gss_ctxt = sspi.ClientAuth("Kerberos", diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py index 0ee1bbf0..2c7ceeb9 100644 --- a/tests/test_hostkeys.py +++ b/tests/test_hostkeys.py @@ -31,6 +31,7 @@ test_hosts_file = """\ secure.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA1PD6U2/TVxET6lkpKhOk5r\ 9q/kAYG6sP9f5zuUYP8i7FOFp/6ncCEbbtg/lB+A3iidyxoSWl+9jtoyyDOOVX4UIDV9G11Ml8om3\ D+jrpI9cycZHqilK0HmxDeCuxbwyMuaCygU9gS2qoRvNLWZk70OpIKSSpBo0Wl3/XUmz9uhc= +broken.example.com ssh-rsa AAAA happy.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA8bP1ZA7DCZDB9J0s50l31M\ BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\ 5ymME3bQ4J/k1IKxCtz/bAlAqFgKoc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+Pc2M= @@ -114,3 +115,15 @@ class HostKeysTest (unittest.TestCase): self.assertEqual(b'7EC91BB336CB6D810B124B1353C32396', fp) fp = hexlify(hostdict['secure.example.com']['ssh-dss'].get_fingerprint()).upper() self.assertEqual(b'4478F0B9A23CC5182009FF755BC1D26C', fp) + + def test_delitem(self): + hostdict = paramiko.HostKeys('hostfile.temp') + target = 'happy.example.com' + entry = hostdict[target] # will KeyError if not present + del hostdict[target] + try: + entry = hostdict[target] + except KeyError: + pass # Good + else: + assert False, "Entry was not deleted from HostKeys on delitem!" diff --git a/tests/test_kex.py b/tests/test_kex.py index 56f1b7c7..b7f588f7 100644 --- a/tests/test_kex.py +++ b/tests/test_kex.py @@ -20,20 +20,33 @@ Some unit tests for the key exchange protocols. """ -from binascii import hexlify +from binascii import hexlify, unhexlify import os import unittest import paramiko.util from paramiko.kex_group1 import KexGroup1 -from paramiko.kex_gex import KexGex +from paramiko.kex_gex import KexGex, KexGexSHA256 from paramiko import Message from paramiko.common import byte_chr +from paramiko.kex_ecdh_nist import KexNistp256 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric import ec def dummy_urandom(n): return byte_chr(0xcc) * n +def dummy_generate_key_pair(obj): + private_key_value = 94761803665136558137557783047955027733968423115106677159790289642479432803037 + public_key_numbers = "042bdab212fa8ba1b7c843301682a4db424d307246c7e1e6083c41d9ca7b098bf30b3d63e2ec6278488c135360456cc054b3444ecc45998c08894cbc1370f5f989" + public_key_numbers_obj = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers)) + obj.P = ec.EllipticCurvePrivateNumbers(private_value=private_key_value, public_numbers=public_key_numbers_obj).private_key(default_backend()) + if obj.transport.server_mode: + obj.Q_S = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers)).public_key(default_backend()) + return + obj.Q_C = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256R1(), unhexlify(public_key_numbers)).public_key(default_backend()) + class FakeKey (object): def __str__(self): @@ -93,9 +106,12 @@ class KexTest (unittest.TestCase): def setUp(self): self._original_urandom = os.urandom os.urandom = dummy_urandom + self._original_generate_key_pair = KexNistp256._generate_key_pair + KexNistp256._generate_key_pair = dummy_generate_key_pair def tearDown(self): os.urandom = self._original_urandom + KexNistp256._generate_key_pair = self._original_generate_key_pair def test_1_group1_client(self): transport = FakeTransport() @@ -252,3 +268,160 @@ class KexTest (unittest.TestCase): self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) + + def test_7_gex_sha256_client(self): + transport = FakeTransport() + transport.server_mode = False + kex = KexGexSHA256(transport) + kex.start_kex() + x = b'22000004000000080000002000' + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect) + + msg = Message() + msg.add_mpint(FakeModulusPack.P) + msg.add_mpint(FakeModulusPack.G) + msg.rewind() + kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg) + x = b'20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4' + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect) + + msg = Message() + msg.add_string('fake-host-key') + msg.add_mpint(69) + msg.add_string('fake-sig') + msg.rewind() + kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg) + H = b'AD1A9365A67B4496F05594AD1BF656E3CDA0851289A4C1AFF549FEAE50896DF4' + self.assertEqual(self.K, transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual((b'fake-host-key', b'fake-sig'), transport._verify) + self.assertTrue(transport._activated) + + def test_8_gex_sha256_old_client(self): + transport = FakeTransport() + transport.server_mode = False + kex = KexGexSHA256(transport) + kex.start_kex(_test_old_style=True) + x = b'1E00000800' + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect) + + msg = Message() + msg.add_mpint(FakeModulusPack.P) + msg.add_mpint(FakeModulusPack.G) + msg.rewind() + kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg) + x = b'20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4' + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect) + + msg = Message() + msg.add_string('fake-host-key') + msg.add_mpint(69) + msg.add_string('fake-sig') + msg.rewind() + kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg) + H = b'518386608B15891AE5237DEE08DCADDE76A0BCEFCE7F6DB3AD66BC41D256DFE5' + self.assertEqual(self.K, transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual((b'fake-host-key', b'fake-sig'), transport._verify) + self.assertTrue(transport._activated) + + def test_9_gex_sha256_server(self): + transport = FakeTransport() + transport.server_mode = True + kex = KexGexSHA256(transport) + kex.start_kex() + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect) + + msg = Message() + msg.add_int(1024) + msg.add_int(2048) + msg.add_int(4096) + msg.rewind() + kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg) + x = b'1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102' + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect) + + msg = Message() + msg.add_mpint(12345) + msg.rewind() + kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg) + K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 + H = b'CCAC0497CF0ABA1DBF55E1A3995D17F4CC31824B0E8D95CDF8A06F169D050D80' + x = b'210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967' + self.assertEqual(K, transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertTrue(transport._activated) + + def test_10_gex_sha256_server_with_old_client(self): + transport = FakeTransport() + transport.server_mode = True + kex = KexGexSHA256(transport) + kex.start_kex() + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect) + + msg = Message() + msg.add_int(2048) + msg.rewind() + kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, msg) + x = b'1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102' + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect) + + msg = Message() + msg.add_mpint(12345) + msg.rewind() + kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg) + K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 + H = b'3DDD2AD840AD095E397BA4D0573972DC60F6461FD38A187CACA6615A5BC8ADBB' + x = b'210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967' + self.assertEqual(K, transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertTrue(transport._activated) + + def test_11_kex_nistp256_client(self): + K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 + transport = FakeTransport() + transport.server_mode = False + kex = KexNistp256(transport) + kex.start_kex() + self.assertEqual((paramiko.kex_ecdh_nist._MSG_KEXECDH_REPLY,), transport._expect) + + #fake reply + msg = Message() + msg.add_string('fake-host-key') + Q_S = unhexlify("043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210") + msg.add_string(Q_S) + msg.add_string('fake-sig') + msg.rewind() + kex.parse_next(paramiko.kex_ecdh_nist._MSG_KEXECDH_REPLY, msg) + H = b'BAF7CE243A836037EB5D2221420F35C02B9AB6C957FE3BDE3369307B9612570A' + self.assertEqual(K, kex.transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual((b'fake-host-key', b'fake-sig'), transport._verify) + self.assertTrue(transport._activated) + + def test_12_kex_nistp256_server(self): + K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 + transport = FakeTransport() + transport.server_mode = True + kex = KexNistp256(transport) + kex.start_kex() + self.assertEqual((paramiko.kex_ecdh_nist._MSG_KEXECDH_INIT,), transport._expect) + + #fake init + msg=Message() + Q_C = unhexlify("043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210") + H = b'2EF4957AFD530DD3F05DBEABF68D724FACC060974DA9704F2AEE4C3DE861E7CA' + msg.add_string(Q_C) + msg.rewind() + kex.parse_next(paramiko.kex_ecdh_nist._MSG_KEXECDH_INIT, msg) + self.assertEqual(K, transport._K) + self.assertTrue(transport._activated) + self.assertEqual(H, hexlify(transport._H).upper()) diff --git a/tests/test_message.py b/tests/test_message.py index f308c037..f18cae90 100644 --- a/tests/test_message.py +++ b/tests/test_message.py @@ -92,12 +92,12 @@ class MessageTest (unittest.TestCase): def test_4_misc(self): msg = Message(self.__d) - self.assertEqual(msg.get_int(), 5) - self.assertEqual(msg.get_int(), 0x1122334455) - self.assertEqual(msg.get_int(), 0xf00000000000000000) + self.assertEqual(msg.get_adaptive_int(), 5) + self.assertEqual(msg.get_adaptive_int(), 0x1122334455) + self.assertEqual(msg.get_adaptive_int(), 0xf00000000000000000) self.assertEqual(msg.get_so_far(), self.__d[:29]) self.assertEqual(msg.get_remainder(), self.__d[29:]) msg.rewind() - self.assertEqual(msg.get_int(), 5) + self.assertEqual(msg.get_adaptive_int(), 5) self.assertEqual(msg.get_so_far(), self.__d[:4]) self.assertEqual(msg.get_remainder(), self.__d[4:]) diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py index 8faec03c..02173292 100644 --- a/tests/test_packetizer.py +++ b/tests/test_packetizer.py @@ -20,12 +20,14 @@ Some unit tests for the ssh2 protocol in Transport. """ +import sys import unittest from hashlib import sha1 -from tests.loop import LoopSocket +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes -from Crypto.Cipher import AES +from tests.loop import LoopSocket from paramiko import Message, Packetizer, util from paramiko.common import byte_chr, zero_byte @@ -33,7 +35,6 @@ from paramiko.common import byte_chr, zero_byte x55 = byte_chr(0x55) x1f = byte_chr(0x1f) - class PacketizerTest (unittest.TestCase): def test_1_write(self): @@ -43,8 +44,12 @@ class PacketizerTest (unittest.TestCase): p = Packetizer(wsock) p.set_log(util.get_logger('paramiko.transport')) p.set_hexdump(True) - cipher = AES.new(zero_byte * 16, AES.MODE_CBC, x55 * 16) - p.set_outbound_cipher(cipher, 16, sha1, 12, x1f * 20) + encryptor = Cipher( + algorithms.AES(zero_byte * 16), + modes.CBC(x55 * 16), + backend=default_backend() + ).encryptor() + p.set_outbound_cipher(encryptor, 16, sha1, 12, x1f * 20) # message has to be at least 16 bytes long, so we'll have at least one # block of data encrypted that contains zero random padding bytes @@ -66,8 +71,12 @@ class PacketizerTest (unittest.TestCase): p = Packetizer(rsock) p.set_log(util.get_logger('paramiko.transport')) p.set_hexdump(True) - cipher = AES.new(zero_byte * 16, AES.MODE_CBC, x55 * 16) - p.set_inbound_cipher(cipher, 16, sha1, 12, x1f * 20) + decryptor = Cipher( + algorithms.AES(zero_byte * 16), + modes.CBC(x55 * 16), + backend=default_backend() + ).decryptor() + p.set_inbound_cipher(decryptor, 16, sha1, 12, x1f * 20) wsock.send(b'\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0\x90\xd2\x16\x56\x0d\x71\x73\x61\x38\x7c\x4c\x3d\xfb\x97\x7d\xe2\x6e\x03\xb1\xa0\xc2\x1c\xd6\x41\x41\x4c\xb4\x59') cmd, m = p.read_message() self.assertEqual(100, cmd) @@ -76,14 +85,20 @@ class PacketizerTest (unittest.TestCase): self.assertEqual(900, m.get_int()) def test_3_closed(self): + if sys.platform.startswith("win"): # no SIGALRM on windows + return rsock = LoopSocket() wsock = LoopSocket() rsock.link(wsock) p = Packetizer(wsock) p.set_log(util.get_logger('paramiko.transport')) p.set_hexdump(True) - cipher = AES.new(zero_byte * 16, AES.MODE_CBC, x55 * 16) - p.set_outbound_cipher(cipher, 16, sha1, 12, x1f * 20) + encryptor = Cipher( + algorithms.AES(zero_byte * 16), + modes.CBC(x55 * 16), + backend=default_backend() + ).encryptor() + p.set_outbound_cipher(encryptor, 16, sha1, 12, x1f * 20) # message has to be at least 16 bytes long, so we'll have at least one # block of data encrypted that contains zero random padding bytes @@ -99,9 +114,13 @@ class PacketizerTest (unittest.TestCase): import signal class TimeoutError(Exception): - pass + def __init__(self, error_message): + if hasattr(errno, 'ETIME'): + self.message = os.sterror(errno.ETIME) + else: + self.messaage = error_message - def timeout(seconds=1, error_message=os.strerror(errno.ETIME)): + def timeout(seconds=1, error_message='Timer expired'): def decorator(func): def _handle_timeout(signum, frame): raise TimeoutError(error_message) diff --git a/tests/test_pkey.py b/tests/test_pkey.py index f673254f..a26ff170 100644 --- a/tests/test_pkey.py +++ b/tests/test_pkey.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- # Copyright (C) 2003-2009 Robey Pointer <robeypointer@gmail.com> # # This file is part of paramiko. @@ -24,56 +25,61 @@ import unittest import os from binascii import hexlify from hashlib import md5 +import base64 -from paramiko import RSAKey, DSSKey, ECDSAKey, Message, util -from paramiko.py3compat import StringIO, byte_chr, b, bytes +from paramiko import RSAKey, DSSKey, ECDSAKey, Ed25519Key, Message, util +from paramiko.py3compat import StringIO, byte_chr, b, bytes, PY2 from tests.util import test_path # from openssh's ssh-keygen PUB_RSA = 'ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4c=' PUB_DSS = 'ssh-dss AAAAB3NzaC1kc3MAAACBAOeBpgNnfRzr/twmAQRu2XwWAp3CFtrVnug6s6fgwj/oLjYbVtjAy6pl/h0EKCWx2rf1IetyNsTxWrniA9I6HeDj65X1FyDkg6g8tvCnaNB8Xp/UUhuzHuGsMIipRxBxw9LF608EqZcj1E3ytktoW5B5OcjrkEoz3xG7C+rpIjYvAAAAFQDwz4UnmsGiSNu5iqjn3uTzwUpshwAAAIEAkxfFeY8P2wZpDjX0MimZl5wkoFQDL25cPzGBuB4OnB8NoUk/yjAHIIpEShw8V+LzouMK5CTJQo5+Ngw3qIch/WgRmMHy4kBq1SsXMjQCte1So6HBMvBPIW5SiMTmjCfZZiw4AYHK+B/JaOwaG9yRg2Ejg4Ok10+XFDxlqZo8Y+wAAACARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lYukmnjO1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+NwacIBlXa8cMDL7Q/69o0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgE=' -PUB_ECDSA = 'ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBJSPZm3ZWkvk/Zx8WP+fZRZ5/NBBHnGQwR6uIC6XHGPDIHuWUzIjAwA0bzqkOUffEsbLe+uQgKl5kbc/L8KA/eo=' +PUB_ECDSA_256 = 'ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBJSPZm3ZWkvk/Zx8WP+fZRZ5/NBBHnGQwR6uIC6XHGPDIHuWUzIjAwA0bzqkOUffEsbLe+uQgKl5kbc/L8KA/eo=' +PUB_ECDSA_384 = 'ecdsa-sha2-nistp384 AAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAAIbmlzdHAzODQAAABhBBbGibQLW9AAZiGN2hEQxWYYoFaWKwN3PKSaDJSMqmIn1Z9sgRUuw8Y/w502OGvXL/wFk0i2z50l3pWZjD7gfMH7gX5TUiCzwrQkS+Hn1U2S9aF5WJp0NcIzYxXw2r4M2A==' +PUB_ECDSA_521 = 'ecdsa-sha2-nistp521 AAAAE2VjZHNhLXNoYTItbmlzdHA1MjEAAAAIbmlzdHA1MjEAAACFBACaOaFLZGuxa5AW16qj6VLypFbLrEWrt9AZUloCMefxO8bNLjK/O5g0rAVasar1TnyHE9qj4NwzANZASWjQNbc4MAG8vzqezFwLIn/kNyNTsXNfqEko9OgHZknlj2Z79dwTJcRAL4QLcT5aND0EHZLB2fAUDXiWIb2j4rg1mwPlBMiBXA==' FINGER_RSA = '1024 60:73:38:44:cb:51:86:65:7f:de:da:a2:2b:5a:57:d5' FINGER_DSS = '1024 44:78:f0:b9:a2:3c:c5:18:20:09:ff:75:5b:c1:d2:6c' -FINGER_ECDSA = '256 25:19:eb:55:e6:a1:47:ff:4f:38:d2:75:6f:a5:d5:60' +FINGER_ECDSA_256 = '256 25:19:eb:55:e6:a1:47:ff:4f:38:d2:75:6f:a5:d5:60' +FINGER_ECDSA_384 = '384 c1:8d:a0:59:09:47:41:8e:a8:a6:07:01:29:23:b4:65' +FINGER_ECDSA_521 = '521 44:58:22:52:12:33:16:0e:ce:0e:be:2c:7c:7e:cc:1e' SIGNED_RSA = '20:d7:8a:31:21:cb:f7:92:12:f2:a4:89:37:f5:78:af:e6:16:b6:25:b9:97:3d:a2:cd:5f:ca:20:21:73:4c:ad:34:73:8f:20:77:28:e2:94:15:08:d8:91:40:7a:85:83:bf:18:37:95:dc:54:1a:9b:88:29:6c:73:ca:38:b4:04:f1:56:b9:f2:42:9d:52:1b:29:29:b4:4f:fd:c9:2d:af:47:d2:40:76:30:f3:63:45:0c:d9:1d:43:86:0f:1c:70:e2:93:12:34:f3:ac:c5:0a:2f:14:50:66:59:f1:88:ee:c1:4a:e9:d1:9c:4e:46:f0:0e:47:6f:38:74:f1:44:a8' RSA_PRIVATE_OUT = """\ -----BEGIN RSA PRIVATE KEY----- -MIICXAIBAAKCAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAM -s6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZ -v3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4cCASMC -ggCAEiI6plhqipt4P05L3PYr0pHZq2VPEbE4k9eI/gRKo/c1VJxY3DJnc1cenKsk -trQRtW3OxCEufqsX5PNec6VyKkW+Ox6beJjMKm4KF8ZDpKi9Nw6MdX3P6Gele9D9 -+ieyhVFljrnAqcXsgChTBOYlL2imqCs3qRGAJ3cMBIAx3VsCQQD3pIFVYW398kE0 -n0e1icEpkbDRV4c5iZVhu8xKy2yyfy6f6lClSb2+Ub9uns7F3+b5v0pYSHbE9+/r -OpRq83AfAkEA2rMZlr8SnMXgnyka2LuggA9QgMYy18hyao1dUxySubNDa9N+q2QR -mwDisTUgRFHKIlDHoQmzPbXAmYZX1YlDmQJBAPCRLS5epV0XOAc7pL762OaNhzHC -veAfQKgVhKBt105PqaKpGyQ5AXcNlWQlPeTK4GBTbMrKDPna6RBkyrEJvV8CQBK+ -5O+p+kfztCrmRCE0p1tvBuZ3Y3GU1ptrM+KNa6mEZN1bRV8l1Z+SXJLYqv6Kquz/ -nBUeFq2Em3rfoSDugiMCQDyG3cxD5dKX3IgkhLyBWls/FLDk4x/DQ+NUTu0F1Cu6 -JJye+5ARLkL0EweMXf0tmIYfWItDLsWB0fKg/56h0js= +MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz +oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/ +d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB +gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0 +EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon +soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H +tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU +avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA +4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g +H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv +qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV +HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc +nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7 -----END RSA PRIVATE KEY----- """ DSS_PRIVATE_OUT = """\ -----BEGIN DSA PRIVATE KEY----- -MIIBvgIBAAKCAIEA54GmA2d9HOv+3CYBBG7ZfBYCncIW2tWe6Dqzp+DCP+guNhtW -2MDLqmX+HQQoJbHat/Uh63I2xPFaueID0jod4OPrlfUXIOSDqDy28Kdo0Hxen9RS -G7Me4awwiKlHEHHD0sXrTwSplyPUTfK2S2hbkHk5yOuQSjPfEbsL6ukiNi8CFQDw -z4UnmsGiSNu5iqjn3uTzwUpshwKCAIEAkxfFeY8P2wZpDjX0MimZl5wkoFQDL25c -PzGBuB4OnB8NoUk/yjAHIIpEShw8V+LzouMK5CTJQo5+Ngw3qIch/WgRmMHy4kBq -1SsXMjQCte1So6HBMvBPIW5SiMTmjCfZZiw4AYHK+B/JaOwaG9yRg2Ejg4Ok10+X -FDxlqZo8Y+wCggCARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lY -ukmnjO1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+N -wacIBlXa8cMDL7Q/69o0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgECFGI9 -QPSch9pT9XHqn+1rZ4bK+QGA +MIIBuwIBAAKBgQDngaYDZ30c6/7cJgEEbtl8FgKdwhba1Z7oOrOn4MI/6C42G1bY +wMuqZf4dBCglsdq39SHrcjbE8Vq54gPSOh3g4+uV9Rcg5IOoPLbwp2jQfF6f1FIb +sx7hrDCIqUcQccPSxetPBKmXI9RN8rZLaFuQeTnI65BKM98Ruwvq6SI2LwIVAPDP +hSeawaJI27mKqOfe5PPBSmyHAoGBAJMXxXmPD9sGaQ419DIpmZecJKBUAy9uXD8x +gbgeDpwfDaFJP8owByCKREocPFfi86LjCuQkyUKOfjYMN6iHIf1oEZjB8uJAatUr +FzI0ArXtUqOhwTLwTyFuUojE5own2WYsOAGByvgfyWjsGhvckYNhI4ODpNdPlxQ8 +ZamaPGPsAoGARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lYukmn +jO1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+NwacI +BlXa8cMDL7Q/69o0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgECFGI9QPSc +h9pT9XHqn+1rZ4bK+QGA -----END DSA PRIVATE KEY----- """ -ECDSA_PRIVATE_OUT = """\ +ECDSA_PRIVATE_OUT_256 = """\ -----BEGIN EC PRIVATE KEY----- MHcCAQEEIKB6ty3yVyKEnfF/zprx0qwC76MsMlHY4HXCnqho2eKioAoGCCqGSM49 AwEHoUQDQgAElI9mbdlaS+T9nHxY/59lFnn80EEecZDBHq4gLpccY8Mge5ZTMiMD @@ -81,17 +87,32 @@ ADRvOqQ5R98Sxst765CAqXmRtz8vwoD96g== -----END EC PRIVATE KEY----- """ -x1234 = b'\x01\x02\x03\x04' +ECDSA_PRIVATE_OUT_384 = """\ +-----BEGIN EC PRIVATE KEY----- +MIGkAgEBBDBDdO8IXvlLJgM7+sNtPl7tI7FM5kzuEUEEPRjXIPQM7mISciwJPBt+ +y43EuG8nL4mgBwYFK4EEACKhZANiAAQWxom0C1vQAGYhjdoREMVmGKBWlisDdzyk +mgyUjKpiJ9WfbIEVLsPGP8OdNjhr1y/8BZNIts+dJd6VmYw+4HzB+4F+U1Igs8K0 +JEvh59VNkvWheViadDXCM2MV8Nq+DNg= +-----END EC PRIVATE KEY----- +""" +ECDSA_PRIVATE_OUT_521 = """\ +-----BEGIN EC PRIVATE KEY----- +MIHcAgEBBEIAprQtAS3OF6iVUkT8IowTHWicHzShGgk86EtuEXvfQnhZFKsWm6Jo +iqAr1yEaiuI9LfB3Xs8cjuhgEEfbduYr/f6gBwYFK4EEACOhgYkDgYYABACaOaFL +ZGuxa5AW16qj6VLypFbLrEWrt9AZUloCMefxO8bNLjK/O5g0rAVasar1TnyHE9qj +4NwzANZASWjQNbc4MAG8vzqezFwLIn/kNyNTsXNfqEko9OgHZknlj2Z79dwTJcRA +L4QLcT5aND0EHZLB2fAUDXiWIb2j4rg1mwPlBMiBXA== +-----END EC PRIVATE KEY----- +""" -class KeyTest (unittest.TestCase): +x1234 = b'\x01\x02\x03\x04' - def setUp(self): - pass +TEST_KEY_BYTESTR_2 = '\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x81\x00\xd3\x8fV\xea\x07\x85\xa6k%\x8d<\x1f\xbc\x8dT\x98\xa5\x96$\xf3E#\xbe>\xbc\xd2\x93\x93\x87f\xceD\x18\xdb \x0c\xb3\xa1a\x96\xf8e#\xcc\xacS\x8a#\xefVlE\x83\x1epv\xc1o\x17M\xef\xdf\x89DUXL\xa6\x8b\xaa<\x06\x10\xd7\x93w\xec\xaf\xe2\xaf\x95\xd8\xfb\xd9\xbfw\xcb\x9f0)#y{\x10\x90\xaa\x85l\tPru\x8c\t\x19\xce\xa0\xf1\xd2\xdc\x8e/\x8b\xa8f\x9c0\xdey\x84\xd2F\xf7\xcbmm\x1f\x87' +TEST_KEY_BYTESTR_3 = '\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x00ӏV\x07k%<\x1fT$E#>ғfD\x18 \x0cae#̬S#VlE\x1epvo\x17M߉DUXL<\x06\x10דw\u2bd5ٿw˟0)#y{\x10l\tPru\t\x19Π\u070e/f0yFmm\x1f' - def tearDown(self): - pass +class KeyTest(unittest.TestCase): def test_1_generate_key_bytes(self): key = util.generate_key_bytes(md5, x1234, 'happy birthday', 30) exp = b'\x61\xE1\xF2\x72\xF4\xC1\xC4\x56\x15\x86\xBD\x32\x24\x98\xC0\xE9\x24\x67\x27\x80\xF4\x7B\xB3\x7D\xDA\x7D\x54\x01\x9E\x64' @@ -121,7 +142,7 @@ class KeyTest (unittest.TestCase): self.assertEqual(exp_rsa, my_rsa) self.assertEqual(PUB_RSA.split()[1], key.get_base64()) self.assertEqual(1024, key.get_bits()) - + def test_4_load_dss(self): key = DSSKey.from_private_key_file(test_path('test_dss.key')) self.assertEqual('ssh-dss', key.get_name()) @@ -205,43 +226,72 @@ class KeyTest (unittest.TestCase): msg.rewind() self.assertTrue(key.verify_ssh_sig(b'jerri blank', msg)) - def test_10_load_ecdsa(self): - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa.key')) + def test_C_generate_ecdsa(self): + key = ECDSAKey.generate() + msg = key.sign_ssh_data(b'jerri blank') + msg.rewind() + self.assertTrue(key.verify_ssh_sig(b'jerri blank', msg)) + self.assertEqual(key.get_bits(), 256) + self.assertEqual(key.get_name(), 'ecdsa-sha2-nistp256') + + key = ECDSAKey.generate(bits=256) + msg = key.sign_ssh_data(b'jerri blank') + msg.rewind() + self.assertTrue(key.verify_ssh_sig(b'jerri blank', msg)) + self.assertEqual(key.get_bits(), 256) + self.assertEqual(key.get_name(), 'ecdsa-sha2-nistp256') + + key = ECDSAKey.generate(bits=384) + msg = key.sign_ssh_data(b'jerri blank') + msg.rewind() + self.assertTrue(key.verify_ssh_sig(b'jerri blank', msg)) + self.assertEqual(key.get_bits(), 384) + self.assertEqual(key.get_name(), 'ecdsa-sha2-nistp384') + + key = ECDSAKey.generate(bits=521) + msg = key.sign_ssh_data(b'jerri blank') + msg.rewind() + self.assertTrue(key.verify_ssh_sig(b'jerri blank', msg)) + self.assertEqual(key.get_bits(), 521) + self.assertEqual(key.get_name(), 'ecdsa-sha2-nistp521') + + def test_10_load_ecdsa_256(self): + key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_256.key')) self.assertEqual('ecdsa-sha2-nistp256', key.get_name()) - exp_ecdsa = b(FINGER_ECDSA.split()[1].replace(':', '')) + exp_ecdsa = b(FINGER_ECDSA_256.split()[1].replace(':', '')) my_ecdsa = hexlify(key.get_fingerprint()) self.assertEqual(exp_ecdsa, my_ecdsa) - self.assertEqual(PUB_ECDSA.split()[1], key.get_base64()) + self.assertEqual(PUB_ECDSA_256.split()[1], key.get_base64()) self.assertEqual(256, key.get_bits()) s = StringIO() key.write_private_key(s) - self.assertEqual(ECDSA_PRIVATE_OUT, s.getvalue()) + self.assertEqual(ECDSA_PRIVATE_OUT_256, s.getvalue()) s.seek(0) key2 = ECDSAKey.from_private_key(s) self.assertEqual(key, key2) - def test_11_load_ecdsa_password(self): - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password.key'), b'television') + def test_11_load_ecdsa_password_256(self): + key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password_256.key'), b'television') self.assertEqual('ecdsa-sha2-nistp256', key.get_name()) - exp_ecdsa = b(FINGER_ECDSA.split()[1].replace(':', '')) + exp_ecdsa = b(FINGER_ECDSA_256.split()[1].replace(':', '')) my_ecdsa = hexlify(key.get_fingerprint()) self.assertEqual(exp_ecdsa, my_ecdsa) - self.assertEqual(PUB_ECDSA.split()[1], key.get_base64()) + self.assertEqual(PUB_ECDSA_256.split()[1], key.get_base64()) self.assertEqual(256, key.get_bits()) - def test_12_compare_ecdsa(self): + def test_12_compare_ecdsa_256(self): # verify that the private & public keys compare equal - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa.key')) + key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_256.key')) self.assertEqual(key, key) pub = ECDSAKey(data=key.asbytes()) self.assertTrue(key.can_sign()) self.assertTrue(not pub.can_sign()) self.assertEqual(key, pub) - def test_13_sign_ecdsa(self): + def test_13_sign_ecdsa_256(self): # verify that the rsa private key can sign and verify - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa.key')) + key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_256.key')) msg = key.sign_ssh_data(b'ice weasels') self.assertTrue(type(msg) is Message) msg.rewind() @@ -255,6 +305,109 @@ class KeyTest (unittest.TestCase): pub = ECDSAKey(data=key.asbytes()) self.assertTrue(pub.verify_ssh_sig(b'ice weasels', msg)) + def test_14_load_ecdsa_384(self): + key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_384.key')) + self.assertEqual('ecdsa-sha2-nistp384', key.get_name()) + exp_ecdsa = b(FINGER_ECDSA_384.split()[1].replace(':', '')) + my_ecdsa = hexlify(key.get_fingerprint()) + self.assertEqual(exp_ecdsa, my_ecdsa) + self.assertEqual(PUB_ECDSA_384.split()[1], key.get_base64()) + self.assertEqual(384, key.get_bits()) + + s = StringIO() + key.write_private_key(s) + self.assertEqual(ECDSA_PRIVATE_OUT_384, s.getvalue()) + s.seek(0) + key2 = ECDSAKey.from_private_key(s) + self.assertEqual(key, key2) + + def test_15_load_ecdsa_password_384(self): + key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password_384.key'), b'television') + self.assertEqual('ecdsa-sha2-nistp384', key.get_name()) + exp_ecdsa = b(FINGER_ECDSA_384.split()[1].replace(':', '')) + my_ecdsa = hexlify(key.get_fingerprint()) + self.assertEqual(exp_ecdsa, my_ecdsa) + self.assertEqual(PUB_ECDSA_384.split()[1], key.get_base64()) + self.assertEqual(384, key.get_bits()) + + def test_16_compare_ecdsa_384(self): + # verify that the private & public keys compare equal + key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_384.key')) + self.assertEqual(key, key) + pub = ECDSAKey(data=key.asbytes()) + self.assertTrue(key.can_sign()) + self.assertTrue(not pub.can_sign()) + self.assertEqual(key, pub) + + def test_17_sign_ecdsa_384(self): + # verify that the rsa private key can sign and verify + key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_384.key')) + msg = key.sign_ssh_data(b'ice weasels') + self.assertTrue(type(msg) is Message) + msg.rewind() + self.assertEqual('ecdsa-sha2-nistp384', msg.get_text()) + # ECDSA signatures, like DSS signatures, tend to be different + # each time, so we can't compare against a "known correct" + # signature. + # Even the length of the signature can change. + + msg.rewind() + pub = ECDSAKey(data=key.asbytes()) + self.assertTrue(pub.verify_ssh_sig(b'ice weasels', msg)) + + def test_18_load_ecdsa_521(self): + key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_521.key')) + self.assertEqual('ecdsa-sha2-nistp521', key.get_name()) + exp_ecdsa = b(FINGER_ECDSA_521.split()[1].replace(':', '')) + my_ecdsa = hexlify(key.get_fingerprint()) + self.assertEqual(exp_ecdsa, my_ecdsa) + self.assertEqual(PUB_ECDSA_521.split()[1], key.get_base64()) + self.assertEqual(521, key.get_bits()) + + s = StringIO() + key.write_private_key(s) + # Different versions of OpenSSL (SSLeay versions 0x1000100f and + # 0x1000207f for instance) use different apparently valid (as far as + # ssh-keygen is concerned) padding. So we can't check the actual value + # of the pem encoded key. + s.seek(0) + key2 = ECDSAKey.from_private_key(s) + self.assertEqual(key, key2) + + def test_19_load_ecdsa_password_521(self): + key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password_521.key'), b'television') + self.assertEqual('ecdsa-sha2-nistp521', key.get_name()) + exp_ecdsa = b(FINGER_ECDSA_521.split()[1].replace(':', '')) + my_ecdsa = hexlify(key.get_fingerprint()) + self.assertEqual(exp_ecdsa, my_ecdsa) + self.assertEqual(PUB_ECDSA_521.split()[1], key.get_base64()) + self.assertEqual(521, key.get_bits()) + + def test_20_compare_ecdsa_521(self): + # verify that the private & public keys compare equal + key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_521.key')) + self.assertEqual(key, key) + pub = ECDSAKey(data=key.asbytes()) + self.assertTrue(key.can_sign()) + self.assertTrue(not pub.can_sign()) + self.assertEqual(key, pub) + + def test_21_sign_ecdsa_521(self): + # verify that the rsa private key can sign and verify + key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_521.key')) + msg = key.sign_ssh_data(b'ice weasels') + self.assertTrue(type(msg) is Message) + msg.rewind() + self.assertEqual('ecdsa-sha2-nistp521', msg.get_text()) + # ECDSA signatures, like DSS signatures, tend to be different + # each time, so we can't compare against a "known correct" + # signature. + # Even the length of the signature can change. + + msg.rewind() + pub = ECDSAKey(data=key.asbytes()) + self.assertTrue(pub.verify_ssh_sig(b'ice weasels', msg)) + def test_salt_size(self): # Read an existing encrypted private key file_ = test_path('test_rsa_password.key') @@ -271,3 +424,16 @@ class KeyTest (unittest.TestCase): self.assertEqual(key, key2) finally: os.remove(newfile) + + def test_stringification(self): + key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + comparable = TEST_KEY_BYTESTR_2 if PY2 else TEST_KEY_BYTESTR_3 + self.assertEqual(str(key), comparable) + + def test_ed25519(self): + key1 = Ed25519Key.from_private_key_file(test_path('test_ed25519.key')) + key2 = Ed25519Key.from_private_key_file( + test_path('test_ed25519_password.key'), b'abc123' + ) + + self.assertNotEqual(key1.asbytes(), key2.asbytes()) diff --git a/tests/test_sftp.py b/tests/test_sftp.py index b7ace8e2..8f1b7d2e 100755 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -446,7 +446,7 @@ class SFTPTest (unittest.TestCase): def test_A_readline_seek(self): """ create a text file and write a bunch of text into it. then count the lines - in the file, and seek around to retreive particular lines. this should + in the file, and seek around to retrieve particular lines. this should verify that read buffering and 'tell' work well together, and that read buffering is reset on 'seek'. """ @@ -462,6 +462,7 @@ class SFTPTest (unittest.TestCase): line_number += 1 pos_list.append(loc) loc = f.tell() + self.assertTrue(f.seekable()) f.seek(pos_list[6], f.SEEK_SET) self.assertEqual(f.readline(), 'Nouzilly, France.\n') f.seek(pos_list[17], f.SEEK_SET) @@ -643,7 +644,7 @@ class SFTPTest (unittest.TestCase): with sftp.open(FOLDER + '/bunny.txt', 'rb') as f: self.assertEqual(text, f.read(128)) - self.assertEqual((41, 41), saved_progress[-1]) + self.assertEqual([(41, 41)], saved_progress) os.unlink(localname) fd, localname = mkstemp() @@ -653,7 +654,7 @@ class SFTPTest (unittest.TestCase): with open(localname, 'rb') as f: self.assertEqual(text, f.read(128)) - self.assertEqual((41, 41), saved_progress[-1]) + self.assertEqual([(41, 41)], saved_progress) os.unlink(localname) sftp.unlink(FOLDER + '/bunny.txt') @@ -729,7 +730,8 @@ class SFTPTest (unittest.TestCase): f.readv([(0, 12)]) with sftp.open(FOLDER + '/zero', 'r') as f: - f.prefetch() + file_size = f.stat().st_size + f.prefetch(file_size) f.read(100) finally: sftp.unlink(FOLDER + '/zero') @@ -844,6 +846,11 @@ class SFTPTest (unittest.TestCase): sftp.remove('%s/nonutf8data' % FOLDER) + def test_sftp_attributes_empty_str(self): + sftp_attributes = SFTPAttributes() + self.assertEqual(str(sftp_attributes), "?--------- 1 0 0 0 (unknown date) ?") + + if __name__ == '__main__': SFTPTest.init_loopback() # logging is required by test_N_file_with_percent diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py index abed27b8..cfad5682 100644 --- a/tests/test_sftp_big.py +++ b/tests/test_sftp_big.py @@ -132,7 +132,8 @@ class BigSFTPTest (unittest.TestCase): start = time.time() with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f: - f.prefetch() + file_size = f.stat().st_size + f.prefetch(file_size) # read on odd boundaries to make sure the bytes aren't getting scrambled n = 0 @@ -171,7 +172,8 @@ class BigSFTPTest (unittest.TestCase): chunk = 793 for i in range(10): with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f: - f.prefetch() + file_size = f.stat().st_size + f.prefetch(file_size) base_offset = (512 * 1024) + 17 * random.randint(1000, 2000) offsets = [base_offset + j * chunk for j in range(100)] # randomly seek around and read them out @@ -245,9 +247,11 @@ class BigSFTPTest (unittest.TestCase): for i in range(10): with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f: - f.prefetch() + file_size = f.stat().st_size + f.prefetch(file_size) with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f: - f.prefetch() + file_size = f.stat().st_size + f.prefetch(file_size) for n in range(1024): data = f.read(1024) self.assertEqual(data, kblob) @@ -275,7 +279,8 @@ class BigSFTPTest (unittest.TestCase): self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f: - f.prefetch() + file_size = f.stat().st_size + f.prefetch(file_size) data = f.read(1024) self.assertEqual(data, kblob) @@ -353,7 +358,8 @@ class BigSFTPTest (unittest.TestCase): # try to read it too. with sftp.open('%s/hongry.txt' % FOLDER, 'r', 128 * 1024) as f: - f.prefetch() + file_size = f.stat().st_size + f.prefetch(file_size) total = 0 while total < 1024 * 1024: total += len(f.read(32 * 1024)) diff --git a/tests/test_ssh_exception.py b/tests/test_ssh_exception.py new file mode 100644 index 00000000..18f2a97d --- /dev/null +++ b/tests/test_ssh_exception.py @@ -0,0 +1,31 @@ +import pickle +import unittest + +from paramiko.ssh_exception import NoValidConnectionsError + + +class NoValidConnectionsErrorTest (unittest.TestCase): + + def test_pickling(self): + # Regression test for https://github.com/paramiko/paramiko/issues/617 + exc = NoValidConnectionsError({('127.0.0.1', '22'): Exception()}) + new_exc = pickle.loads(pickle.dumps(exc)) + self.assertEqual(type(exc), type(new_exc)) + self.assertEqual(str(exc), str(new_exc)) + self.assertEqual(exc.args, new_exc.args) + + def test_error_message_for_single_host(self): + exc = NoValidConnectionsError({('127.0.0.1', '22'): Exception()}) + assert "Unable to connect to port 22 on 127.0.0.1" in str(exc) + + def test_error_message_for_two_hosts(self): + exc = NoValidConnectionsError({('127.0.0.1', '22'): Exception(), + ('::1', '22'): Exception()}) + assert "Unable to connect to port 22 on 127.0.0.1 or ::1" in str(exc) + + def test_error_message_for_multiple_hosts(self): + exc = NoValidConnectionsError({('127.0.0.1', '22'): Exception(), + ('::1', '22'): Exception(), + ('10.0.0.42', '22'): Exception()}) + exp = "Unable to connect to port 22 on 10.0.0.42, 127.0.0.1 or ::1" + assert exp in str(exc) diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py index e20d348f..967b3b81 100644 --- a/tests/test_ssh_gss.py +++ b/tests/test_ssh_gss.py @@ -43,9 +43,7 @@ class NullServer (paramiko.ServerInterface): return paramiko.AUTH_FAILED def enable_auth_gssapi(self): - UseGSSAPI = True - GSSAPICleanupCredentials = True - return UseGSSAPI + return True def check_channel_request(self, kind, chanid): return paramiko.OPEN_SUCCEEDED diff --git a/tests/test_transport.py b/tests/test_transport.py index 5cf9a867..c426cef1 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -28,15 +28,19 @@ import socket import time import threading import random +from hashlib import sha1 import unittest -from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \ - SSHException, ChannelException +from paramiko import ( + Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, SSHException, + ChannelException, Packetizer, +) from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED -from paramiko.common import MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST, \ - MIN_PACKET_SIZE, MIN_WINDOW_SIZE, MAX_WINDOW_SIZE, \ - DEFAULT_WINDOW_SIZE, DEFAULT_MAX_PACKET_SIZE +from paramiko.common import ( + MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST, MIN_PACKET_SIZE, MIN_WINDOW_SIZE, + MAX_WINDOW_SIZE, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_PACKET_SIZE, +) from paramiko.py3compat import bytes from paramiko.message import Message from tests.loop import LoopSocket @@ -77,7 +81,7 @@ class NullServer (ServerInterface): return OPEN_SUCCEEDED def check_channel_exec_request(self, channel, command): - if command != 'yes': + if command != b'yes': return False return True @@ -161,6 +165,15 @@ class TransportTest(unittest.TestCase): except TypeError: pass + def test_1b_security_options_reset(self): + o = self.tc.get_security_options() + # should not throw any exceptions + o.ciphers = o.ciphers + o.digests = o.digests + o.key_types = o.key_types + o.kex = o.kex + o.compression = o.compression + def test_2_compute_key(self): self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929 self.tc.H = b'\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3' @@ -251,7 +264,7 @@ class TransportTest(unittest.TestCase): chan = self.tc.open_session() schan = self.ts.accept(1.0) try: - chan.exec_command('no') + chan.exec_command(b'command contains \xfc and is not a valid UTF-8 string') self.assertTrue(False) except SSHException: pass @@ -447,9 +460,11 @@ class TransportTest(unittest.TestCase): bytes = self.tc.packetizer._Packetizer__sent_bytes chan.send('x' * 1024) bytes2 = self.tc.packetizer._Packetizer__sent_bytes + block_size = self.tc._cipher_info[self.tc.local_cipher]['block-size'] + mac_size = self.tc._mac_info[self.tc.local_mac]['size'] # tests show this is actually compressed to *52 bytes*! including packet overhead! nice!! :) self.assertTrue(bytes2 - bytes < 1024) - self.assertEqual(52, bytes2 - bytes) + self.assertEqual(16 + block_size + mac_size, bytes2 - bytes) chan.close() schan.close() @@ -792,3 +807,54 @@ class TransportTest(unittest.TestCase): (None, DEFAULT_WINDOW_SIZE), (2**32, MAX_WINDOW_SIZE)]: self.assertEqual(self.tc._sanitize_window_size(val), correct) + + def test_L_handshake_timeout(self): + """ + verify that we can get a hanshake timeout. + """ + # Tweak client Transport instance's Packetizer instance so + # its read_message() sleeps a bit. This helps prevent race conditions + # where the client Transport's timeout timer thread doesn't even have + # time to get scheduled before the main client thread finishes + # handshaking with the server. + # (Doing this on the server's transport *sounds* more 'correct' but + # actually doesn't work nearly as well for whatever reason.) + class SlowPacketizer(Packetizer): + def read_message(self): + time.sleep(1) + return super(SlowPacketizer, self).read_message() + # NOTE: prettttty sure since the replaced .packetizer Packetizer is now + # no longer doing anything with its copy of the socket...everything'll + # be fine. Even tho it's a bit squicky. + self.tc.packetizer = SlowPacketizer(self.tc.sock) + # Continue with regular test red tape. + host_key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + public_host_key = RSAKey(data=host_key.asbytes()) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assertTrue(not event.is_set()) + self.tc.handshake_timeout = 0.000000000001 + self.ts.start_server(event, server) + self.assertRaises(EOFError, self.tc.connect, + hostkey=public_host_key, + username='slowdive', + password='pygmalion') + + def test_M_select_after_close(self): + """ + verify that select works when a channel is already closed. + """ + self.setup_test_server() + chan = self.tc.open_session() + chan.invoke_shell() + schan = self.ts.accept(1.0) + schan.close() + + # give client a moment to receive close notification + time.sleep(0.1) + + r, w, e = select.select([chan], [], [], 0.1) + self.assertEqual([chan], r) + self.assertEqual([], w) + self.assertEqual([], e) diff --git a/tests/test_util.py b/tests/test_util.py index bfdc525e..7880e156 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -30,6 +30,7 @@ import paramiko.util from paramiko.util import lookup_ssh_host_config as host_config, safe_string from paramiko.py3compat import StringIO, byte_ord, b +# Note some lines in this configuration have trailing spaces on purpose test_config_file = """\ Host * User robey @@ -65,7 +66,7 @@ from paramiko import * class UtilTest(unittest.TestCase): - def test_1_import(self): + def test_import(self): """ verify that all the classes can be imported from paramiko. """ @@ -103,30 +104,30 @@ class UtilTest(unittest.TestCase): self.assertTrue('SSHConfig' in symbols) self.assertTrue('util' in symbols) - def test_2_parse_config(self): + def test_parse_config(self): global test_config_file f = StringIO(test_config_file) config = paramiko.util.parse_ssh_config(f) self.assertEqual(config._config, [{'host': ['*'], 'config': {}}, {'host': ['*'], 'config': {'identityfile': ['~/.ssh/id_rsa'], 'user': 'robey'}}, {'host': ['*.example.com'], 'config': {'user': 'bjork', 'port': '3333'}}, - {'host': ['*'], 'config': {'crazy': 'something dumb '}}, + {'host': ['*'], 'config': {'crazy': 'something dumb'}}, {'host': ['spoo.example.com'], 'config': {'crazy': 'something else'}}]) - def test_3_host_config(self): + def test_host_config(self): global test_config_file f = StringIO(test_config_file) config = paramiko.util.parse_ssh_config(f) for host, values in { - 'irc.danger.com': {'crazy': 'something dumb ', + 'irc.danger.com': {'crazy': 'something dumb', 'hostname': 'irc.danger.com', 'user': 'robey'}, - 'irc.example.com': {'crazy': 'something dumb ', + 'irc.example.com': {'crazy': 'something dumb', 'hostname': 'irc.example.com', 'user': 'robey', 'port': '3333'}, - 'spoo.example.com': {'crazy': 'something dumb ', + 'spoo.example.com': {'crazy': 'something dumb', 'hostname': 'spoo.example.com', 'user': 'robey', 'port': '3333'} @@ -140,12 +141,12 @@ class UtilTest(unittest.TestCase): values ) - def test_4_generate_key_bytes(self): + def test_generate_key_bytes(self): x = paramiko.util.generate_key_bytes(sha1, b'ABCDEFGH', 'This is my secret passphrase.', 64) hex = ''.join(['%02x' % byte_ord(c) for c in x]) self.assertEqual(hex, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b') - def test_5_host_keys(self): + def test_host_keys(self): with open('hostfile.temp', 'w') as f: f.write(test_hosts_file) try: @@ -158,7 +159,7 @@ class UtilTest(unittest.TestCase): finally: os.unlink('hostfile.temp') - def test_7_host_config_expose_issue_33(self): + def test_host_config_expose_issue_33(self): test_config_file = """ Host www13.* Port 22 @@ -177,7 +178,7 @@ Host * {'hostname': host, 'port': '22'} ) - def test_8_eintr_retry(self): + def test_eintr_retry(self): self.assertEqual('foo', paramiko.util.retry_on_signal(lambda: 'foo')) # Variables that are set by raises_intr @@ -202,7 +203,7 @@ Host * self.assertRaises(AssertionError, lambda: paramiko.util.retry_on_signal(raises_other_exception)) - def test_9_proxycommand_config_equals_parsing(self): + def test_proxycommand_config_equals_parsing(self): """ ProxyCommand should not split on equals signs within the value. """ @@ -221,7 +222,7 @@ Host equals-delimited 'foo bar=biz baz' ) - def test_10_proxycommand_interpolation(self): + def test_proxycommand_interpolation(self): """ ProxyCommand should perform interpolation on the value """ @@ -247,7 +248,20 @@ Host * val ) - def test_11_host_config_test_negation(self): + def test_proxycommand_tilde_expansion(self): + """ + Tilde (~) should be expanded inside ProxyCommand + """ + config = paramiko.util.parse_ssh_config(StringIO(""" +Host test + ProxyCommand ssh -F ~/.ssh/test_config bastion nc %h %p +""")) + self.assertEqual( + 'ssh -F %s/.ssh/test_config bastion nc test 22' % os.path.expanduser('~'), + host_config('test', config)['proxycommand'] + ) + + def test_host_config_test_negation(self): test_config_file = """ Host www13.* !*.example.com Port 22 @@ -269,7 +283,7 @@ Host * {'hostname': host, 'port': '8080'} ) - def test_12_host_config_test_proxycommand(self): + def test_host_config_test_proxycommand(self): test_config_file = """ Host proxy-with-equal-divisor-and-space ProxyCommand = foo=bar @@ -297,7 +311,7 @@ ProxyCommand foo=bar:%h-%p values ) - def test_11_host_config_test_identityfile(self): + def test_host_config_test_identityfile(self): test_config_file = """ IdentityFile id_dsa0 @@ -327,7 +341,7 @@ IdentityFile id_dsa22 values ) - def test_12_config_addressfamily_and_lazy_fqdn(self): + def test_config_addressfamily_and_lazy_fqdn(self): """ Ensure the code path honoring non-'all' AddressFamily doesn't asplode """ @@ -343,13 +357,13 @@ IdentityFile something_%l_using_fqdn self.assertEqual(32767, paramiko.util.clamp_value(32767, 32765, 32769)) self.assertEqual(32769, paramiko.util.clamp_value(32767, 32770, 32769)) - def test_13_config_dos_crlf_succeeds(self): + def test_config_dos_crlf_succeeds(self): config_file = StringIO("host abcqwerty\r\nHostName 127.0.0.1\r\n") config = paramiko.SSHConfig() config.parse(config_file) self.assertEqual(config.lookup("abcqwerty")["hostname"], "127.0.0.1") - def test_14_get_hostnames(self): + def test_get_hostnames(self): f = StringIO(test_config_file) config = paramiko.util.parse_ssh_config(f) self.assertEqual(config.get_hostnames(), set(['*', '*.example.com', 'spoo.example.com'])) @@ -461,9 +475,10 @@ Host param3 parara safe_has_bytes = safe_string(has_bytes) expected_bytes = b("has %07%03 bytes") err = "{0!r} != {1!r}" - assert safe_vanilla == vanilla, err.format(safe_vanilla, vanilla) - assert safe_has_bytes == expected_bytes, \ - err.format(safe_has_bytes, expected_bytes) + msg = err.format(safe_vanilla, vanilla) + assert safe_vanilla == vanilla, msg + msg = err.format(safe_has_bytes, expected_bytes) + assert safe_has_bytes == expected_bytes, msg def test_proxycommand_none_issue_418(self): test_config_file = """ @@ -484,3 +499,33 @@ Host proxycommand-with-equals-none paramiko.util.lookup_ssh_host_config(host, config), values ) + + def test_proxycommand_none_masking(self): + # Re: https://github.com/paramiko/paramiko/issues/670 + source_config = """ +Host specific-host + ProxyCommand none + +Host other-host + ProxyCommand other-proxy + +Host * + ProxyCommand default-proxy +""" + config = paramiko.SSHConfig() + config.parse(StringIO(source_config)) + # When bug is present, the full stripping-out of specific-host's + # ProxyCommand means it actually appears to pick up the default + # ProxyCommand value instead, due to cascading. It should (for + # backwards compatibility reasons in 1.x/2.x) appear completely blank, + # as if the host had no ProxyCommand whatsoever. + # Threw another unrelated host in there just for sanity reasons. + self.assertFalse('proxycommand' in config.lookup('specific-host')) + self.assertEqual( + config.lookup('other-host')['proxycommand'], + 'other-proxy' + ) + self.assertEqual( + config.lookup('some-random-host')['proxycommand'], + 'default-proxy' + ) |