summaryrefslogtreecommitdiffhomepage
path: root/tests
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2015-10-30 11:13:24 -0700
committerJeff Forcier <jeff@bitprophet.org>2015-10-30 11:13:24 -0700
commitfa631508e4e736f2cc5dc61c3c5c2c259ea4422f (patch)
tree49f2392384d4659c18f4be1d06e09f3c774b7c1a /tests
parent53e91cc449ff3070cd57af2ed317a17a47d378e1 (diff)
parent0b9d772a21a44af38ecceae0fdbae645e386bd9b (diff)
Merge branch 'master' into 596-int
Diffstat (limited to 'tests')
-rw-r--r--tests/test_auth.py4
-rw-r--r--tests/test_client.py8
-rwxr-xr-xtests/test_file.py6
-rw-r--r--tests/test_gssapi.py4
-rw-r--r--tests/test_hostkeys.py1
-rw-r--r--tests/test_kex_gss.py6
-rw-r--r--tests/test_message.py8
-rwxr-xr-xtests/test_sftp.py7
-rw-r--r--tests/test_ssh_gss.py6
-rw-r--r--tests/test_transport.py45
-rw-r--r--tests/test_util.py20
11 files changed, 75 insertions, 40 deletions
diff --git a/tests/test_auth.py b/tests/test_auth.py
index 1d972d53..ec78e3ce 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -118,12 +118,12 @@ class AuthTest (unittest.TestCase):
self.ts.add_server_key(host_key)
self.event = threading.Event()
self.server = NullServer()
- self.assertTrue(not self.event.isSet())
+ self.assertTrue(not self.event.is_set())
self.ts.start_server(self.event, self.server)
def verify_finished(self):
self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
+ self.assertTrue(self.event.is_set())
self.assertTrue(self.ts.is_active())
def test_1_bad_auth_type(self):
diff --git a/tests/test_client.py b/tests/test_client.py
index 28d1cb46..3d2e75c9 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -128,7 +128,7 @@ class SSHClientTest (unittest.TestCase):
# Authentication successful?
self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
+ self.assertTrue(self.event.is_set())
self.assertTrue(self.ts.is_active())
self.assertEqual('slowdive', self.ts.get_username())
self.assertEqual(True, self.ts.is_authenticated())
@@ -226,7 +226,7 @@ class SSHClientTest (unittest.TestCase):
self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
+ self.assertTrue(self.event.is_set())
self.assertTrue(self.ts.is_active())
self.assertEqual('slowdive', self.ts.get_username())
self.assertEqual(True, self.ts.is_authenticated())
@@ -281,7 +281,7 @@ class SSHClientTest (unittest.TestCase):
self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
+ self.assertTrue(self.event.is_set())
self.assertTrue(self.ts.is_active())
p = weakref.ref(self.tc._transport.packetizer)
@@ -316,7 +316,7 @@ class SSHClientTest (unittest.TestCase):
self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
+ self.assertTrue(self.event.is_set())
self.assertTrue(self.ts.is_active())
self.assertTrue(self.tc._transport is not None)
diff --git a/tests/test_file.py b/tests/test_file.py
index 22a34aca..a6ff69e9 100755
--- a/tests/test_file.py
+++ b/tests/test_file.py
@@ -70,13 +70,17 @@ class BufferedFileTest (unittest.TestCase):
def test_2_readline(self):
f = LoopbackFile('r+U')
- f.write(b'First line.\nSecond line.\r\nThird line.\nFinal line non-terminated.')
+ f.write(b'First line.\nSecond line.\r\nThird line.\n' +
+ b'Fourth line.\nFinal line non-terminated.')
+
self.assertEqual(f.readline(), 'First line.\n')
# universal newline mode should convert this linefeed:
self.assertEqual(f.readline(), 'Second line.\n')
# truncated line:
self.assertEqual(f.readline(7), 'Third l')
self.assertEqual(f.readline(), 'ine.\n')
+ # newline should be detected and only the fourth line returned
+ self.assertEqual(f.readline(39), 'Fourth line.\n')
self.assertEqual(f.readline(), 'Final line non-terminated.')
self.assertEqual(f.readline(), '')
f.close()
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
index a328dd65..96c268d9 100644
--- a/tests/test_gssapi.py
+++ b/tests/test_gssapi.py
@@ -27,15 +27,13 @@ import socket
class GSSAPITest(unittest.TestCase):
-
+ @staticmethod
def init(hostname=None, srv_mode=False):
global krb5_mech, targ_name, server_mode
krb5_mech = "1.2.840.113554.1.2.2"
targ_name = hostname
server_mode = srv_mode
- init = staticmethod(init)
-
def test_1_pyasn1(self):
"""
Test the used methods of pyasn1.
diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py
index 0ee1bbf0..2bdcad9c 100644
--- a/tests/test_hostkeys.py
+++ b/tests/test_hostkeys.py
@@ -31,6 +31,7 @@ test_hosts_file = """\
secure.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA1PD6U2/TVxET6lkpKhOk5r\
9q/kAYG6sP9f5zuUYP8i7FOFp/6ncCEbbtg/lB+A3iidyxoSWl+9jtoyyDOOVX4UIDV9G11Ml8om3\
D+jrpI9cycZHqilK0HmxDeCuxbwyMuaCygU9gS2qoRvNLWZk70OpIKSSpBo0Wl3/XUmz9uhc=
+broken.example.com ssh-rsa AAAA
happy.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA8bP1ZA7DCZDB9J0s50l31M\
BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\
5ymME3bQ4J/k1IKxCtz/bAlAqFgKoc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+Pc2M=
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
index 8769d09c..3bf788da 100644
--- a/tests/test_kex_gss.py
+++ b/tests/test_kex_gss.py
@@ -58,14 +58,12 @@ class NullServer (paramiko.ServerInterface):
class GSSKexTest(unittest.TestCase):
-
+ @staticmethod
def init(username, hostname):
global krb5_principal, targ_name
krb5_principal = username
targ_name = hostname
- init = staticmethod(init)
-
def setUp(self):
self.username = krb5_principal
self.hostname = socket.getfqdn(targ_name)
@@ -111,7 +109,7 @@ class GSSKexTest(unittest.TestCase):
gss_auth=True, gss_kex=True)
self.event.wait(1.0)
- self.assert_(self.event.isSet())
+ self.assert_(self.event.is_set())
self.assert_(self.ts.is_active())
self.assertEquals(self.username, self.ts.get_username())
self.assertEquals(True, self.ts.is_authenticated())
diff --git a/tests/test_message.py b/tests/test_message.py
index f308c037..f18cae90 100644
--- a/tests/test_message.py
+++ b/tests/test_message.py
@@ -92,12 +92,12 @@ class MessageTest (unittest.TestCase):
def test_4_misc(self):
msg = Message(self.__d)
- self.assertEqual(msg.get_int(), 5)
- self.assertEqual(msg.get_int(), 0x1122334455)
- self.assertEqual(msg.get_int(), 0xf00000000000000000)
+ self.assertEqual(msg.get_adaptive_int(), 5)
+ self.assertEqual(msg.get_adaptive_int(), 0x1122334455)
+ self.assertEqual(msg.get_adaptive_int(), 0xf00000000000000000)
self.assertEqual(msg.get_so_far(), self.__d[:29])
self.assertEqual(msg.get_remainder(), self.__d[29:])
msg.rewind()
- self.assertEqual(msg.get_int(), 5)
+ self.assertEqual(msg.get_adaptive_int(), 5)
self.assertEqual(msg.get_so_far(), self.__d[:4])
self.assertEqual(msg.get_remainder(), self.__d[4:])
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index 72c7ba03..cb8f7f84 100755
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -97,7 +97,7 @@ def get_sftp():
class SFTPTest (unittest.TestCase):
-
+ @staticmethod
def init(hostname, username, keyfile, passwd):
global sftp, tc
@@ -129,8 +129,8 @@ class SFTPTest (unittest.TestCase):
sys.stderr.write('\n')
sys.exit(1)
sftp = paramiko.SFTP.from_transport(t)
- init = staticmethod(init)
+ @staticmethod
def init_loopback():
global sftp, tc
@@ -150,12 +150,11 @@ class SFTPTest (unittest.TestCase):
event.wait(1.0)
sftp = paramiko.SFTP.from_transport(tc)
- init_loopback = staticmethod(init_loopback)
+ @staticmethod
def set_big_file_test(onoff):
global g_big_file_test
g_big_file_test = onoff
- set_big_file_test = staticmethod(set_big_file_test)
def setUp(self):
global FOLDER
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py
index 595081b8..e20d348f 100644
--- a/tests/test_ssh_gss.py
+++ b/tests/test_ssh_gss.py
@@ -57,14 +57,12 @@ class NullServer (paramiko.ServerInterface):
class GSSAuthTest(unittest.TestCase):
-
+ @staticmethod
def init(username, hostname):
global krb5_principal, targ_name
krb5_principal = username
targ_name = hostname
- init = staticmethod(init)
-
def setUp(self):
self.username = krb5_principal
self.hostname = socket.getfqdn(targ_name)
@@ -104,7 +102,7 @@ class GSSAuthTest(unittest.TestCase):
gss_auth=True)
self.event.wait(1.0)
- self.assert_(self.event.isSet())
+ self.assert_(self.event.is_set())
self.assert_(self.ts.is_active())
self.assertEquals(self.username, self.ts.get_username())
self.assertEquals(True, self.ts.is_authenticated())
diff --git a/tests/test_transport.py b/tests/test_transport.py
index d6884e4b..80f5e611 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -36,7 +36,7 @@ from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey
from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL
from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
from paramiko.common import MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST, \
- MIN_PACKET_SIZE, MAX_WINDOW_SIZE, \
+ MIN_PACKET_SIZE, MIN_WINDOW_SIZE, MAX_WINDOW_SIZE, \
DEFAULT_WINDOW_SIZE, DEFAULT_MAX_PACKET_SIZE
from paramiko.py3compat import bytes
from paramiko.message import Message
@@ -137,12 +137,12 @@ class TransportTest(unittest.TestCase):
event = threading.Event()
self.server = NullServer()
- self.assertTrue(not event.isSet())
+ self.assertTrue(not event.is_set())
self.ts.start_server(event, self.server)
self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion')
event.wait(1.0)
- self.assertTrue(event.isSet())
+ self.assertTrue(event.is_set())
self.assertTrue(self.ts.is_active())
def test_1_security_options(self):
@@ -181,7 +181,7 @@ class TransportTest(unittest.TestCase):
self.ts.add_server_key(host_key)
event = threading.Event()
server = NullServer()
- self.assertTrue(not event.isSet())
+ self.assertTrue(not event.is_set())
self.assertEqual(None, self.tc.get_username())
self.assertEqual(None, self.ts.get_username())
self.assertEqual(False, self.tc.is_authenticated())
@@ -190,7 +190,7 @@ class TransportTest(unittest.TestCase):
self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion')
event.wait(1.0)
- self.assertTrue(event.isSet())
+ self.assertTrue(event.is_set())
self.assertTrue(self.ts.is_active())
self.assertEqual('slowdive', self.tc.get_username())
self.assertEqual('slowdive', self.ts.get_username())
@@ -206,13 +206,13 @@ class TransportTest(unittest.TestCase):
self.ts.add_server_key(host_key)
event = threading.Event()
server = NullServer()
- self.assertTrue(not event.isSet())
+ self.assertTrue(not event.is_set())
self.socks.send(LONG_BANNER)
self.ts.start_server(event, server)
self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion')
event.wait(1.0)
- self.assertTrue(event.isSet())
+ self.assertTrue(event.is_set())
self.assertTrue(self.ts.is_active())
def test_4_special(self):
@@ -683,7 +683,7 @@ class TransportTest(unittest.TestCase):
def run(self):
try:
for i in range(1, 1+self.iterations):
- if self.done_event.isSet():
+ if self.done_event.is_set():
break
self.watchdog_event.set()
#print i, "SEND"
@@ -702,7 +702,7 @@ class TransportTest(unittest.TestCase):
def run(self):
try:
- while not self.done_event.isSet():
+ while not self.done_event.is_set():
if self.chan.recv_ready():
chan.recv(65536)
self.watchdog_event.set()
@@ -756,12 +756,12 @@ class TransportTest(unittest.TestCase):
# Act as a watchdog timer, checking
deadlocked = False
- while not deadlocked and not done_event.isSet():
+ while not deadlocked and not done_event.is_set():
for event in (st.watchdog_event, rt.watchdog_event):
event.wait(timeout)
- if done_event.isSet():
+ if done_event.is_set():
break
- if not event.isSet():
+ if not event.is_set():
deadlocked = True
break
event.clear()
@@ -782,7 +782,7 @@ class TransportTest(unittest.TestCase):
"""
verify that we conform to the rfc of packet and window sizes.
"""
- for val, correct in [(32767, MIN_PACKET_SIZE),
+ for val, correct in [(4095, MIN_PACKET_SIZE),
(None, DEFAULT_MAX_PACKET_SIZE),
(2**32, MAX_WINDOW_SIZE)]:
self.assertEqual(self.tc._sanitize_packet_size(val), correct)
@@ -791,7 +791,24 @@ class TransportTest(unittest.TestCase):
"""
verify that we conform to the rfc of packet and window sizes.
"""
- for val, correct in [(32767, MIN_PACKET_SIZE),
+ for val, correct in [(32767, MIN_WINDOW_SIZE),
(None, DEFAULT_WINDOW_SIZE),
(2**32, MAX_WINDOW_SIZE)]:
self.assertEqual(self.tc._sanitize_window_size(val), correct)
+
+ def test_L_handshake_timeout(self):
+ """
+ verify that we can get a hanshake timeout.
+ """
+ host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ public_host_key = RSAKey(data=host_key.asbytes())
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.assertTrue(not event.is_set())
+ self.tc.handshake_timeout = 0.000000000001
+ self.ts.start_server(event, server)
+ self.assertRaises(EOFError, self.tc.connect,
+ hostkey=public_host_key,
+ username='slowdive',
+ password='pygmalion')
diff --git a/tests/test_util.py b/tests/test_util.py
index 7f68de21..bfdc525e 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -464,3 +464,23 @@ Host param3 parara
assert safe_vanilla == vanilla, err.format(safe_vanilla, vanilla)
assert safe_has_bytes == expected_bytes, \
err.format(safe_has_bytes, expected_bytes)
+
+ def test_proxycommand_none_issue_418(self):
+ test_config_file = """
+Host proxycommand-standard-none
+ ProxyCommand None
+
+Host proxycommand-with-equals-none
+ ProxyCommand=None
+ """
+ for host, values in {
+ 'proxycommand-standard-none': {'hostname': 'proxycommand-standard-none'},
+ 'proxycommand-with-equals-none': {'hostname': 'proxycommand-with-equals-none'}
+ }.items():
+
+ f = StringIO(test_config_file)
+ config = paramiko.util.parse_ssh_config(f)
+ self.assertEqual(
+ paramiko.util.lookup_ssh_host_config(host, config),
+ values
+ )