summaryrefslogtreecommitdiffhomepage
path: root/tests
diff options
context:
space:
mode:
authorScott Maxwell <scott@codecobblers.com>2013-10-30 17:14:52 -0700
committerScott Maxwell <scott@codecobblers.com>2013-10-30 17:14:52 -0700
commit0b7d0cf0a23e4f16f8552ae05a66539119e2e920 (patch)
tree2ba26a535dceb8b2684adedae99fdbc4c77b3a67 /tests
parent2d738fa08b85c5065cc898d5f9e4d36ee753e871 (diff)
Convert and detect types properly, use helper constants, use StringIO and range
Diffstat (limited to 'tests')
-rw-r--r--tests/loop.py4
-rw-r--r--tests/stub_sftp.py2
-rw-r--r--tests/test_auth.py9
-rw-r--r--tests/test_kex.py6
-rw-r--r--tests/test_pkey.py6
-rwxr-xr-xtests/test_sftp.py45
-rw-r--r--tests/test_sftp_big.py32
-rw-r--r--tests/test_transport.py6
-rw-r--r--tests/test_util.py20
9 files changed, 72 insertions, 58 deletions
diff --git a/tests/loop.py b/tests/loop.py
index 2f3f5dfc..cfd76265 100644
--- a/tests/loop.py
+++ b/tests/loop.py
@@ -32,7 +32,7 @@ class LoopSocket (object):
"""
def __init__(self):
- self.__in_buffer = ''
+ self.__in_buffer = bytes()
self.__lock = threading.Lock()
self.__cv = threading.Condition(self.__lock)
self.__timeout = None
@@ -42,7 +42,7 @@ class LoopSocket (object):
self.__unlink()
try:
self.__lock.acquire()
- self.__in_buffer = ''
+ self.__in_buffer = bytes()
finally:
self.__lock.release()
diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py
index b8bea9b5..26ca13b3 100644
--- a/tests/stub_sftp.py
+++ b/tests/stub_sftp.py
@@ -104,7 +104,7 @@ class StubSFTPServer (SFTPServerInterface):
else:
# os.open() defaults to 0777 which is
# an odd default mode for files
- fd = os.open(path, flags, 0666)
+ fd = os.open(path, flags, o666)
except OSError:
e = sys.exc_info()[1]
return SFTPServer.convert_errno(e.errno)
diff --git a/tests/test_auth.py b/tests/test_auth.py
index a7c9e61b..5fd0bf5d 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -32,6 +32,11 @@ from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
from tests.loop import LoopSocket
from tests.util import test_path
+try:
+ _pwd = u'\u2022'
+except Exception:
+ _pwd = '\u2022'
+
class NullServer (ServerInterface):
paranoid_did_password = False
@@ -65,7 +70,7 @@ class NullServer (ServerInterface):
if self.paranoid_did_public_key:
return AUTH_SUCCESSFUL
return AUTH_PARTIALLY_SUCCESSFUL
- if (username == 'utf8') and (password == u'\u2022'):
+ if (username == 'utf8') and (password == _pwd):
return AUTH_SUCCESSFUL
if (username == 'non-utf8') and (password == '\xff'):
return AUTH_SUCCESSFUL
@@ -203,7 +208,7 @@ class AuthTest (unittest.TestCase):
"""
self.start_server()
self.tc.connect(hostkey=self.public_host_key)
- remain = self.tc.auth_password('utf8', u'\u2022')
+ remain = self.tc.auth_password('utf8', _pwd)
self.assertEquals([], remain)
self.verify_finished()
diff --git a/tests/test_kex.py b/tests/test_kex.py
index 21986fcc..c94b777b 100644
--- a/tests/test_kex.py
+++ b/tests/test_kex.py
@@ -31,7 +31,7 @@ from paramiko.common import *
class FakeRng (object):
def read(self, n):
- return chr(0xcc) * n
+ return byte_chr(0xcc) * n
class FakeKey (object):
@@ -44,7 +44,7 @@ class FakeKey (object):
class FakeModulusPack (object):
- P = 0xFFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFFL
+ P = 0xFFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF
G = 2
def get_modulus(self, min, ask, max):
return self.G, self.P
@@ -78,7 +78,7 @@ class FakeTransport (object):
class KexTest (unittest.TestCase):
- K = 14730343317708716439807310032871972459448364195094179797249681733965528989482751523943515690110179031004049109375612685505881911274101441415545039654102474376472240501616988799699744135291070488314748284283496055223852115360852283821334858541043710301057312858051901453919067023103730011648890038847384890504L
+ K = 14730343317708716439807310032871972459448364195094179797249681733965528989482751523943515690110179031004049109375612685505881911274101441415545039654102474376472240501616988799699744135291070488314748284283496055223852115360852283821334858541043710301057312858051901453919067023103730011648890038847384890504
def setUp(self):
pass
diff --git a/tests/test_pkey.py b/tests/test_pkey.py
index 589a35d8..c7240db3 100644
--- a/tests/test_pkey.py
+++ b/tests/test_pkey.py
@@ -99,7 +99,7 @@ class KeyTest (unittest.TestCase):
self.assertEquals(PUB_RSA.split()[1], key.get_base64())
self.assertEquals(1024, key.get_bits())
- s = StringIO.StringIO()
+ s = StringIO()
key.write_private_key(s)
self.assertEquals(RSA_PRIVATE_OUT, s.getvalue())
s.seek(0)
@@ -124,7 +124,7 @@ class KeyTest (unittest.TestCase):
self.assertEquals(PUB_DSS.split()[1], key.get_base64())
self.assertEquals(1024, key.get_bits())
- s = StringIO.StringIO()
+ s = StringIO()
key.write_private_key(s)
self.assertEquals(DSS_PRIVATE_OUT, s.getvalue())
s.seek(0)
@@ -207,7 +207,7 @@ class KeyTest (unittest.TestCase):
self.assertEquals(PUB_ECDSA.split()[1], key.get_base64())
self.assertEquals(256, key.get_bits())
- s = StringIO.StringIO()
+ s = StringIO()
key.write_private_key(s)
self.assertEquals(ECDSA_PRIVATE_OUT, s.getvalue())
s.seek(0)
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index a4452711..c80967d1 100755
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -71,7 +71,10 @@ FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000')
sftp = None
tc = None
g_big_file_test = True
-
+try:
+ unicode_folder = u'\u00fcnic\u00f8de'
+except SyntaxError:
+ unicode_folder = '\u00fcnic\u00f8de'
def get_sftp():
global sftp
@@ -141,7 +144,7 @@ class SFTPTest (unittest.TestCase):
def setUp(self):
global FOLDER
- for i in xrange(1000):
+ for i in range(1000):
FOLDER = FOLDER[:-3] + '%03d' % i
try:
sftp.mkdir(FOLDER)
@@ -236,7 +239,7 @@ class SFTPTest (unittest.TestCase):
pass
f = sftp.open(FOLDER + '/second.txt', 'r')
f.seek(-6, f.SEEK_END)
- self.assertEqual(f.read(4), 'tent')
+ self.assertEqual(u(f.read(4)), 'tent')
f.close()
finally:
try:
@@ -301,16 +304,16 @@ class SFTPTest (unittest.TestCase):
f.close()
stat = sftp.stat(FOLDER + '/special')
- sftp.chmod(FOLDER + '/special', (stat.st_mode & ~0777) | 0600)
+ sftp.chmod(FOLDER + '/special', (stat.st_mode & ~o777) | o600)
stat = sftp.stat(FOLDER + '/special')
- expected_mode = 0600
+ expected_mode = o600
if sys.platform == 'win32':
# chmod not really functional on windows
- expected_mode = 0666
+ expected_mode = o666
if sys.platform == 'cygwin':
# even worse.
- expected_mode = 0644
- self.assertEqual(stat.st_mode & 0777, expected_mode)
+ expected_mode = o644
+ self.assertEqual(stat.st_mode & o777, expected_mode)
self.assertEqual(stat.st_size, 1024)
mtime = stat.st_mtime - 3600
@@ -341,17 +344,17 @@ class SFTPTest (unittest.TestCase):
f = sftp.open(FOLDER + '/special', 'r+')
stat = f.stat()
- f.chmod((stat.st_mode & ~0777) | 0600)
+ f.chmod((stat.st_mode & ~o777) | o600)
stat = f.stat()
- expected_mode = 0600
+ expected_mode = o600
if sys.platform == 'win32':
# chmod not really functional on windows
- expected_mode = 0666
+ expected_mode = o666
if sys.platform == 'cygwin':
# even worse.
- expected_mode = 0644
- self.assertEqual(stat.st_mode & 0777, expected_mode)
+ expected_mode = o644
+ self.assertEqual(stat.st_mode & o777, expected_mode)
self.assertEqual(stat.st_size, 1024)
mtime = stat.st_mtime - 3600
@@ -643,7 +646,7 @@ class SFTPTest (unittest.TestCase):
f.close()
try:
- sftp.rename(FOLDER + '/something', FOLDER + u'/\u00fcnic\u00f8de')
+ sftp.rename(FOLDER + '/something', FOLDER + '/' + unicode_folder)
sftp.open(FOLDER + '/\xc3\xbcnic\xc3\xb8\x64\x65', 'r')
except Exception:
e = sys.exc_info()[1]
@@ -651,16 +654,16 @@ class SFTPTest (unittest.TestCase):
sftp.unlink(FOLDER + '/\xc3\xbcnic\xc3\xb8\x64\x65')
def test_L_utf8_chdir(self):
- sftp.mkdir(FOLDER + u'\u00fcnic\u00f8de')
+ sftp.mkdir(FOLDER + '/' + unicode_folder)
try:
- sftp.chdir(FOLDER + u'\u00fcnic\u00f8de')
+ sftp.chdir(FOLDER + '/' + unicode_folder)
f = sftp.open('something', 'w')
f.write('okay')
f.close()
sftp.unlink('something')
finally:
sftp.chdir(None)
- sftp.rmdir(FOLDER + u'\u00fcnic\u00f8de')
+ sftp.rmdir(FOLDER + '/' + unicode_folder)
def test_M_bad_readv(self):
"""
@@ -733,10 +736,16 @@ class SFTPTest (unittest.TestCase):
Send an empty file and confirm it is sent.
"""
target = FOLDER + '/empty file.txt'
- stream = StringIO.StringIO()
+ stream = StringIO()
try:
attrs = sftp.putfo(stream, target)
# the returned attributes should not be null
self.assertNotEqual(attrs, None)
finally:
sftp.remove(target)
+
+
+if __name__ == '__main__':
+ SFTPTest.init_loopback()
+ from unittest import main
+ main()
diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py
index 94088f78..20bf0075 100644
--- a/tests/test_sftp_big.py
+++ b/tests/test_sftp_big.py
@@ -46,7 +46,7 @@ class BigSFTPTest (unittest.TestCase):
def setUp(self):
global FOLDER
sftp = get_sftp()
- for i in xrange(1000):
+ for i in range(1000):
FOLDER = FOLDER[:-3] + '%03d' % i
try:
sftp.mkdir(FOLDER)
@@ -69,7 +69,7 @@ class BigSFTPTest (unittest.TestCase):
f = sftp.open('%s/file%d.txt' % (FOLDER, i), 'w', 1)
f.write('this is file #%d.\n' % i)
f.close()
- sftp.chmod('%s/file%d.txt' % (FOLDER, i), 0660)
+ sftp.chmod('%s/file%d.txt' % (FOLDER, i), o660)
# now make sure every file is there, by creating a list of filenmes
# and reading them in random order.
@@ -124,7 +124,7 @@ class BigSFTPTest (unittest.TestCase):
write a 1MB file, with no linefeeds, using pipelining.
"""
sftp = get_sftp()
- kblob = ''.join([struct.pack('>H', n) for n in xrange(512)])
+ kblob = ''.join([struct.pack('>H', n) for n in range(512)])
start = time.time()
try:
f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
@@ -165,7 +165,7 @@ class BigSFTPTest (unittest.TestCase):
def test_4_prefetch_seek(self):
sftp = get_sftp()
- kblob = ''.join([struct.pack('>H', n) for n in xrange(512)])
+ kblob = ''.join([struct.pack('>H', n) for n in range(512)])
try:
f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
f.set_pipelined(True)
@@ -181,13 +181,13 @@ class BigSFTPTest (unittest.TestCase):
start = time.time()
k2blob = kblob + kblob
chunk = 793
- for i in xrange(10):
+ for i in range(10):
f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
f.prefetch()
base_offset = (512 * 1024) + 17 * random.randint(1000, 2000)
- offsets = [base_offset + j * chunk for j in xrange(100)]
+ offsets = [base_offset + j * chunk for j in range(100)]
# randomly seek around and read them out
- for j in xrange(100):
+ for j in range(100):
offset = offsets[random.randint(0, len(offsets) - 1)]
offsets.remove(offset)
f.seek(offset)
@@ -203,7 +203,7 @@ class BigSFTPTest (unittest.TestCase):
def test_5_readv_seek(self):
sftp = get_sftp()
- kblob = ''.join([struct.pack('>H', n) for n in xrange(512)])
+ kblob = ''.join([struct.pack('>H', n) for n in range(512)])
try:
f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
f.set_pipelined(True)
@@ -219,21 +219,21 @@ class BigSFTPTest (unittest.TestCase):
start = time.time()
k2blob = kblob + kblob
chunk = 793
- for i in xrange(10):
+ for i in range(10):
f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
base_offset = (512 * 1024) + 17 * random.randint(1000, 2000)
# make a bunch of offsets and put them in random order
- offsets = [base_offset + j * chunk for j in xrange(100)]
+ offsets = [base_offset + j * chunk for j in range(100)]
readv_list = []
- for j in xrange(100):
+ for j in range(100):
o = offsets[random.randint(0, len(offsets) - 1)]
offsets.remove(o)
readv_list.append((o, chunk))
ret = f.readv(readv_list)
- for i in xrange(len(readv_list)):
+ for i in range(len(readv_list)):
offset = readv_list[i][0]
n_offset = offset % 1024
- self.assertEqual(ret.next(), k2blob[n_offset:n_offset + chunk])
+ self.assertEqual(next(ret), k2blob[n_offset:n_offset + chunk])
f.close()
end = time.time()
sys.stderr.write('%ds ' % round(end - start))
@@ -279,7 +279,7 @@ class BigSFTPTest (unittest.TestCase):
verify that prefetch and readv don't conflict with each other.
"""
sftp = get_sftp()
- kblob = ''.join([struct.pack('>H', n) for n in xrange(512)])
+ kblob = ''.join([struct.pack('>H', n) for n in range(512)])
try:
f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
f.set_pipelined(True)
@@ -318,7 +318,7 @@ class BigSFTPTest (unittest.TestCase):
returned as a single blob.
"""
sftp = get_sftp()
- kblob = ''.join([struct.pack('>H', n) for n in xrange(512)])
+ kblob = ''.join([struct.pack('>H', n) for n in range(512)])
try:
f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
f.set_pipelined(True)
@@ -367,7 +367,7 @@ class BigSFTPTest (unittest.TestCase):
k32blob = (32 * 1024 * 'x')
try:
f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024)
- for i in xrange(32):
+ for i in range(32):
f.write(k32blob)
f.close()
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 6c62b3e3..69fdbbb2 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -158,7 +158,7 @@ class TransportTest(ParamikoTest):
pass
def test_2_compute_key(self):
- self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929L
+ self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929
self.tc.H = unhexlify('0C8307CDE6856FF30BA93684EB0F04C2520E9ED3')
self.tc.session_id = self.tc.H
key = self.tc._compute_key('C', 32)
@@ -406,7 +406,7 @@ class TransportTest(ParamikoTest):
chan.close()
# allow a few seconds for the rekeying to complete
- for i in xrange(50):
+ for i in range(50):
if self.tc.H != self.tc.session_id:
break
time.sleep(0.1)
@@ -659,7 +659,7 @@ class TransportTest(ParamikoTest):
def run(self):
try:
- for i in xrange(1, 1+self.iterations):
+ for i in range(1, 1+self.iterations):
if self.done_event.isSet():
break
self.watchdog_event.set()
diff --git a/tests/test_util.py b/tests/test_util.py
index 7e656df8..858dedba 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -101,7 +101,7 @@ class UtilTest(ParamikoTest):
def test_2_parse_config(self):
global test_config_file
- f = cStringIO.StringIO(test_config_file)
+ f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
self.assertEquals(config._config,
[{'host': ['*'], 'config': {}}, {'host': ['*'], 'config': {'identityfile': ['~/.ssh/id_rsa'], 'user': 'robey'}},
@@ -111,7 +111,7 @@ class UtilTest(ParamikoTest):
def test_3_host_config(self):
global test_config_file
- f = cStringIO.StringIO(test_config_file)
+ f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
for host, values in {
@@ -138,7 +138,7 @@ class UtilTest(ParamikoTest):
def test_4_generate_key_bytes(self):
x = paramiko.util.generate_key_bytes(SHA, 'ABCDEFGH', 'This is my secret passphrase.', 64)
- hex = ''.join(['%02x' % ord(c) for c in x])
+ hex = ''.join(['%02x' % byte_ord(c) for c in x])
self.assertEquals(hex, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b')
def test_5_host_keys(self):
@@ -172,7 +172,7 @@ Host *.example.com
Host *
Port 3333
"""
- f = cStringIO.StringIO(test_config_file)
+ f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
host = 'www13.example.com'
self.assertEquals(
@@ -216,7 +216,7 @@ Host space-delimited
Host equals-delimited
ProxyCommand=foo bar=biz baz
"""
- f = cStringIO.StringIO(conf)
+ f = StringIO(conf)
config = paramiko.util.parse_ssh_config(f)
for host in ('space-delimited', 'equals-delimited'):
self.assertEquals(
@@ -228,7 +228,7 @@ Host equals-delimited
"""
ProxyCommand should perform interpolation on the value
"""
- config = paramiko.util.parse_ssh_config(cStringIO.StringIO("""
+ config = paramiko.util.parse_ssh_config(StringIO("""
Host specific
Port 37
ProxyCommand host %h port %p lol
@@ -264,7 +264,7 @@ Host www13.*
Host *
Port 3333
"""
- f = cStringIO.StringIO(test_config_file)
+ f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
host = 'www13.example.com'
self.assertEquals(
@@ -293,7 +293,7 @@ ProxyCommand foo=bar:%h-%p
'foo=bar:proxy-without-equal-divisor-22'}
}.items():
- f = cStringIO.StringIO(test_config_file)
+ f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
self.assertEquals(
paramiko.util.lookup_ssh_host_config(host, config),
@@ -323,7 +323,7 @@ IdentityFile id_dsa22
'identityfile': ['id_dsa0', 'id_dsa1', 'id_dsa22']}
}.items():
- f = cStringIO.StringIO(test_config_file)
+ f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
self.assertEquals(
paramiko.util.lookup_ssh_host_config(host, config),
@@ -338,5 +338,5 @@ IdentityFile id_dsa22
AddressFamily inet
IdentityFile something_%l_using_fqdn
"""
- config = paramiko.util.parse_ssh_config(cStringIO.StringIO(test_config))
+ config = paramiko.util.parse_ssh_config(StringIO(test_config))
assert config.lookup('meh') # will die during lookup() if bug regresses