From 89e39a455956848268924ea54fbab9ad374b1bb8 Mon Sep 17 00:00:00 2001 From: Robey Pointer Date: Thu, 9 Mar 2006 00:14:55 -0800 Subject: [project @ robey@lag.net-20060309081455-84be2ae54f98e897] move sftp big-file tests into a separate class and add one that does a prefetch, then seeks in random order --- tests/test_sftp.py | 196 ++++------------------------------------------------- 1 file changed, 13 insertions(+), 183 deletions(-) (limited to 'tests/test_sftp.py') diff --git a/tests/test_sftp.py b/tests/test_sftp.py index 77ab2ed6..34690aba 100755 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -26,6 +26,7 @@ do test file operations in (so no existing files will be harmed). import logging import os import random +import struct import sys import threading import time @@ -69,6 +70,11 @@ tc = None g_big_file_test = True +def get_sftp(): + global sftp + return sftp + + class SFTPTest (unittest.TestCase): def init(hostname, username, keyfile, passwd): @@ -449,183 +455,7 @@ class SFTPTest (unittest.TestCase): except: pass - def test_E_lots_of_files(self): - """ - create a bunch of files over the same session. - """ - global g_big_file_test - if not g_big_file_test: - return - numfiles = 100 - try: - for i in range(numfiles): - f = sftp.open('%s/file%d.txt' % (FOLDER, i), 'w', 1) - f.write('this is file #%d.\n' % i) - f.close() - sftp.chmod('%s/file%d.txt' % (FOLDER, i), 0660) - - # now make sure every file is there, by creating a list of filenmes - # and reading them in random order. - numlist = range(numfiles) - while len(numlist) > 0: - r = numlist[random.randint(0, len(numlist) - 1)] - f = sftp.open('%s/file%d.txt' % (FOLDER, r)) - self.assertEqual(f.readline(), 'this is file #%d.\n' % r) - f.close() - numlist.remove(r) - finally: - for i in range(numfiles): - try: - sftp.remove('%s/file%d.txt' % (FOLDER, i)) - except: - pass - - def test_F_big_file(self): - """ - write a 1MB file with no buffering. - """ - global g_big_file_test - if not g_big_file_test: - return - kblob = (1024 * 'x') - start = time.time() - try: - f = sftp.open('%s/hongry.txt' % FOLDER, 'w') - for n in range(1024): - f.write(kblob) - if n % 128 == 0: - sys.stderr.write('.') - f.close() - sys.stderr.write(' ') - - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) - end = time.time() - sys.stderr.write('%ds ' % round(end - start)) - - start = time.time() - f = sftp.open('%s/hongry.txt' % FOLDER, 'r') - for n in range(1024): - data = f.read(1024) - self.assertEqual(data, kblob) - f.close() - - end = time.time() - sys.stderr.write('%ds ' % round(end - start)) - finally: - sftp.remove('%s/hongry.txt' % FOLDER) - - def test_G_big_file_pipelined(self): - """ - write a 1MB file, with no linefeeds, using pipelining. - """ - global g_big_file_test - if not g_big_file_test: - return - kblob = (1024 * 'x') - start = time.time() - try: - f = sftp.open('%s/hongry.txt' % FOLDER, 'w') - f.set_pipelined(True) - for n in range(1024): - f.write(kblob) - if n % 128 == 0: - sys.stderr.write('.') - f.close() - sys.stderr.write(' ') - - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) - end = time.time() - sys.stderr.write('%ds ' % round(end - start)) - - start = time.time() - f = sftp.open('%s/hongry.txt' % FOLDER, 'r') - f.prefetch() - for n in range(1024): - data = f.read(1024) - self.assertEqual(data, kblob) - f.close() - - end = time.time() - sys.stderr.write('%ds ' % round(end - start)) - finally: - sftp.remove('%s/hongry.txt' % FOLDER) - - def test_H_lots_of_prefetching(self): - """ - prefetch a 1MB file a bunch of times, discarding the file object - without using it, to verify that paramiko doesn't get confused. - """ - global g_big_file_test - if not g_big_file_test: - return - kblob = (1024 * 'x') - try: - f = sftp.open('%s/hongry.txt' % FOLDER, 'w') - f.set_pipelined(True) - for n in range(1024): - f.write(kblob) - if n % 128 == 0: - sys.stderr.write('.') - f.close() - sys.stderr.write(' ') - - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) - - for i in range(10): - f = sftp.open('%s/hongry.txt' % FOLDER, 'r') - f.prefetch() - f = sftp.open('%s/hongry.txt' % FOLDER, 'r') - f.prefetch() - for n in range(1024): - data = f.read(1024) - self.assertEqual(data, kblob) - if n % 128 == 0: - sys.stderr.write('.') - f.close() - sys.stderr.write(' ') - finally: - sftp.remove('%s/hongry.txt' % FOLDER) - - def test_I_big_file_big_buffer(self): - """ - write a 1MB file, with no linefeeds, and a big buffer. - """ - global g_big_file_test - if not g_big_file_test: - return - mblob = (1024 * 1024 * 'x') - try: - f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) - f.write(mblob) - f.close() - - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) - finally: - sftp.remove('%s/hongry.txt' % FOLDER) - - def test_J_big_file_renegotiate(self): - """ - write a 1MB file, forcing key renegotiation in the middle. - """ - global g_big_file_test - if not g_big_file_test: - return - t = sftp.sock.get_transport() - t.packetizer.REKEY_BYTES = 512 * 1024 - k32blob = (32 * 1024 * 'x') - try: - f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) - for i in xrange(32): - f.write(k32blob) - f.close() - - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) - self.assertNotEquals(t.H, t.session_id) - finally: - sftp.remove('%s/hongry.txt' % FOLDER) - t.packetizer.REKEY_BYTES = pow(2, 30) - - def test_K_realpath(self): + def test_E_realpath(self): """ test that realpath is returning something non-empty and not an error. @@ -636,7 +466,7 @@ class SFTPTest (unittest.TestCase): self.assert_(len(f) > 0) self.assertEquals(os.path.join(pwd, FOLDER), f) - def test_L_mkdir(self): + def test_F_mkdir(self): """ verify that mkdir/rmdir work. """ @@ -659,7 +489,7 @@ class SFTPTest (unittest.TestCase): except IOError: pass - def test_M_chdir(self): + def test_G_chdir(self): """ verify that chdir/getcwd work. """ @@ -696,7 +526,7 @@ class SFTPTest (unittest.TestCase): except: pass - def test_N_get_put(self): + def test_H_get_put(self): """ verify that get/put work. """ @@ -725,7 +555,7 @@ class SFTPTest (unittest.TestCase): os.unlink(localname) sftp.unlink(FOLDER + '/bunny.txt') - def test_O_check(self): + def test_I_check(self): """ verify that file.check() works against our own server. (it's an sftp extension that we support, and may be the only ones who @@ -747,7 +577,7 @@ class SFTPTest (unittest.TestCase): finally: sftp.unlink(FOLDER + '/kitty.txt') - def test_P_x_flag(self): + def test_J_x_flag(self): """ verify that the 'x' flag works when opening a file. """ @@ -763,7 +593,7 @@ class SFTPTest (unittest.TestCase): finally: sftp.unlink(FOLDER + '/unusual.txt') - def test_Q_utf8(self): + def test_K_utf8(self): """ verify that unicode strings are encoded into utf8 correctly. """ -- cgit v1.2.3