summaryrefslogtreecommitdiffhomepage
path: root/tests/test_sftp_big.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_sftp_big.py')
-rw-r--r--tests/test_sftp_big.py294
1 files changed, 147 insertions, 147 deletions
diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py
index cfad5682..9df566e8 100644
--- a/tests/test_sftp_big.py
+++ b/tests/test_sftp_big.py
@@ -31,107 +31,94 @@ import time
import unittest
from paramiko.common import o660
-from tests.test_sftp import get_sftp
-FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000')
+from .util import slow
-class BigSFTPTest (unittest.TestCase):
-
- def setUp(self):
- global FOLDER
- sftp = get_sftp()
- for i in range(1000):
- FOLDER = FOLDER[:-3] + '%03d' % i
- try:
- sftp.mkdir(FOLDER)
- break
- except (IOError, OSError):
- pass
-
- def tearDown(self):
- sftp = get_sftp()
- sftp.rmdir(FOLDER)
-
- def test_1_lots_of_files(self):
+@slow
+class TestBigSFTP(object):
+ def test_1_lots_of_files(self, sftp):
"""
create a bunch of files over the same session.
"""
- sftp = get_sftp()
numfiles = 100
try:
for i in range(numfiles):
- with sftp.open('%s/file%d.txt' % (FOLDER, i), 'w', 1) as f:
- f.write('this is file #%d.\n' % i)
- sftp.chmod('%s/file%d.txt' % (FOLDER, i), o660)
+ with sftp.open(
+ "%s/file%d.txt" % (sftp.FOLDER, i), "w", 1
+ ) as f:
+ f.write("this is file #%d.\n" % i)
+ sftp.chmod("%s/file%d.txt" % (sftp.FOLDER, i), o660)
# now make sure every file is there, by creating a list of filenmes
# and reading them in random order.
numlist = list(range(numfiles))
while len(numlist) > 0:
r = numlist[random.randint(0, len(numlist) - 1)]
- with sftp.open('%s/file%d.txt' % (FOLDER, r)) as f:
- self.assertEqual(f.readline(), 'this is file #%d.\n' % r)
+ with sftp.open("%s/file%d.txt" % (sftp.FOLDER, r)) as f:
+ assert f.readline() == "this is file #%d.\n" % r
numlist.remove(r)
finally:
for i in range(numfiles):
try:
- sftp.remove('%s/file%d.txt' % (FOLDER, i))
+ sftp.remove("%s/file%d.txt" % (sftp.FOLDER, i))
except:
pass
- def test_2_big_file(self):
+ def test_2_big_file(self, sftp):
"""
write a 1MB file with no buffering.
"""
- sftp = get_sftp()
- kblob = (1024 * b'x')
+ kblob = 1024 * b"x"
start = time.time()
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'w') as f:
+ with sftp.open("%s/hongry.txt" % sftp.FOLDER, "w") as f:
for n in range(1024):
f.write(kblob)
if n % 128 == 0:
- sys.stderr.write('.')
- sys.stderr.write(' ')
+ sys.stderr.write(".")
+ sys.stderr.write(" ")
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert (
+ sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024
+ )
end = time.time()
- sys.stderr.write('%ds ' % round(end - start))
-
+ sys.stderr.write("%ds " % round(end - start))
+
start = time.time()
- with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f:
+ with sftp.open("%s/hongry.txt" % sftp.FOLDER, "r") as f:
for n in range(1024):
data = f.read(1024)
- self.assertEqual(data, kblob)
+ assert data == kblob
end = time.time()
- sys.stderr.write('%ds ' % round(end - start))
+ sys.stderr.write("%ds " % round(end - start))
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove("%s/hongry.txt" % sftp.FOLDER)
- def test_3_big_file_pipelined(self):
+ def test_3_big_file_pipelined(self, sftp):
"""
write a 1MB file, with no linefeeds, using pipelining.
"""
- sftp = get_sftp()
- kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
+ kblob = bytes().join([struct.pack(">H", n) for n in range(512)])
start = time.time()
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
+ with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
if n % 128 == 0:
- sys.stderr.write('.')
- sys.stderr.write(' ')
+ sys.stderr.write(".")
+ sys.stderr.write(" ")
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert (
+ sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024
+ )
end = time.time()
- sys.stderr.write('%ds ' % round(end - start))
-
+ sys.stderr.write("%ds " % round(end - start))
+
start = time.time()
- with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
+ with sftp.open("%s/hongry.txt" % sftp.FOLDER, "rb") as f:
file_size = f.stat().st_size
f.prefetch(file_size)
@@ -145,36 +132,39 @@ class BigSFTPTest (unittest.TestCase):
chunk = size - n
data = f.read(chunk)
offset = n % 1024
- self.assertEqual(data, k2blob[offset:offset + chunk])
+ assert data == k2blob[offset : offset + chunk]
n += chunk
end = time.time()
- sys.stderr.write('%ds ' % round(end - start))
+ sys.stderr.write("%ds " % round(end - start))
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove("%s/hongry.txt" % sftp.FOLDER)
- def test_4_prefetch_seek(self):
- sftp = get_sftp()
- kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
+ def test_4_prefetch_seek(self, sftp):
+ kblob = bytes().join([struct.pack(">H", n) for n in range(512)])
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
+ with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
if n % 128 == 0:
- sys.stderr.write('.')
- sys.stderr.write(' ')
-
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
-
+ sys.stderr.write(".")
+ sys.stderr.write(" ")
+
+ assert (
+ sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024
+ )
+
start = time.time()
k2blob = kblob + kblob
chunk = 793
for i in range(10):
- with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
+ with sftp.open("%s/hongry.txt" % sftp.FOLDER, "rb") as f:
file_size = f.stat().st_size
f.prefetch(file_size)
- base_offset = (512 * 1024) + 17 * random.randint(1000, 2000)
+ base_offset = (512 * 1024) + 17 * random.randint(
+ 1000, 2000
+ )
offsets = [base_offset + j * chunk for j in range(100)]
# randomly seek around and read them out
for j in range(100):
@@ -183,33 +173,36 @@ class BigSFTPTest (unittest.TestCase):
f.seek(offset)
data = f.read(chunk)
n_offset = offset % 1024
- self.assertEqual(data, k2blob[n_offset:n_offset + chunk])
+ assert data == k2blob[n_offset : n_offset + chunk]
offset += chunk
end = time.time()
- sys.stderr.write('%ds ' % round(end - start))
+ sys.stderr.write("%ds " % round(end - start))
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove("%s/hongry.txt" % sftp.FOLDER)
- def test_5_readv_seek(self):
- sftp = get_sftp()
- kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
+ def test_5_readv_seek(self, sftp):
+ kblob = bytes().join([struct.pack(">H", n) for n in range(512)])
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
+ with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
if n % 128 == 0:
- sys.stderr.write('.')
- sys.stderr.write(' ')
+ sys.stderr.write(".")
+ sys.stderr.write(" ")
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert (
+ sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024
+ )
start = time.time()
k2blob = kblob + kblob
chunk = 793
for i in range(10):
- with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
- base_offset = (512 * 1024) + 17 * random.randint(1000, 2000)
+ with sftp.open("%s/hongry.txt" % sftp.FOLDER, "rb") as f:
+ base_offset = (512 * 1024) + 17 * random.randint(
+ 1000, 2000
+ )
# make a bunch of offsets and put them in random order
offsets = [base_offset + j * chunk for j in range(100)]
readv_list = []
@@ -221,155 +214,162 @@ class BigSFTPTest (unittest.TestCase):
for i in range(len(readv_list)):
offset = readv_list[i][0]
n_offset = offset % 1024
- self.assertEqual(next(ret), k2blob[n_offset:n_offset + chunk])
+ assert next(ret) == k2blob[n_offset : n_offset + chunk]
end = time.time()
- sys.stderr.write('%ds ' % round(end - start))
+ sys.stderr.write("%ds " % round(end - start))
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove("%s/hongry.txt" % sftp.FOLDER)
- def test_6_lots_of_prefetching(self):
+ def test_6_lots_of_prefetching(self, sftp):
"""
prefetch a 1MB file a bunch of times, discarding the file object
without using it, to verify that paramiko doesn't get confused.
"""
- sftp = get_sftp()
- kblob = (1024 * b'x')
+ kblob = 1024 * b"x"
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'w') as f:
+ with sftp.open("%s/hongry.txt" % sftp.FOLDER, "w") as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
if n % 128 == 0:
- sys.stderr.write('.')
- sys.stderr.write(' ')
+ sys.stderr.write(".")
+ sys.stderr.write(" ")
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert (
+ sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024
+ )
for i in range(10):
- with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f:
+ with sftp.open("%s/hongry.txt" % sftp.FOLDER, "r") as f:
file_size = f.stat().st_size
f.prefetch(file_size)
- with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f:
+ with sftp.open("%s/hongry.txt" % sftp.FOLDER, "r") as f:
file_size = f.stat().st_size
f.prefetch(file_size)
for n in range(1024):
data = f.read(1024)
- self.assertEqual(data, kblob)
+ assert data == kblob
if n % 128 == 0:
- sys.stderr.write('.')
- sys.stderr.write(' ')
+ sys.stderr.write(".")
+ sys.stderr.write(" ")
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
-
- def test_7_prefetch_readv(self):
+ sftp.remove("%s/hongry.txt" % sftp.FOLDER)
+
+ def test_7_prefetch_readv(self, sftp):
"""
verify that prefetch and readv don't conflict with each other.
"""
- sftp = get_sftp()
- kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
+ kblob = bytes().join([struct.pack(">H", n) for n in range(512)])
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
+ with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
if n % 128 == 0:
- sys.stderr.write('.')
- sys.stderr.write(' ')
-
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ sys.stderr.write(".")
+ sys.stderr.write(" ")
+
+ assert (
+ sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024
+ )
- with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
+ with sftp.open("%s/hongry.txt" % sftp.FOLDER, "rb") as f:
file_size = f.stat().st_size
f.prefetch(file_size)
data = f.read(1024)
- self.assertEqual(data, kblob)
+ assert data == kblob
chunk_size = 793
base_offset = 512 * 1024
k2blob = kblob + kblob
- chunks = [(base_offset + (chunk_size * i), chunk_size) for i in range(20)]
+ chunks = [
+ (base_offset + (chunk_size * i), chunk_size)
+ for i in range(20)
+ ]
for data in f.readv(chunks):
offset = base_offset % 1024
- self.assertEqual(chunk_size, len(data))
- self.assertEqual(k2blob[offset:offset + chunk_size], data)
+ assert chunk_size == len(data)
+ assert k2blob[offset : offset + chunk_size] == data
base_offset += chunk_size
- sys.stderr.write(' ')
+ sys.stderr.write(" ")
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
-
- def test_8_large_readv(self):
+ sftp.remove("%s/hongry.txt" % sftp.FOLDER)
+
+ def test_8_large_readv(self, sftp):
"""
verify that a very large readv is broken up correctly and still
returned as a single blob.
"""
- sftp = get_sftp()
- kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
+ kblob = bytes().join([struct.pack(">H", n) for n in range(512)])
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
+ with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
if n % 128 == 0:
- sys.stderr.write('.')
- sys.stderr.write(' ')
+ sys.stderr.write(".")
+ sys.stderr.write(" ")
+
+ assert (
+ sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024
+ )
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
-
- with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
+ with sftp.open("%s/hongry.txt" % sftp.FOLDER, "rb") as f:
data = list(f.readv([(23 * 1024, 128 * 1024)]))
- self.assertEqual(1, len(data))
+ assert len(data) == 1
data = data[0]
- self.assertEqual(128 * 1024, len(data))
-
- sys.stderr.write(' ')
+ assert len(data) == 128 * 1024
+
+ sys.stderr.write(" ")
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
-
- def test_9_big_file_big_buffer(self):
+ sftp.remove("%s/hongry.txt" % sftp.FOLDER)
+
+ def test_9_big_file_big_buffer(self, sftp):
"""
write a 1MB file, with no linefeeds, and a big buffer.
"""
- sftp = get_sftp()
- mblob = (1024 * 1024 * 'x')
+ mblob = 1024 * 1024 * "x"
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) as f:
+ with sftp.open(
+ "%s/hongry.txt" % sftp.FOLDER, "w", 128 * 1024
+ ) as f:
f.write(mblob)
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert (
+ sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024
+ )
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
-
- def test_A_big_file_renegotiate(self):
+ sftp.remove("%s/hongry.txt" % sftp.FOLDER)
+
+ def test_A_big_file_renegotiate(self, sftp):
"""
write a 1MB file, forcing key renegotiation in the middle.
"""
- sftp = get_sftp()
t = sftp.sock.get_transport()
t.packetizer.REKEY_BYTES = 512 * 1024
- k32blob = (32 * 1024 * 'x')
+ k32blob = 32 * 1024 * "x"
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) as f:
+ with sftp.open(
+ "%s/hongry.txt" % sftp.FOLDER, "w", 128 * 1024
+ ) as f:
for i in range(32):
f.write(k32blob)
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
- self.assertNotEqual(t.H, t.session_id)
-
+ assert (
+ sftp.stat("%s/hongry.txt" % sftp.FOLDER).st_size == 1024 * 1024
+ )
+ assert t.H != t.session_id
+
# try to read it too.
- with sftp.open('%s/hongry.txt' % FOLDER, 'r', 128 * 1024) as f:
+ with sftp.open(
+ "%s/hongry.txt" % sftp.FOLDER, "r", 128 * 1024
+ ) as f:
file_size = f.stat().st_size
f.prefetch(file_size)
total = 0
while total < 1024 * 1024:
total += len(f.read(32 * 1024))
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove("%s/hongry.txt" % sftp.FOLDER)
t.packetizer.REKEY_BYTES = pow(2, 30)
-
-
-if __name__ == '__main__':
- from tests.test_sftp import SFTPTest
- SFTPTest.init_loopback()
- from unittest import main
- main()