diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_auth.py | 4 | ||||
-rw-r--r-- | tests/test_client.py | 8 | ||||
-rwxr-xr-x | tests/test_file.py | 6 | ||||
-rw-r--r-- | tests/test_gssapi.py | 4 | ||||
-rw-r--r-- | tests/test_kex_gss.py | 6 | ||||
-rwxr-xr-x | tests/test_sftp.py | 7 | ||||
-rw-r--r-- | tests/test_ssh_gss.py | 6 | ||||
-rw-r--r-- | tests/test_transport.py | 28 | ||||
-rw-r--r-- | tests/test_util.py | 35 |
9 files changed, 66 insertions, 38 deletions
diff --git a/tests/test_auth.py b/tests/test_auth.py index 1d972d53..ec78e3ce 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -118,12 +118,12 @@ class AuthTest (unittest.TestCase): self.ts.add_server_key(host_key) self.event = threading.Event() self.server = NullServer() - self.assertTrue(not self.event.isSet()) + self.assertTrue(not self.event.is_set()) self.ts.start_server(self.event, self.server) def verify_finished(self): self.event.wait(1.0) - self.assertTrue(self.event.isSet()) + self.assertTrue(self.event.is_set()) self.assertTrue(self.ts.is_active()) def test_1_bad_auth_type(self): diff --git a/tests/test_client.py b/tests/test_client.py index 1978004e..cef7b28d 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -131,7 +131,7 @@ class SSHClientTest (unittest.TestCase): # Authentication successful? self.event.wait(1.0) - self.assertTrue(self.event.isSet()) + self.assertTrue(self.event.is_set()) self.assertTrue(self.ts.is_active()) self.assertEqual('slowdive', self.ts.get_username()) self.assertEqual(True, self.ts.is_authenticated()) @@ -229,7 +229,7 @@ class SSHClientTest (unittest.TestCase): self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion') self.event.wait(1.0) - self.assertTrue(self.event.isSet()) + self.assertTrue(self.event.is_set()) self.assertTrue(self.ts.is_active()) self.assertEqual('slowdive', self.ts.get_username()) self.assertEqual(True, self.ts.is_authenticated()) @@ -283,7 +283,7 @@ class SSHClientTest (unittest.TestCase): self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion') self.event.wait(1.0) - self.assertTrue(self.event.isSet()) + self.assertTrue(self.event.is_set()) self.assertTrue(self.ts.is_active()) p = weakref.ref(self.tc._transport.packetizer) @@ -312,7 +312,7 @@ class SSHClientTest (unittest.TestCase): self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion') self.event.wait(1.0) - self.assertTrue(self.event.isSet()) + self.assertTrue(self.event.is_set()) self.assertTrue(self.ts.is_active()) self.assertTrue(self.tc._transport is not None) diff --git a/tests/test_file.py b/tests/test_file.py index 22a34aca..a6ff69e9 100755 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -70,13 +70,17 @@ class BufferedFileTest (unittest.TestCase): def test_2_readline(self): f = LoopbackFile('r+U') - f.write(b'First line.\nSecond line.\r\nThird line.\nFinal line non-terminated.') + f.write(b'First line.\nSecond line.\r\nThird line.\n' + + b'Fourth line.\nFinal line non-terminated.') + self.assertEqual(f.readline(), 'First line.\n') # universal newline mode should convert this linefeed: self.assertEqual(f.readline(), 'Second line.\n') # truncated line: self.assertEqual(f.readline(7), 'Third l') self.assertEqual(f.readline(), 'ine.\n') + # newline should be detected and only the fourth line returned + self.assertEqual(f.readline(39), 'Fourth line.\n') self.assertEqual(f.readline(), 'Final line non-terminated.') self.assertEqual(f.readline(), '') f.close() diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py index a328dd65..96c268d9 100644 --- a/tests/test_gssapi.py +++ b/tests/test_gssapi.py @@ -27,15 +27,13 @@ import socket class GSSAPITest(unittest.TestCase): - + @staticmethod def init(hostname=None, srv_mode=False): global krb5_mech, targ_name, server_mode krb5_mech = "1.2.840.113554.1.2.2" targ_name = hostname server_mode = srv_mode - init = staticmethod(init) - def test_1_pyasn1(self): """ Test the used methods of pyasn1. diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py index 8769d09c..3bf788da 100644 --- a/tests/test_kex_gss.py +++ b/tests/test_kex_gss.py @@ -58,14 +58,12 @@ class NullServer (paramiko.ServerInterface): class GSSKexTest(unittest.TestCase): - + @staticmethod def init(username, hostname): global krb5_principal, targ_name krb5_principal = username targ_name = hostname - init = staticmethod(init) - def setUp(self): self.username = krb5_principal self.hostname = socket.getfqdn(targ_name) @@ -111,7 +109,7 @@ class GSSKexTest(unittest.TestCase): gss_auth=True, gss_kex=True) self.event.wait(1.0) - self.assert_(self.event.isSet()) + self.assert_(self.event.is_set()) self.assert_(self.ts.is_active()) self.assertEquals(self.username, self.ts.get_username()) self.assertEquals(True, self.ts.is_authenticated()) diff --git a/tests/test_sftp.py b/tests/test_sftp.py index 72c7ba03..cb8f7f84 100755 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -97,7 +97,7 @@ def get_sftp(): class SFTPTest (unittest.TestCase): - + @staticmethod def init(hostname, username, keyfile, passwd): global sftp, tc @@ -129,8 +129,8 @@ class SFTPTest (unittest.TestCase): sys.stderr.write('\n') sys.exit(1) sftp = paramiko.SFTP.from_transport(t) - init = staticmethod(init) + @staticmethod def init_loopback(): global sftp, tc @@ -150,12 +150,11 @@ class SFTPTest (unittest.TestCase): event.wait(1.0) sftp = paramiko.SFTP.from_transport(tc) - init_loopback = staticmethod(init_loopback) + @staticmethod def set_big_file_test(onoff): global g_big_file_test g_big_file_test = onoff - set_big_file_test = staticmethod(set_big_file_test) def setUp(self): global FOLDER diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py index 595081b8..e20d348f 100644 --- a/tests/test_ssh_gss.py +++ b/tests/test_ssh_gss.py @@ -57,14 +57,12 @@ class NullServer (paramiko.ServerInterface): class GSSAuthTest(unittest.TestCase): - + @staticmethod def init(username, hostname): global krb5_principal, targ_name krb5_principal = username targ_name = hostname - init = staticmethod(init) - def setUp(self): self.username = krb5_principal self.hostname = socket.getfqdn(targ_name) @@ -104,7 +102,7 @@ class GSSAuthTest(unittest.TestCase): gss_auth=True) self.event.wait(1.0) - self.assert_(self.event.isSet()) + self.assert_(self.event.is_set()) self.assert_(self.ts.is_active()) self.assertEquals(self.username, self.ts.get_username()) self.assertEquals(True, self.ts.is_authenticated()) diff --git a/tests/test_transport.py b/tests/test_transport.py index 50b1d86b..5cf9a867 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -35,7 +35,7 @@ from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED from paramiko.common import MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST, \ - MIN_PACKET_SIZE, MAX_WINDOW_SIZE, \ + MIN_PACKET_SIZE, MIN_WINDOW_SIZE, MAX_WINDOW_SIZE, \ DEFAULT_WINDOW_SIZE, DEFAULT_MAX_PACKET_SIZE from paramiko.py3compat import bytes from paramiko.message import Message @@ -136,12 +136,12 @@ class TransportTest(unittest.TestCase): event = threading.Event() self.server = NullServer() - self.assertTrue(not event.isSet()) + self.assertTrue(not event.is_set()) self.ts.start_server(event, self.server) self.tc.connect(hostkey=public_host_key, username='slowdive', password='pygmalion') event.wait(1.0) - self.assertTrue(event.isSet()) + self.assertTrue(event.is_set()) self.assertTrue(self.ts.is_active()) def test_1_security_options(self): @@ -180,7 +180,7 @@ class TransportTest(unittest.TestCase): self.ts.add_server_key(host_key) event = threading.Event() server = NullServer() - self.assertTrue(not event.isSet()) + self.assertTrue(not event.is_set()) self.assertEqual(None, self.tc.get_username()) self.assertEqual(None, self.ts.get_username()) self.assertEqual(False, self.tc.is_authenticated()) @@ -189,7 +189,7 @@ class TransportTest(unittest.TestCase): self.tc.connect(hostkey=public_host_key, username='slowdive', password='pygmalion') event.wait(1.0) - self.assertTrue(event.isSet()) + self.assertTrue(event.is_set()) self.assertTrue(self.ts.is_active()) self.assertEqual('slowdive', self.tc.get_username()) self.assertEqual('slowdive', self.ts.get_username()) @@ -205,13 +205,13 @@ class TransportTest(unittest.TestCase): self.ts.add_server_key(host_key) event = threading.Event() server = NullServer() - self.assertTrue(not event.isSet()) + self.assertTrue(not event.is_set()) self.socks.send(LONG_BANNER) self.ts.start_server(event, server) self.tc.connect(hostkey=public_host_key, username='slowdive', password='pygmalion') event.wait(1.0) - self.assertTrue(event.isSet()) + self.assertTrue(event.is_set()) self.assertTrue(self.ts.is_active()) def test_4_special(self): @@ -680,7 +680,7 @@ class TransportTest(unittest.TestCase): def run(self): try: for i in range(1, 1+self.iterations): - if self.done_event.isSet(): + if self.done_event.is_set(): break self.watchdog_event.set() #print i, "SEND" @@ -699,7 +699,7 @@ class TransportTest(unittest.TestCase): def run(self): try: - while not self.done_event.isSet(): + while not self.done_event.is_set(): if self.chan.recv_ready(): chan.recv(65536) self.watchdog_event.set() @@ -753,12 +753,12 @@ class TransportTest(unittest.TestCase): # Act as a watchdog timer, checking deadlocked = False - while not deadlocked and not done_event.isSet(): + while not deadlocked and not done_event.is_set(): for event in (st.watchdog_event, rt.watchdog_event): event.wait(timeout) - if done_event.isSet(): + if done_event.is_set(): break - if not event.isSet(): + if not event.is_set(): deadlocked = True break event.clear() @@ -779,7 +779,7 @@ class TransportTest(unittest.TestCase): """ verify that we conform to the rfc of packet and window sizes. """ - for val, correct in [(32767, MIN_PACKET_SIZE), + for val, correct in [(4095, MIN_PACKET_SIZE), (None, DEFAULT_MAX_PACKET_SIZE), (2**32, MAX_WINDOW_SIZE)]: self.assertEqual(self.tc._sanitize_packet_size(val), correct) @@ -788,7 +788,7 @@ class TransportTest(unittest.TestCase): """ verify that we conform to the rfc of packet and window sizes. """ - for val, correct in [(32767, MIN_PACKET_SIZE), + for val, correct in [(32767, MIN_WINDOW_SIZE), (None, DEFAULT_WINDOW_SIZE), (2**32, MAX_WINDOW_SIZE)]: self.assertEqual(self.tc._sanitize_window_size(val), correct) diff --git a/tests/test_util.py b/tests/test_util.py index f961fbbc..bfdc525e 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -27,8 +27,8 @@ from hashlib import sha1 import unittest import paramiko.util -from paramiko.util import lookup_ssh_host_config as host_config -from paramiko.py3compat import StringIO, byte_ord +from paramiko.util import lookup_ssh_host_config as host_config, safe_string +from paramiko.py3compat import StringIO, byte_ord, b test_config_file = """\ Host * @@ -453,3 +453,34 @@ Host param3 parara ) for host in incorrect_data: self.assertRaises(Exception, conf._get_hosts, host) + + def test_safe_string(self): + vanilla = b("vanilla") + has_bytes = b("has \7\3 bytes") + safe_vanilla = safe_string(vanilla) + safe_has_bytes = safe_string(has_bytes) + expected_bytes = b("has %07%03 bytes") + err = "{0!r} != {1!r}" + assert safe_vanilla == vanilla, err.format(safe_vanilla, vanilla) + assert safe_has_bytes == expected_bytes, \ + err.format(safe_has_bytes, expected_bytes) + + def test_proxycommand_none_issue_418(self): + test_config_file = """ +Host proxycommand-standard-none + ProxyCommand None + +Host proxycommand-with-equals-none + ProxyCommand=None + """ + for host, values in { + 'proxycommand-standard-none': {'hostname': 'proxycommand-standard-none'}, + 'proxycommand-with-equals-none': {'hostname': 'proxycommand-with-equals-none'} + }.items(): + + f = StringIO(test_config_file) + config = paramiko.util.parse_ssh_config(f) + self.assertEqual( + paramiko.util.lookup_ssh_host_config(host, config), + values + ) |