diff options
Diffstat (limited to 'tests/test_sftp.py')
-rw-r--r--[-rwxr-xr-x] | tests/test_sftp.py | 656 |
1 files changed, 269 insertions, 387 deletions
diff --git a/tests/test_sftp.py b/tests/test_sftp.py index b3c7bf98..09a50453 100755..100644 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -32,16 +32,19 @@ import warnings from binascii import hexlify from tempfile import mkstemp +import pytest + import paramiko +import paramiko.util from paramiko.py3compat import PY2, b, u, StringIO from paramiko.common import o777, o600, o666, o644 -from tests import skipUnlessBuiltin -from tests.stub_sftp import StubServer, StubSFTPServer -from tests.loop import LoopSocket -from tests.util import test_path -import paramiko.util from paramiko.sftp_attr import SFTPAttributes +from .util import needs_builtin +from .stub_sftp import StubServer, StubSFTPServer +from .util import _support, slow + + ARTICLE = ''' Insulin sensitivity and liver insulin receptor structure in ducks from two genera @@ -81,303 +84,201 @@ decreased compared with chicken. # Thus, the following 2-bytes sequence is not valid utf8: "invalid continuation byte" NON_UTF8_DATA = b'\xC3\xC3' -FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000') - -sftp = None -tc = None -g_big_file_test = True -# we need to use eval(compile()) here because Py3.2 doesn't support the 'u' marker for unicode -# this test is the only line in the entire program that has to be treated specially to support Py3.2 -unicode_folder = eval(compile(r"u'\u00fcnic\u00f8de'" if PY2 else r"'\u00fcnic\u00f8de'", 'test_sftp.py', 'eval')) +unicode_folder = u'\u00fcnic\u00f8de' if PY2 else '\u00fcnic\u00f8de' utf8_folder = b'/\xc3\xbcnic\xc3\xb8\x64\x65' -def get_sftp(): - global sftp - return sftp - - -class SFTPTest (unittest.TestCase): - @staticmethod - def init(hostname, username, keyfile, passwd): - global sftp, tc - - t = paramiko.Transport(hostname) - tc = t - try: - key = paramiko.RSAKey.from_private_key_file(keyfile, passwd) - except paramiko.PasswordRequiredException: - sys.stderr.write('\n\nparamiko.RSAKey.from_private_key_file REQUIRES PASSWORD.\n') - sys.stderr.write('You have two options:\n') - sys.stderr.write('* Use the "-K" option to point to a different (non-password-protected)\n') - sys.stderr.write(' private key file.\n') - sys.stderr.write('* Use the "-P" option to provide the password needed to unlock this private\n') - sys.stderr.write(' key.\n') - sys.stderr.write('\n') - sys.exit(1) - try: - t.connect(username=username, pkey=key) - except paramiko.SSHException: - t.close() - sys.stderr.write('\n\nparamiko.Transport.connect FAILED.\n') - sys.stderr.write('There are several possible reasons why it might fail so quickly:\n\n') - sys.stderr.write('* The host to connect to (%s) is not a valid SSH server.\n' % hostname) - sys.stderr.write(' (Use the "-H" option to change the host.)\n') - sys.stderr.write('* The username to auth as (%s) is invalid.\n' % username) - sys.stderr.write(' (Use the "-U" option to change the username.)\n') - sys.stderr.write('* The private key given (%s) is not accepted by the server.\n' % keyfile) - sys.stderr.write(' (Use the "-K" option to provide a different key file.)\n') - sys.stderr.write('\n') - sys.exit(1) - sftp = paramiko.SFTP.from_transport(t) - - @staticmethod - def init_loopback(): - global sftp, tc - - socks = LoopSocket() - sockc = LoopSocket() - sockc.link(socks) - tc = paramiko.Transport(sockc) - ts = paramiko.Transport(socks) - - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) - ts.add_server_key(host_key) - event = threading.Event() - server = StubServer() - ts.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer) - ts.start_server(event, server) - tc.connect(username='slowdive', password='pygmalion') - event.wait(1.0) - - sftp = paramiko.SFTP.from_transport(tc) - - @staticmethod - def set_big_file_test(onoff): - global g_big_file_test - g_big_file_test = onoff - - def setUp(self): - global FOLDER - for i in range(1000): - FOLDER = FOLDER[:-3] + '%03d' % i - try: - sftp.mkdir(FOLDER) - break - except (IOError, OSError): - pass - - def tearDown(self): - #sftp.chdir() - sftp.rmdir(FOLDER) - - def test_1_file(self): +@slow +class TestSFTP(object): + def test_1_file(self, sftp): """ verify that we can create a file. """ - f = sftp.open(FOLDER + '/test', 'w') + f = sftp.open(sftp.FOLDER + '/test', 'w') try: - self.assertEqual(f.stat().st_size, 0) + assert f.stat().st_size == 0 finally: f.close() - sftp.remove(FOLDER + '/test') + sftp.remove(sftp.FOLDER + '/test') - def test_2_close(self): + def test_2_close(self, sftp): """ - verify that closing the sftp session doesn't do anything bad, and that - a new one can be opened. + Verify that SFTP session close() causes a socket error on next action. """ - global sftp sftp.close() - try: - sftp.open(FOLDER + '/test2', 'w') - self.fail('expected exception') - except: - pass - sftp = paramiko.SFTP.from_transport(tc) + with pytest.raises(socket.error, match='Socket is closed'): + sftp.open(sftp.FOLDER + '/test2', 'w') - def test_2_sftp_can_be_used_as_context_manager(self): + def test_2_sftp_can_be_used_as_context_manager(self, sftp): """ verify that the sftp session is closed when exiting the context manager """ - global sftp with sftp: pass - try: - sftp.open(FOLDER + '/test2', 'w') - self.fail('expected exception') - except (EOFError, socket.error): - pass - finally: - sftp = paramiko.SFTP.from_transport(tc) + with pytest.raises(socket.error, match='Socket is closed'): + sftp.open(sftp.FOLDER + '/test2', 'w') - def test_3_write(self): + def test_3_write(self, sftp): """ verify that a file can be created and written, and the size is correct. """ try: - with sftp.open(FOLDER + '/duck.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f: f.write(ARTICLE) - self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483) + assert sftp.stat(sftp.FOLDER + '/duck.txt').st_size == 1483 finally: - sftp.remove(FOLDER + '/duck.txt') + sftp.remove(sftp.FOLDER + '/duck.txt') - def test_3_sftp_file_can_be_used_as_context_manager(self): + def test_3_sftp_file_can_be_used_as_context_manager(self, sftp): """ verify that an opened file can be used as a context manager """ try: - with sftp.open(FOLDER + '/duck.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f: f.write(ARTICLE) - self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483) + assert sftp.stat(sftp.FOLDER + '/duck.txt').st_size == 1483 finally: - sftp.remove(FOLDER + '/duck.txt') + sftp.remove(sftp.FOLDER + '/duck.txt') - def test_4_append(self): + def test_4_append(self, sftp): """ verify that a file can be opened for append, and tell() still works. """ try: - with sftp.open(FOLDER + '/append.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/append.txt', 'w') as f: f.write('first line\nsecond line\n') - self.assertEqual(f.tell(), 23) + assert f.tell() == 23 - with sftp.open(FOLDER + '/append.txt', 'a+') as f: + with sftp.open(sftp.FOLDER + '/append.txt', 'a+') as f: f.write('third line!!!\n') - self.assertEqual(f.tell(), 37) - self.assertEqual(f.stat().st_size, 37) + assert f.tell() == 37 + assert f.stat().st_size == 37 f.seek(-26, f.SEEK_CUR) - self.assertEqual(f.readline(), 'second line\n') + assert f.readline() == 'second line\n' finally: - sftp.remove(FOLDER + '/append.txt') + sftp.remove(sftp.FOLDER + '/append.txt') - def test_5_rename(self): + def test_5_rename(self, sftp): """ verify that renaming a file works. """ try: - with sftp.open(FOLDER + '/first.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/first.txt', 'w') as f: f.write('content!\n') - sftp.rename(FOLDER + '/first.txt', FOLDER + '/second.txt') - try: - sftp.open(FOLDER + '/first.txt', 'r') - self.assertTrue(False, 'no exception on reading nonexistent file') - except IOError: - pass - with sftp.open(FOLDER + '/second.txt', 'r') as f: + sftp.rename(sftp.FOLDER + '/first.txt', sftp.FOLDER + '/second.txt') + with pytest.raises(IOError, match='No such file'): + sftp.open(sftp.FOLDER + '/first.txt', 'r') + with sftp.open(sftp.FOLDER + '/second.txt', 'r') as f: f.seek(-6, f.SEEK_END) - self.assertEqual(u(f.read(4)), 'tent') + assert u(f.read(4)) == 'tent' finally: + # TODO: this is gross, make some sort of 'remove if possible' / 'rm + # -f' a-like, jeez try: - sftp.remove(FOLDER + '/first.txt') + sftp.remove(sftp.FOLDER + '/first.txt') except: pass try: - sftp.remove(FOLDER + '/second.txt') + sftp.remove(sftp.FOLDER + '/second.txt') except: pass - def test_5a_posix_rename(self): + def test_5a_posix_rename(self, sftp): """Test posix-rename@openssh.com protocol extension.""" try: # first check that the normal rename works as specified - with sftp.open(FOLDER + '/a', 'w') as f: + with sftp.open(sftp.FOLDER + '/a', 'w') as f: f.write('one') - sftp.rename(FOLDER + '/a', FOLDER + '/b') - with sftp.open(FOLDER + '/a', 'w') as f: + sftp.rename(sftp.FOLDER + '/a', sftp.FOLDER + '/b') + with sftp.open(sftp.FOLDER + '/a', 'w') as f: f.write('two') - try: - sftp.rename(FOLDER + '/a', FOLDER + '/b') - self.assertTrue(False, 'no exception when rename-ing onto existing file') - except (OSError, IOError): - pass + with pytest.raises(IOError): # actual message seems generic + sftp.rename(sftp.FOLDER + '/a', sftp.FOLDER + '/b') # now check with the posix_rename - sftp.posix_rename(FOLDER + '/a', FOLDER + '/b') - with sftp.open(FOLDER + '/b', 'r') as f: + sftp.posix_rename(sftp.FOLDER + '/a', sftp.FOLDER + '/b') + with sftp.open(sftp.FOLDER + '/b', 'r') as f: data = u(f.read()) - self.assertEqual('two', data, "Contents of renamed file not the same as original file") + err = "Contents of renamed file not the same as original file" + assert 'two' == data, err finally: try: - sftp.remove(FOLDER + '/a') + sftp.remove(sftp.FOLDER + '/a') except: pass try: - sftp.remove(FOLDER + '/b') + sftp.remove(sftp.FOLDER + '/b') except: pass - def test_6_folder(self): + def test_6_folder(self, sftp): """ create a temporary folder, verify that we can create a file in it, then remove the folder and verify that we can't create a file in it anymore. """ - sftp.mkdir(FOLDER + '/subfolder') - sftp.open(FOLDER + '/subfolder/test', 'w').close() - sftp.remove(FOLDER + '/subfolder/test') - sftp.rmdir(FOLDER + '/subfolder') - try: - sftp.open(FOLDER + '/subfolder/test') - # shouldn't be able to create that file - self.assertTrue(False, 'no exception at dummy file creation') - except IOError: - pass + sftp.mkdir(sftp.FOLDER + '/subfolder') + sftp.open(sftp.FOLDER + '/subfolder/test', 'w').close() + sftp.remove(sftp.FOLDER + '/subfolder/test') + sftp.rmdir(sftp.FOLDER + '/subfolder') + # shouldn't be able to create that file if dir removed + with pytest.raises(IOError, match="No such file"): + sftp.open(sftp.FOLDER + '/subfolder/test') - def test_7_listdir(self): + def test_7_listdir(self, sftp): """ verify that a folder can be created, a bunch of files can be placed in it, and those files show up in sftp.listdir. """ try: - sftp.open(FOLDER + '/duck.txt', 'w').close() - sftp.open(FOLDER + '/fish.txt', 'w').close() - sftp.open(FOLDER + '/tertiary.py', 'w').close() - - x = sftp.listdir(FOLDER) - self.assertEqual(len(x), 3) - self.assertTrue('duck.txt' in x) - self.assertTrue('fish.txt' in x) - self.assertTrue('tertiary.py' in x) - self.assertTrue('random' not in x) + sftp.open(sftp.FOLDER + '/duck.txt', 'w').close() + sftp.open(sftp.FOLDER + '/fish.txt', 'w').close() + sftp.open(sftp.FOLDER + '/tertiary.py', 'w').close() + + x = sftp.listdir(sftp.FOLDER) + assert len(x) == 3 + assert 'duck.txt' in x + assert 'fish.txt' in x + assert 'tertiary.py' in x + assert 'random' not in x finally: - sftp.remove(FOLDER + '/duck.txt') - sftp.remove(FOLDER + '/fish.txt') - sftp.remove(FOLDER + '/tertiary.py') + sftp.remove(sftp.FOLDER + '/duck.txt') + sftp.remove(sftp.FOLDER + '/fish.txt') + sftp.remove(sftp.FOLDER + '/tertiary.py') - def test_7_5_listdir_iter(self): + def test_7_5_listdir_iter(self, sftp): """ listdir_iter version of above test """ try: - sftp.open(FOLDER + '/duck.txt', 'w').close() - sftp.open(FOLDER + '/fish.txt', 'w').close() - sftp.open(FOLDER + '/tertiary.py', 'w').close() - - x = [x.filename for x in sftp.listdir_iter(FOLDER)] - self.assertEqual(len(x), 3) - self.assertTrue('duck.txt' in x) - self.assertTrue('fish.txt' in x) - self.assertTrue('tertiary.py' in x) - self.assertTrue('random' not in x) + sftp.open(sftp.FOLDER + '/duck.txt', 'w').close() + sftp.open(sftp.FOLDER + '/fish.txt', 'w').close() + sftp.open(sftp.FOLDER + '/tertiary.py', 'w').close() + + x = [x.filename for x in sftp.listdir_iter(sftp.FOLDER)] + assert len(x) == 3 + assert 'duck.txt' in x + assert 'fish.txt' in x + assert 'tertiary.py' in x + assert 'random' not in x finally: - sftp.remove(FOLDER + '/duck.txt') - sftp.remove(FOLDER + '/fish.txt') - sftp.remove(FOLDER + '/tertiary.py') + sftp.remove(sftp.FOLDER + '/duck.txt') + sftp.remove(sftp.FOLDER + '/fish.txt') + sftp.remove(sftp.FOLDER + '/tertiary.py') - def test_8_setstat(self): + def test_8_setstat(self, sftp): """ verify that the setstat functions (chown, chmod, utime, truncate) work. """ try: - with sftp.open(FOLDER + '/special', 'w') as f: + with sftp.open(sftp.FOLDER + '/special', 'w') as f: f.write('x' * 1024) - stat = sftp.stat(FOLDER + '/special') - sftp.chmod(FOLDER + '/special', (stat.st_mode & ~o777) | o600) - stat = sftp.stat(FOLDER + '/special') + stat = sftp.stat(sftp.FOLDER + '/special') + sftp.chmod(sftp.FOLDER + '/special', (stat.st_mode & ~o777) | o600) + stat = sftp.stat(sftp.FOLDER + '/special') expected_mode = o600 if sys.platform == 'win32': # chmod not really functional on windows @@ -385,35 +286,35 @@ class SFTPTest (unittest.TestCase): if sys.platform == 'cygwin': # even worse. expected_mode = o644 - self.assertEqual(stat.st_mode & o777, expected_mode) - self.assertEqual(stat.st_size, 1024) + assert stat.st_mode & o777 == expected_mode + assert stat.st_size == 1024 mtime = stat.st_mtime - 3600 atime = stat.st_atime - 1800 - sftp.utime(FOLDER + '/special', (atime, mtime)) - stat = sftp.stat(FOLDER + '/special') - self.assertEqual(stat.st_mtime, mtime) + sftp.utime(sftp.FOLDER + '/special', (atime, mtime)) + stat = sftp.stat(sftp.FOLDER + '/special') + assert stat.st_mtime == mtime if sys.platform not in ('win32', 'cygwin'): - self.assertEqual(stat.st_atime, atime) + assert stat.st_atime == atime # can't really test chown, since we'd have to know a valid uid. - sftp.truncate(FOLDER + '/special', 512) - stat = sftp.stat(FOLDER + '/special') - self.assertEqual(stat.st_size, 512) + sftp.truncate(sftp.FOLDER + '/special', 512) + stat = sftp.stat(sftp.FOLDER + '/special') + assert stat.st_size == 512 finally: - sftp.remove(FOLDER + '/special') + sftp.remove(sftp.FOLDER + '/special') - def test_9_fsetstat(self): + def test_9_fsetstat(self, sftp): """ verify that the fsetstat functions (chown, chmod, utime, truncate) work on open files. """ try: - with sftp.open(FOLDER + '/special', 'w') as f: + with sftp.open(sftp.FOLDER + '/special', 'w') as f: f.write('x' * 1024) - with sftp.open(FOLDER + '/special', 'r+') as f: + with sftp.open(sftp.FOLDER + '/special', 'r+') as f: stat = f.stat() f.chmod((stat.st_mode & ~o777) | o600) stat = f.stat() @@ -425,26 +326,26 @@ class SFTPTest (unittest.TestCase): if sys.platform == 'cygwin': # even worse. expected_mode = o644 - self.assertEqual(stat.st_mode & o777, expected_mode) - self.assertEqual(stat.st_size, 1024) + assert stat.st_mode & o777 == expected_mode + assert stat.st_size == 1024 mtime = stat.st_mtime - 3600 atime = stat.st_atime - 1800 f.utime((atime, mtime)) stat = f.stat() - self.assertEqual(stat.st_mtime, mtime) + assert stat.st_mtime == mtime if sys.platform not in ('win32', 'cygwin'): - self.assertEqual(stat.st_atime, atime) + assert stat.st_atime == atime # can't really test chown, since we'd have to know a valid uid. f.truncate(512) stat = f.stat() - self.assertEqual(stat.st_size, 512) + assert stat.st_size == 512 finally: - sftp.remove(FOLDER + '/special') + sftp.remove(sftp.FOLDER + '/special') - def test_A_readline_seek(self): + def test_A_readline_seek(self, sftp): """ create a text file and write a bunch of text into it. then count the lines in the file, and seek around to retrieve particular lines. this should @@ -452,10 +353,10 @@ class SFTPTest (unittest.TestCase): buffering is reset on 'seek'. """ try: - with sftp.open(FOLDER + '/duck.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f: f.write(ARTICLE) - with sftp.open(FOLDER + '/duck.txt', 'r+') as f: + with sftp.open(sftp.FOLDER + '/duck.txt', 'r+') as f: line_number = 0 loc = 0 pos_list = [] @@ -463,35 +364,35 @@ class SFTPTest (unittest.TestCase): line_number += 1 pos_list.append(loc) loc = f.tell() - self.assertTrue(f.seekable()) + assert f.seekable() f.seek(pos_list[6], f.SEEK_SET) - self.assertEqual(f.readline(), 'Nouzilly, France.\n') + assert f.readline(), 'Nouzilly == France.\n' f.seek(pos_list[17], f.SEEK_SET) - self.assertEqual(f.readline()[:4], 'duck') + assert f.readline()[:4] == 'duck' f.seek(pos_list[10], f.SEEK_SET) - self.assertEqual(f.readline(), 'duck types were equally resistant to exogenous insulin compared with chicken.\n') + assert f.readline() == 'duck types were equally resistant to exogenous insulin compared with chicken.\n' finally: - sftp.remove(FOLDER + '/duck.txt') + sftp.remove(sftp.FOLDER + '/duck.txt') - def test_B_write_seek(self): + def test_B_write_seek(self, sftp): """ create a text file, seek back and change part of it, and verify that the changes worked. """ try: - with sftp.open(FOLDER + '/testing.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/testing.txt', 'w') as f: f.write('hello kitty.\n') f.seek(-5, f.SEEK_CUR) f.write('dd') - self.assertEqual(sftp.stat(FOLDER + '/testing.txt').st_size, 13) - with sftp.open(FOLDER + '/testing.txt', 'r') as f: + assert sftp.stat(sftp.FOLDER + '/testing.txt').st_size == 13 + with sftp.open(sftp.FOLDER + '/testing.txt', 'r') as f: data = f.read(20) - self.assertEqual(data, b'hello kiddy.\n') + assert data == b'hello kiddy.\n' finally: - sftp.remove(FOLDER + '/testing.txt') + sftp.remove(sftp.FOLDER + '/testing.txt') - def test_C_symlink(self): + def test_C_symlink(self, sftp): """ create a symlink and then check that lstat doesn't follow it. """ @@ -500,97 +401,85 @@ class SFTPTest (unittest.TestCase): return try: - with sftp.open(FOLDER + '/original.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/original.txt', 'w') as f: f.write('original\n') - sftp.symlink('original.txt', FOLDER + '/link.txt') - self.assertEqual(sftp.readlink(FOLDER + '/link.txt'), 'original.txt') + sftp.symlink('original.txt', sftp.FOLDER + '/link.txt') + assert sftp.readlink(sftp.FOLDER + '/link.txt') == 'original.txt' - with sftp.open(FOLDER + '/link.txt', 'r') as f: - self.assertEqual(f.readlines(), ['original\n']) + with sftp.open(sftp.FOLDER + '/link.txt', 'r') as f: + assert f.readlines() == ['original\n'] cwd = sftp.normalize('.') if cwd[-1] == '/': cwd = cwd[:-1] - abs_path = cwd + '/' + FOLDER + '/original.txt' - sftp.symlink(abs_path, FOLDER + '/link2.txt') - self.assertEqual(abs_path, sftp.readlink(FOLDER + '/link2.txt')) + abs_path = cwd + '/' + sftp.FOLDER + '/original.txt' + sftp.symlink(abs_path, sftp.FOLDER + '/link2.txt') + assert abs_path == sftp.readlink(sftp.FOLDER + '/link2.txt') - self.assertEqual(sftp.lstat(FOLDER + '/link.txt').st_size, 12) - self.assertEqual(sftp.stat(FOLDER + '/link.txt').st_size, 9) + assert sftp.lstat(sftp.FOLDER + '/link.txt').st_size == 12 + assert sftp.stat(sftp.FOLDER + '/link.txt').st_size == 9 # the sftp server may be hiding extra path members from us, so the # length may be longer than we expect: - self.assertTrue(sftp.lstat(FOLDER + '/link2.txt').st_size >= len(abs_path)) - self.assertEqual(sftp.stat(FOLDER + '/link2.txt').st_size, 9) - self.assertEqual(sftp.stat(FOLDER + '/original.txt').st_size, 9) + assert sftp.lstat(sftp.FOLDER + '/link2.txt').st_size >= len(abs_path) + assert sftp.stat(sftp.FOLDER + '/link2.txt').st_size == 9 + assert sftp.stat(sftp.FOLDER + '/original.txt').st_size == 9 finally: try: - sftp.remove(FOLDER + '/link.txt') + sftp.remove(sftp.FOLDER + '/link.txt') except: pass try: - sftp.remove(FOLDER + '/link2.txt') + sftp.remove(sftp.FOLDER + '/link2.txt') except: pass try: - sftp.remove(FOLDER + '/original.txt') + sftp.remove(sftp.FOLDER + '/original.txt') except: pass - def test_D_flush_seek(self): + def test_D_flush_seek(self, sftp): """ verify that buffered writes are automatically flushed on seek. """ try: - with sftp.open(FOLDER + '/happy.txt', 'w', 1) as f: + with sftp.open(sftp.FOLDER + '/happy.txt', 'w', 1) as f: f.write('full line.\n') f.write('partial') f.seek(9, f.SEEK_SET) f.write('?\n') - with sftp.open(FOLDER + '/happy.txt', 'r') as f: - self.assertEqual(f.readline(), u('full line?\n')) - self.assertEqual(f.read(7), b'partial') + with sftp.open(sftp.FOLDER + '/happy.txt', 'r') as f: + assert f.readline() == u('full line?\n') + assert f.read(7) == b'partial' finally: try: - sftp.remove(FOLDER + '/happy.txt') + sftp.remove(sftp.FOLDER + '/happy.txt') except: pass - def test_E_realpath(self): + def test_E_realpath(self, sftp): """ test that realpath is returning something non-empty and not an error. """ pwd = sftp.normalize('.') - self.assertTrue(len(pwd) > 0) - f = sftp.normalize('./' + FOLDER) - self.assertTrue(len(f) > 0) - self.assertEqual(os.path.join(pwd, FOLDER), f) + assert len(pwd) > 0 + f = sftp.normalize('./' + sftp.FOLDER) + assert len(f) > 0 + assert os.path.join(pwd, sftp.FOLDER) == f - def test_F_mkdir(self): + def test_F_mkdir(self, sftp): """ verify that mkdir/rmdir work. """ - try: - sftp.mkdir(FOLDER + '/subfolder') - except: - self.assertTrue(False, 'exception creating subfolder') - try: - sftp.mkdir(FOLDER + '/subfolder') - self.assertTrue(False, 'no exception overwriting subfolder') - except IOError: - pass - try: - sftp.rmdir(FOLDER + '/subfolder') - except: - self.assertTrue(False, 'exception removing subfolder') - try: - sftp.rmdir(FOLDER + '/subfolder') - self.assertTrue(False, 'no exception removing nonexistent subfolder') - except IOError: - pass + sftp.mkdir(sftp.FOLDER + '/subfolder') + with pytest.raises(IOError): # generic msg only + sftp.mkdir(sftp.FOLDER + '/subfolder') + sftp.rmdir(sftp.FOLDER + '/subfolder') + with pytest.raises(IOError, match="No such file"): + sftp.rmdir(sftp.FOLDER + '/subfolder') - def test_G_chdir(self): + def test_G_chdir(self, sftp): """ verify that chdir/getcwd work. """ @@ -598,35 +487,35 @@ class SFTPTest (unittest.TestCase): if root[-1] != '/': root += '/' try: - sftp.mkdir(FOLDER + '/alpha') - sftp.chdir(FOLDER + '/alpha') + sftp.mkdir(sftp.FOLDER + '/alpha') + sftp.chdir(sftp.FOLDER + '/alpha') sftp.mkdir('beta') - self.assertEqual(root + FOLDER + '/alpha', sftp.getcwd()) - self.assertEqual(['beta'], sftp.listdir('.')) + assert root + sftp.FOLDER + '/alpha' == sftp.getcwd() + assert ['beta'] == sftp.listdir('.') sftp.chdir('beta') with sftp.open('fish', 'w') as f: f.write('hello\n') sftp.chdir('..') - self.assertEqual(['fish'], sftp.listdir('beta')) + assert ['fish'] == sftp.listdir('beta') sftp.chdir('..') - self.assertEqual(['fish'], sftp.listdir('alpha/beta')) + assert ['fish'] == sftp.listdir('alpha/beta') finally: sftp.chdir(root) try: - sftp.unlink(FOLDER + '/alpha/beta/fish') + sftp.unlink(sftp.FOLDER + '/alpha/beta/fish') except: pass try: - sftp.rmdir(FOLDER + '/alpha/beta') + sftp.rmdir(sftp.FOLDER + '/alpha/beta') except: pass try: - sftp.rmdir(FOLDER + '/alpha') + sftp.rmdir(sftp.FOLDER + '/alpha') except: pass - def test_H_get_put(self): + def test_H_get_put(self, sftp): """ verify that get/put work. """ @@ -641,103 +530,102 @@ class SFTPTest (unittest.TestCase): def progress_callback(x, y): saved_progress.append((x, y)) - sftp.put(localname, FOLDER + '/bunny.txt', progress_callback) + sftp.put(localname, sftp.FOLDER + '/bunny.txt', progress_callback) - with sftp.open(FOLDER + '/bunny.txt', 'rb') as f: - self.assertEqual(text, f.read(128)) - self.assertEqual([(41, 41)], saved_progress) + with sftp.open(sftp.FOLDER + '/bunny.txt', 'rb') as f: + assert text == f.read(128) + assert [(41, 41)] == saved_progress os.unlink(localname) fd, localname = mkstemp() os.close(fd) saved_progress = [] - sftp.get(FOLDER + '/bunny.txt', localname, progress_callback) + sftp.get(sftp.FOLDER + '/bunny.txt', localname, progress_callback) with open(localname, 'rb') as f: - self.assertEqual(text, f.read(128)) - self.assertEqual([(41, 41)], saved_progress) + assert text == f.read(128) + assert [(41, 41)] == saved_progress os.unlink(localname) - sftp.unlink(FOLDER + '/bunny.txt') + sftp.unlink(sftp.FOLDER + '/bunny.txt') - def test_I_check(self): + def test_I_check(self, sftp): """ verify that file.check() works against our own server. (it's an sftp extension that we support, and may be the only ones who support it.) """ - with sftp.open(FOLDER + '/kitty.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/kitty.txt', 'w') as f: f.write('here kitty kitty' * 64) try: - with sftp.open(FOLDER + '/kitty.txt', 'r') as f: + with sftp.open(sftp.FOLDER + '/kitty.txt', 'r') as f: sum = f.check('sha1') - self.assertEqual('91059CFC6615941378D413CB5ADAF4C5EB293402', u(hexlify(sum)).upper()) + assert '91059CFC6615941378D413CB5ADAF4C5EB293402' == u(hexlify(sum)).upper() sum = f.check('md5', 0, 512) - self.assertEqual('93DE4788FCA28D471516963A1FE3856A', u(hexlify(sum)).upper()) + assert '93DE4788FCA28D471516963A1FE3856A' == u(hexlify(sum)).upper() sum = f.check('md5', 0, 0, 510) - self.assertEqual('EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6', - u(hexlify(sum)).upper()) + assert u(hexlify(sum)).upper() == 'EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6' # noqa finally: - sftp.unlink(FOLDER + '/kitty.txt') + sftp.unlink(sftp.FOLDER + '/kitty.txt') - def test_J_x_flag(self): + def test_J_x_flag(self, sftp): """ verify that the 'x' flag works when opening a file. """ - sftp.open(FOLDER + '/unusual.txt', 'wx').close() + sftp.open(sftp.FOLDER + '/unusual.txt', 'wx').close() try: try: - sftp.open(FOLDER + '/unusual.txt', 'wx') + sftp.open(sftp.FOLDER + '/unusual.txt', 'wx') self.fail('expected exception') except IOError: pass finally: - sftp.unlink(FOLDER + '/unusual.txt') + sftp.unlink(sftp.FOLDER + '/unusual.txt') - def test_K_utf8(self): + def test_K_utf8(self, sftp): """ verify that unicode strings are encoded into utf8 correctly. """ - with sftp.open(FOLDER + '/something', 'w') as f: + with sftp.open(sftp.FOLDER + '/something', 'w') as f: f.write('okay') try: - sftp.rename(FOLDER + '/something', FOLDER + '/' + unicode_folder) - sftp.open(b(FOLDER) + utf8_folder, 'r') + sftp.rename(sftp.FOLDER + '/something', sftp.FOLDER + '/' + unicode_folder) + sftp.open(b(sftp.FOLDER) + utf8_folder, 'r') except Exception as e: self.fail('exception ' + str(e)) - sftp.unlink(b(FOLDER) + utf8_folder) + sftp.unlink(b(sftp.FOLDER) + utf8_folder) - def test_L_utf8_chdir(self): - sftp.mkdir(FOLDER + '/' + unicode_folder) + def test_L_utf8_chdir(self, sftp): + sftp.mkdir(sftp.FOLDER + '/' + unicode_folder) try: - sftp.chdir(FOLDER + '/' + unicode_folder) + sftp.chdir(sftp.FOLDER + '/' + unicode_folder) with sftp.open('something', 'w') as f: f.write('okay') sftp.unlink('something') finally: sftp.chdir() - sftp.rmdir(FOLDER + '/' + unicode_folder) + sftp.rmdir(sftp.FOLDER + '/' + unicode_folder) - def test_M_bad_readv(self): + def test_M_bad_readv(self, sftp): """ verify that readv at the end of the file doesn't essplode. """ - sftp.open(FOLDER + '/zero', 'w').close() + sftp.open(sftp.FOLDER + '/zero', 'w').close() try: - with sftp.open(FOLDER + '/zero', 'r') as f: + with sftp.open(sftp.FOLDER + '/zero', 'r') as f: f.readv([(0, 12)]) - with sftp.open(FOLDER + '/zero', 'r') as f: + with sftp.open(sftp.FOLDER + '/zero', 'r') as f: file_size = f.stat().st_size f.prefetch(file_size) f.read(100) finally: - sftp.unlink(FOLDER + '/zero') + sftp.unlink(sftp.FOLDER + '/zero') - def test_N_put_without_confirm(self): + def test_N_put_without_confirm(self, sftp): """ verify that get/put work without confirmation. """ @@ -752,138 +640,132 @@ class SFTPTest (unittest.TestCase): def progress_callback(x, y): saved_progress.append((x, y)) - res = sftp.put(localname, FOLDER + '/bunny.txt', progress_callback, False) + res = sftp.put(localname, sftp.FOLDER + '/bunny.txt', progress_callback, False) - self.assertEqual(SFTPAttributes().attr, res.attr) + assert SFTPAttributes().attr == res.attr - with sftp.open(FOLDER + '/bunny.txt', 'r') as f: - self.assertEqual(text, f.read(128)) - self.assertEqual((41, 41), saved_progress[-1]) + with sftp.open(sftp.FOLDER + '/bunny.txt', 'r') as f: + assert text == f.read(128) + assert (41, 41) == saved_progress[-1] os.unlink(localname) - sftp.unlink(FOLDER + '/bunny.txt') + sftp.unlink(sftp.FOLDER + '/bunny.txt') - def test_O_getcwd(self): + def test_O_getcwd(self, sftp): """ verify that chdir/getcwd work. """ - self.assertEqual(None, sftp.getcwd()) + assert sftp.getcwd() == None root = sftp.normalize('.') if root[-1] != '/': root += '/' try: - sftp.mkdir(FOLDER + '/alpha') - sftp.chdir(FOLDER + '/alpha') - self.assertEqual('/' + FOLDER + '/alpha', sftp.getcwd()) + sftp.mkdir(sftp.FOLDER + '/alpha') + sftp.chdir(sftp.FOLDER + '/alpha') + assert sftp.getcwd() == '/' + sftp.FOLDER + '/alpha' finally: sftp.chdir(root) try: - sftp.rmdir(FOLDER + '/alpha') + sftp.rmdir(sftp.FOLDER + '/alpha') except: pass - def XXX_test_M_seek_append(self): + def XXX_test_M_seek_append(self, sftp): """ verify that seek does't affect writes during append. does not work except through paramiko. :( openssh fails. """ try: - with sftp.open(FOLDER + '/append.txt', 'a') as f: + with sftp.open(sftp.FOLDER + '/append.txt', 'a') as f: f.write('first line\nsecond line\n') f.seek(11, f.SEEK_SET) f.write('third line\n') - with sftp.open(FOLDER + '/append.txt', 'r') as f: - self.assertEqual(f.stat().st_size, 34) - self.assertEqual(f.readline(), 'first line\n') - self.assertEqual(f.readline(), 'second line\n') - self.assertEqual(f.readline(), 'third line\n') + with sftp.open(sftp.FOLDER + '/append.txt', 'r') as f: + assert f.stat().st_size == 34 + assert f.readline() == 'first line\n' + assert f.readline() == 'second line\n' + assert f.readline() == 'third line\n' finally: - sftp.remove(FOLDER + '/append.txt') + sftp.remove(sftp.FOLDER + '/append.txt') - def test_putfo_empty_file(self): + def test_putfo_empty_file(self, sftp): """ Send an empty file and confirm it is sent. """ - target = FOLDER + '/empty file.txt' + target = sftp.FOLDER + '/empty file.txt' stream = StringIO() try: attrs = sftp.putfo(stream, target) # the returned attributes should not be null - self.assertNotEqual(attrs, None) + assert attrs is not None finally: sftp.remove(target) - - def test_N_file_with_percent(self): + # TODO: this test doesn't actually fail if the regression (removing '%' + # expansion to '%%' within sftp.py's def _log()) is removed - stacktraces + # appear but they're clearly emitted from subthreads that have no error + # handling. No point running it until that is fixed somehow. + @pytest.mark.skip("Doesn't prove anything right now") + def test_N_file_with_percent(self, sftp): """ verify that we can create a file with a '%' in the filename. ( it needs to be properly escaped by _log() ) """ - self.assertTrue( paramiko.util.get_logger("paramiko").handlers, "This unit test requires logging to be enabled" ) - f = sftp.open(FOLDER + '/test%file', 'w') + f = sftp.open(sftp.FOLDER + '/test%file', 'w') try: - self.assertEqual(f.stat().st_size, 0) + assert f.stat().st_size == 0 finally: f.close() - sftp.remove(FOLDER + '/test%file') - + sftp.remove(sftp.FOLDER + '/test%file') - def test_O_non_utf8_data(self): + def test_O_non_utf8_data(self, sftp): """Test write() and read() of non utf8 data""" try: - with sftp.open('%s/nonutf8data' % FOLDER, 'w') as f: + with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'w') as f: f.write(NON_UTF8_DATA) - with sftp.open('%s/nonutf8data' % FOLDER, 'r') as f: + with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'r') as f: data = f.read() - self.assertEqual(data, NON_UTF8_DATA) - with sftp.open('%s/nonutf8data' % FOLDER, 'wb') as f: + assert data == NON_UTF8_DATA + with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'wb') as f: f.write(NON_UTF8_DATA) - with sftp.open('%s/nonutf8data' % FOLDER, 'rb') as f: + with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'rb') as f: data = f.read() - self.assertEqual(data, NON_UTF8_DATA) + assert data == NON_UTF8_DATA finally: - sftp.remove('%s/nonutf8data' % FOLDER) + sftp.remove('%s/nonutf8data' % sftp.FOLDER) - def test_sftp_attributes_empty_str(self): + def test_sftp_attributes_empty_str(self, sftp): sftp_attributes = SFTPAttributes() - self.assertEqual(str(sftp_attributes), "?--------- 1 0 0 0 (unknown date) ?") + assert str(sftp_attributes) == "?--------- 1 0 0 0 (unknown date) ?" - @skipUnlessBuiltin('buffer') - def test_write_buffer(self): + @needs_builtin('buffer') + def test_write_buffer(self, sftp): """Test write() using a buffer instance.""" data = 3 * b'A potentially large block of data to chunk up.\n' try: - with sftp.open('%s/write_buffer' % FOLDER, 'wb') as f: + with sftp.open('%s/write_buffer' % sftp.FOLDER, 'wb') as f: for offset in range(0, len(data), 8): f.write(buffer(data, offset, 8)) - with sftp.open('%s/write_buffer' % FOLDER, 'rb') as f: - self.assertEqual(f.read(), data) + with sftp.open('%s/write_buffer' % sftp.FOLDER, 'rb') as f: + assert f.read() == data finally: - sftp.remove('%s/write_buffer' % FOLDER) + sftp.remove('%s/write_buffer' % sftp.FOLDER) - @skipUnlessBuiltin('memoryview') - def test_write_memoryview(self): + @needs_builtin('memoryview') + def test_write_memoryview(self, sftp): """Test write() using a memoryview instance.""" data = 3 * b'A potentially large block of data to chunk up.\n' try: - with sftp.open('%s/write_memoryview' % FOLDER, 'wb') as f: + with sftp.open('%s/write_memoryview' % sftp.FOLDER, 'wb') as f: view = memoryview(data) for offset in range(0, len(data), 8): f.write(view[offset:offset+8]) - with sftp.open('%s/write_memoryview' % FOLDER, 'rb') as f: - self.assertEqual(f.read(), data) + with sftp.open('%s/write_memoryview' % sftp.FOLDER, 'rb') as f: + assert f.read() == data finally: - sftp.remove('%s/write_memoryview' % FOLDER) - - -if __name__ == '__main__': - SFTPTest.init_loopback() - # logging is required by test_N_file_with_percent - paramiko.util.log_to_file('test_sftp.log') - from unittest import main - main() + sftp.remove('%s/write_memoryview' % sftp.FOLDER) |