diff options
Diffstat (limited to 'tests/test_sftp.py')
-rw-r--r--[-rwxr-xr-x] | tests/test_sftp.py | 813 |
1 files changed, 361 insertions, 452 deletions
diff --git a/tests/test_sftp.py b/tests/test_sftp.py index b3c7bf98..87c57340 100755..100644 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -32,17 +32,20 @@ import warnings from binascii import hexlify from tempfile import mkstemp +import pytest + import paramiko +import paramiko.util from paramiko.py3compat import PY2, b, u, StringIO from paramiko.common import o777, o600, o666, o644 -from tests import skipUnlessBuiltin -from tests.stub_sftp import StubServer, StubSFTPServer -from tests.loop import LoopSocket -from tests.util import test_path -import paramiko.util from paramiko.sftp_attr import SFTPAttributes -ARTICLE = ''' +from .util import needs_builtin +from .stub_sftp import StubServer, StubSFTPServer +from .util import _support, slow + + +ARTICLE = """ Insulin sensitivity and liver insulin receptor structure in ducks from two genera @@ -67,7 +70,7 @@ receptors. Therefore the ducks from the two genera exhibit an alpha-beta- structure for liver insulin receptors and a clear difference in the number of liver insulin receptors. Their sensitivity to insulin is, however, similarly decreased compared with chicken. -''' +""" # Here is how unicode characters are encoded over 1 to 6 bytes in utf-8 @@ -79,372 +82,275 @@ decreased compared with chicken. # U-04000000 - U-7FFFFFFF: 1111110x 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx # Note that: hex(int('11000011',2)) == '0xc3' # Thus, the following 2-bytes sequence is not valid utf8: "invalid continuation byte" -NON_UTF8_DATA = b'\xC3\xC3' - -FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000') - -sftp = None -tc = None -g_big_file_test = True -# we need to use eval(compile()) here because Py3.2 doesn't support the 'u' marker for unicode -# this test is the only line in the entire program that has to be treated specially to support Py3.2 -unicode_folder = eval(compile(r"u'\u00fcnic\u00f8de'" if PY2 else r"'\u00fcnic\u00f8de'", 'test_sftp.py', 'eval')) -utf8_folder = b'/\xc3\xbcnic\xc3\xb8\x64\x65' - - -def get_sftp(): - global sftp - return sftp - - -class SFTPTest (unittest.TestCase): - @staticmethod - def init(hostname, username, keyfile, passwd): - global sftp, tc +NON_UTF8_DATA = b"\xC3\xC3" - t = paramiko.Transport(hostname) - tc = t - try: - key = paramiko.RSAKey.from_private_key_file(keyfile, passwd) - except paramiko.PasswordRequiredException: - sys.stderr.write('\n\nparamiko.RSAKey.from_private_key_file REQUIRES PASSWORD.\n') - sys.stderr.write('You have two options:\n') - sys.stderr.write('* Use the "-K" option to point to a different (non-password-protected)\n') - sys.stderr.write(' private key file.\n') - sys.stderr.write('* Use the "-P" option to provide the password needed to unlock this private\n') - sys.stderr.write(' key.\n') - sys.stderr.write('\n') - sys.exit(1) - try: - t.connect(username=username, pkey=key) - except paramiko.SSHException: - t.close() - sys.stderr.write('\n\nparamiko.Transport.connect FAILED.\n') - sys.stderr.write('There are several possible reasons why it might fail so quickly:\n\n') - sys.stderr.write('* The host to connect to (%s) is not a valid SSH server.\n' % hostname) - sys.stderr.write(' (Use the "-H" option to change the host.)\n') - sys.stderr.write('* The username to auth as (%s) is invalid.\n' % username) - sys.stderr.write(' (Use the "-U" option to change the username.)\n') - sys.stderr.write('* The private key given (%s) is not accepted by the server.\n' % keyfile) - sys.stderr.write(' (Use the "-K" option to provide a different key file.)\n') - sys.stderr.write('\n') - sys.exit(1) - sftp = paramiko.SFTP.from_transport(t) - - @staticmethod - def init_loopback(): - global sftp, tc - - socks = LoopSocket() - sockc = LoopSocket() - sockc.link(socks) - tc = paramiko.Transport(sockc) - ts = paramiko.Transport(socks) - - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) - ts.add_server_key(host_key) - event = threading.Event() - server = StubServer() - ts.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer) - ts.start_server(event, server) - tc.connect(username='slowdive', password='pygmalion') - event.wait(1.0) - - sftp = paramiko.SFTP.from_transport(tc) - - @staticmethod - def set_big_file_test(onoff): - global g_big_file_test - g_big_file_test = onoff - - def setUp(self): - global FOLDER - for i in range(1000): - FOLDER = FOLDER[:-3] + '%03d' % i - try: - sftp.mkdir(FOLDER) - break - except (IOError, OSError): - pass +unicode_folder = u"\u00fcnic\u00f8de" if PY2 else "\u00fcnic\u00f8de" +utf8_folder = b"/\xc3\xbcnic\xc3\xb8\x64\x65" - def tearDown(self): - #sftp.chdir() - sftp.rmdir(FOLDER) - def test_1_file(self): +@slow +class TestSFTP(object): + def test_1_file(self, sftp): """ verify that we can create a file. """ - f = sftp.open(FOLDER + '/test', 'w') + f = sftp.open(sftp.FOLDER + "/test", "w") try: - self.assertEqual(f.stat().st_size, 0) + assert f.stat().st_size == 0 finally: f.close() - sftp.remove(FOLDER + '/test') + sftp.remove(sftp.FOLDER + "/test") - def test_2_close(self): + def test_2_close(self, sftp): """ - verify that closing the sftp session doesn't do anything bad, and that - a new one can be opened. + Verify that SFTP session close() causes a socket error on next action. """ - global sftp sftp.close() - try: - sftp.open(FOLDER + '/test2', 'w') - self.fail('expected exception') - except: - pass - sftp = paramiko.SFTP.from_transport(tc) + with pytest.raises(socket.error, match="Socket is closed"): + sftp.open(sftp.FOLDER + "/test2", "w") - def test_2_sftp_can_be_used_as_context_manager(self): + def test_2_sftp_can_be_used_as_context_manager(self, sftp): """ verify that the sftp session is closed when exiting the context manager """ - global sftp with sftp: pass - try: - sftp.open(FOLDER + '/test2', 'w') - self.fail('expected exception') - except (EOFError, socket.error): - pass - finally: - sftp = paramiko.SFTP.from_transport(tc) + with pytest.raises(socket.error, match="Socket is closed"): + sftp.open(sftp.FOLDER + "/test2", "w") - def test_3_write(self): + def test_3_write(self, sftp): """ verify that a file can be created and written, and the size is correct. """ try: - with sftp.open(FOLDER + '/duck.txt', 'w') as f: + with sftp.open(sftp.FOLDER + "/duck.txt", "w") as f: f.write(ARTICLE) - self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483) + assert sftp.stat(sftp.FOLDER + "/duck.txt").st_size == 1483 finally: - sftp.remove(FOLDER + '/duck.txt') + sftp.remove(sftp.FOLDER + "/duck.txt") - def test_3_sftp_file_can_be_used_as_context_manager(self): + def test_3_sftp_file_can_be_used_as_context_manager(self, sftp): """ verify that an opened file can be used as a context manager """ try: - with sftp.open(FOLDER + '/duck.txt', 'w') as f: + with sftp.open(sftp.FOLDER + "/duck.txt", "w") as f: f.write(ARTICLE) - self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483) + assert sftp.stat(sftp.FOLDER + "/duck.txt").st_size == 1483 finally: - sftp.remove(FOLDER + '/duck.txt') + sftp.remove(sftp.FOLDER + "/duck.txt") - def test_4_append(self): + def test_4_append(self, sftp): """ verify that a file can be opened for append, and tell() still works. """ try: - with sftp.open(FOLDER + '/append.txt', 'w') as f: - f.write('first line\nsecond line\n') - self.assertEqual(f.tell(), 23) - - with sftp.open(FOLDER + '/append.txt', 'a+') as f: - f.write('third line!!!\n') - self.assertEqual(f.tell(), 37) - self.assertEqual(f.stat().st_size, 37) + with sftp.open(sftp.FOLDER + "/append.txt", "w") as f: + f.write("first line\nsecond line\n") + assert f.tell() == 23 + + with sftp.open(sftp.FOLDER + "/append.txt", "a+") as f: + f.write("third line!!!\n") + assert f.tell() == 37 + assert f.stat().st_size == 37 f.seek(-26, f.SEEK_CUR) - self.assertEqual(f.readline(), 'second line\n') + assert f.readline() == "second line\n" finally: - sftp.remove(FOLDER + '/append.txt') + sftp.remove(sftp.FOLDER + "/append.txt") - def test_5_rename(self): + def test_5_rename(self, sftp): """ verify that renaming a file works. """ try: - with sftp.open(FOLDER + '/first.txt', 'w') as f: - f.write('content!\n') - sftp.rename(FOLDER + '/first.txt', FOLDER + '/second.txt') - try: - sftp.open(FOLDER + '/first.txt', 'r') - self.assertTrue(False, 'no exception on reading nonexistent file') - except IOError: - pass - with sftp.open(FOLDER + '/second.txt', 'r') as f: + with sftp.open(sftp.FOLDER + "/first.txt", "w") as f: + f.write("content!\n") + sftp.rename( + sftp.FOLDER + "/first.txt", sftp.FOLDER + "/second.txt" + ) + with pytest.raises(IOError, match="No such file"): + sftp.open(sftp.FOLDER + "/first.txt", "r") + with sftp.open(sftp.FOLDER + "/second.txt", "r") as f: f.seek(-6, f.SEEK_END) - self.assertEqual(u(f.read(4)), 'tent') + assert u(f.read(4)) == "tent" finally: + # TODO: this is gross, make some sort of 'remove if possible' / 'rm + # -f' a-like, jeez try: - sftp.remove(FOLDER + '/first.txt') + sftp.remove(sftp.FOLDER + "/first.txt") except: pass try: - sftp.remove(FOLDER + '/second.txt') + sftp.remove(sftp.FOLDER + "/second.txt") except: pass - - def test_5a_posix_rename(self): + def test_5a_posix_rename(self, sftp): """Test posix-rename@openssh.com protocol extension.""" try: # first check that the normal rename works as specified - with sftp.open(FOLDER + '/a', 'w') as f: - f.write('one') - sftp.rename(FOLDER + '/a', FOLDER + '/b') - with sftp.open(FOLDER + '/a', 'w') as f: - f.write('two') + with sftp.open(sftp.FOLDER + "/a", "w") as f: + f.write("one") + sftp.rename(sftp.FOLDER + "/a", sftp.FOLDER + "/b") + with sftp.open(sftp.FOLDER + "/a", "w") as f: + f.write("two") try: - sftp.rename(FOLDER + '/a', FOLDER + '/b') - self.assertTrue(False, 'no exception when rename-ing onto existing file') + sftp.rename(sftp.FOLDER + "/a", sftp.FOLDER + "/b") + self.assertTrue( + False, "no exception when rename-ing onto existing file" + ) except (OSError, IOError): pass # now check with the posix_rename - sftp.posix_rename(FOLDER + '/a', FOLDER + '/b') - with sftp.open(FOLDER + '/b', 'r') as f: + sftp.posix_rename(sftp.FOLDER + "/a", sftp.FOLDER + "/b") + with sftp.open(sftp.FOLDER + "/b", "r") as f: data = u(f.read()) - self.assertEqual('two', data, "Contents of renamed file not the same as original file") + err = "Contents of renamed file not the same as original file" + assert data == "two", err finally: try: - sftp.remove(FOLDER + '/a') + sftp.remove(sftp.FOLDER + "/a") except: pass try: - sftp.remove(FOLDER + '/b') + sftp.remove(sftp.FOLDER + "/b") except: pass - - def test_6_folder(self): + def test_6_folder(self, sftp): """ create a temporary folder, verify that we can create a file in it, then remove the folder and verify that we can't create a file in it anymore. """ - sftp.mkdir(FOLDER + '/subfolder') - sftp.open(FOLDER + '/subfolder/test', 'w').close() - sftp.remove(FOLDER + '/subfolder/test') - sftp.rmdir(FOLDER + '/subfolder') - try: - sftp.open(FOLDER + '/subfolder/test') - # shouldn't be able to create that file - self.assertTrue(False, 'no exception at dummy file creation') - except IOError: - pass + sftp.mkdir(sftp.FOLDER + "/subfolder") + sftp.open(sftp.FOLDER + "/subfolder/test", "w").close() + sftp.remove(sftp.FOLDER + "/subfolder/test") + sftp.rmdir(sftp.FOLDER + "/subfolder") + # shouldn't be able to create that file if dir removed + with pytest.raises(IOError, match="No such file"): + sftp.open(sftp.FOLDER + "/subfolder/test") - def test_7_listdir(self): + def test_7_listdir(self, sftp): """ verify that a folder can be created, a bunch of files can be placed in it, and those files show up in sftp.listdir. """ try: - sftp.open(FOLDER + '/duck.txt', 'w').close() - sftp.open(FOLDER + '/fish.txt', 'w').close() - sftp.open(FOLDER + '/tertiary.py', 'w').close() - - x = sftp.listdir(FOLDER) - self.assertEqual(len(x), 3) - self.assertTrue('duck.txt' in x) - self.assertTrue('fish.txt' in x) - self.assertTrue('tertiary.py' in x) - self.assertTrue('random' not in x) + sftp.open(sftp.FOLDER + "/duck.txt", "w").close() + sftp.open(sftp.FOLDER + "/fish.txt", "w").close() + sftp.open(sftp.FOLDER + "/tertiary.py", "w").close() + + x = sftp.listdir(sftp.FOLDER) + assert len(x) == 3 + assert "duck.txt" in x + assert "fish.txt" in x + assert "tertiary.py" in x + assert "random" not in x finally: - sftp.remove(FOLDER + '/duck.txt') - sftp.remove(FOLDER + '/fish.txt') - sftp.remove(FOLDER + '/tertiary.py') + sftp.remove(sftp.FOLDER + "/duck.txt") + sftp.remove(sftp.FOLDER + "/fish.txt") + sftp.remove(sftp.FOLDER + "/tertiary.py") - def test_7_5_listdir_iter(self): + def test_7_5_listdir_iter(self, sftp): """ listdir_iter version of above test """ try: - sftp.open(FOLDER + '/duck.txt', 'w').close() - sftp.open(FOLDER + '/fish.txt', 'w').close() - sftp.open(FOLDER + '/tertiary.py', 'w').close() - - x = [x.filename for x in sftp.listdir_iter(FOLDER)] - self.assertEqual(len(x), 3) - self.assertTrue('duck.txt' in x) - self.assertTrue('fish.txt' in x) - self.assertTrue('tertiary.py' in x) - self.assertTrue('random' not in x) + sftp.open(sftp.FOLDER + "/duck.txt", "w").close() + sftp.open(sftp.FOLDER + "/fish.txt", "w").close() + sftp.open(sftp.FOLDER + "/tertiary.py", "w").close() + + x = [x.filename for x in sftp.listdir_iter(sftp.FOLDER)] + assert len(x) == 3 + assert "duck.txt" in x + assert "fish.txt" in x + assert "tertiary.py" in x + assert "random" not in x finally: - sftp.remove(FOLDER + '/duck.txt') - sftp.remove(FOLDER + '/fish.txt') - sftp.remove(FOLDER + '/tertiary.py') + sftp.remove(sftp.FOLDER + "/duck.txt") + sftp.remove(sftp.FOLDER + "/fish.txt") + sftp.remove(sftp.FOLDER + "/tertiary.py") - def test_8_setstat(self): + def test_8_setstat(self, sftp): """ verify that the setstat functions (chown, chmod, utime, truncate) work. """ try: - with sftp.open(FOLDER + '/special', 'w') as f: - f.write('x' * 1024) + with sftp.open(sftp.FOLDER + "/special", "w") as f: + f.write("x" * 1024) - stat = sftp.stat(FOLDER + '/special') - sftp.chmod(FOLDER + '/special', (stat.st_mode & ~o777) | o600) - stat = sftp.stat(FOLDER + '/special') + stat = sftp.stat(sftp.FOLDER + "/special") + sftp.chmod(sftp.FOLDER + "/special", (stat.st_mode & ~o777) | o600) + stat = sftp.stat(sftp.FOLDER + "/special") expected_mode = o600 - if sys.platform == 'win32': + if sys.platform == "win32": # chmod not really functional on windows expected_mode = o666 - if sys.platform == 'cygwin': + if sys.platform == "cygwin": # even worse. expected_mode = o644 - self.assertEqual(stat.st_mode & o777, expected_mode) - self.assertEqual(stat.st_size, 1024) + assert stat.st_mode & o777 == expected_mode + assert stat.st_size == 1024 mtime = stat.st_mtime - 3600 atime = stat.st_atime - 1800 - sftp.utime(FOLDER + '/special', (atime, mtime)) - stat = sftp.stat(FOLDER + '/special') - self.assertEqual(stat.st_mtime, mtime) - if sys.platform not in ('win32', 'cygwin'): - self.assertEqual(stat.st_atime, atime) + sftp.utime(sftp.FOLDER + "/special", (atime, mtime)) + stat = sftp.stat(sftp.FOLDER + "/special") + assert stat.st_mtime == mtime + if sys.platform not in ("win32", "cygwin"): + assert stat.st_atime == atime # can't really test chown, since we'd have to know a valid uid. - sftp.truncate(FOLDER + '/special', 512) - stat = sftp.stat(FOLDER + '/special') - self.assertEqual(stat.st_size, 512) + sftp.truncate(sftp.FOLDER + "/special", 512) + stat = sftp.stat(sftp.FOLDER + "/special") + assert stat.st_size == 512 finally: - sftp.remove(FOLDER + '/special') + sftp.remove(sftp.FOLDER + "/special") - def test_9_fsetstat(self): + def test_9_fsetstat(self, sftp): """ verify that the fsetstat functions (chown, chmod, utime, truncate) work on open files. """ try: - with sftp.open(FOLDER + '/special', 'w') as f: - f.write('x' * 1024) + with sftp.open(sftp.FOLDER + "/special", "w") as f: + f.write("x" * 1024) - with sftp.open(FOLDER + '/special', 'r+') as f: + with sftp.open(sftp.FOLDER + "/special", "r+") as f: stat = f.stat() f.chmod((stat.st_mode & ~o777) | o600) stat = f.stat() expected_mode = o600 - if sys.platform == 'win32': + if sys.platform == "win32": # chmod not really functional on windows expected_mode = o666 - if sys.platform == 'cygwin': + if sys.platform == "cygwin": # even worse. expected_mode = o644 - self.assertEqual(stat.st_mode & o777, expected_mode) - self.assertEqual(stat.st_size, 1024) + assert stat.st_mode & o777 == expected_mode + assert stat.st_size == 1024 mtime = stat.st_mtime - 3600 atime = stat.st_atime - 1800 f.utime((atime, mtime)) stat = f.stat() - self.assertEqual(stat.st_mtime, mtime) - if sys.platform not in ('win32', 'cygwin'): - self.assertEqual(stat.st_atime, atime) + assert stat.st_mtime == mtime + if sys.platform not in ("win32", "cygwin"): + assert stat.st_atime == atime # can't really test chown, since we'd have to know a valid uid. f.truncate(512) stat = f.stat() - self.assertEqual(stat.st_size, 512) + assert stat.st_size == 512 finally: - sftp.remove(FOLDER + '/special') + sftp.remove(sftp.FOLDER + "/special") - def test_A_readline_seek(self): + def test_A_readline_seek(self, sftp): """ create a text file and write a bunch of text into it. then count the lines in the file, and seek around to retrieve particular lines. this should @@ -452,10 +358,10 @@ class SFTPTest (unittest.TestCase): buffering is reset on 'seek'. """ try: - with sftp.open(FOLDER + '/duck.txt', 'w') as f: + with sftp.open(sftp.FOLDER + "/duck.txt", "w") as f: f.write(ARTICLE) - with sftp.open(FOLDER + '/duck.txt', 'r+') as f: + with sftp.open(sftp.FOLDER + "/duck.txt", "r+") as f: line_number = 0 loc = 0 pos_list = [] @@ -463,35 +369,38 @@ class SFTPTest (unittest.TestCase): line_number += 1 pos_list.append(loc) loc = f.tell() - self.assertTrue(f.seekable()) + assert f.seekable() f.seek(pos_list[6], f.SEEK_SET) - self.assertEqual(f.readline(), 'Nouzilly, France.\n') + assert f.readline(), "Nouzilly == France.\n" f.seek(pos_list[17], f.SEEK_SET) - self.assertEqual(f.readline()[:4], 'duck') + assert f.readline()[:4] == "duck" f.seek(pos_list[10], f.SEEK_SET) - self.assertEqual(f.readline(), 'duck types were equally resistant to exogenous insulin compared with chicken.\n') + assert ( + f.readline() + == "duck types were equally resistant to exogenous insulin compared with chicken.\n" + ) finally: - sftp.remove(FOLDER + '/duck.txt') + sftp.remove(sftp.FOLDER + "/duck.txt") - def test_B_write_seek(self): + def test_B_write_seek(self, sftp): """ create a text file, seek back and change part of it, and verify that the changes worked. """ try: - with sftp.open(FOLDER + '/testing.txt', 'w') as f: - f.write('hello kitty.\n') + with sftp.open(sftp.FOLDER + "/testing.txt", "w") as f: + f.write("hello kitty.\n") f.seek(-5, f.SEEK_CUR) - f.write('dd') + f.write("dd") - self.assertEqual(sftp.stat(FOLDER + '/testing.txt').st_size, 13) - with sftp.open(FOLDER + '/testing.txt', 'r') as f: + assert sftp.stat(sftp.FOLDER + "/testing.txt").st_size == 13 + with sftp.open(sftp.FOLDER + "/testing.txt", "r") as f: data = f.read(20) - self.assertEqual(data, b'hello kiddy.\n') + assert data == b"hello kiddy.\n" finally: - sftp.remove(FOLDER + '/testing.txt') + sftp.remove(sftp.FOLDER + "/testing.txt") - def test_C_symlink(self): + def test_C_symlink(self, sftp): """ create a symlink and then check that lstat doesn't follow it. """ @@ -500,390 +409,390 @@ class SFTPTest (unittest.TestCase): return try: - with sftp.open(FOLDER + '/original.txt', 'w') as f: - f.write('original\n') - sftp.symlink('original.txt', FOLDER + '/link.txt') - self.assertEqual(sftp.readlink(FOLDER + '/link.txt'), 'original.txt') + with sftp.open(sftp.FOLDER + "/original.txt", "w") as f: + f.write("original\n") + sftp.symlink("original.txt", sftp.FOLDER + "/link.txt") + assert sftp.readlink(sftp.FOLDER + "/link.txt") == "original.txt" - with sftp.open(FOLDER + '/link.txt', 'r') as f: - self.assertEqual(f.readlines(), ['original\n']) + with sftp.open(sftp.FOLDER + "/link.txt", "r") as f: + assert f.readlines() == ["original\n"] - cwd = sftp.normalize('.') - if cwd[-1] == '/': + cwd = sftp.normalize(".") + if cwd[-1] == "/": cwd = cwd[:-1] - abs_path = cwd + '/' + FOLDER + '/original.txt' - sftp.symlink(abs_path, FOLDER + '/link2.txt') - self.assertEqual(abs_path, sftp.readlink(FOLDER + '/link2.txt')) + abs_path = cwd + "/" + sftp.FOLDER + "/original.txt" + sftp.symlink(abs_path, sftp.FOLDER + "/link2.txt") + assert abs_path == sftp.readlink(sftp.FOLDER + "/link2.txt") - self.assertEqual(sftp.lstat(FOLDER + '/link.txt').st_size, 12) - self.assertEqual(sftp.stat(FOLDER + '/link.txt').st_size, 9) + assert sftp.lstat(sftp.FOLDER + "/link.txt").st_size == 12 + assert sftp.stat(sftp.FOLDER + "/link.txt").st_size == 9 # the sftp server may be hiding extra path members from us, so the # length may be longer than we expect: - self.assertTrue(sftp.lstat(FOLDER + '/link2.txt').st_size >= len(abs_path)) - self.assertEqual(sftp.stat(FOLDER + '/link2.txt').st_size, 9) - self.assertEqual(sftp.stat(FOLDER + '/original.txt').st_size, 9) + assert sftp.lstat(sftp.FOLDER + "/link2.txt").st_size >= len( + abs_path + ) + assert sftp.stat(sftp.FOLDER + "/link2.txt").st_size == 9 + assert sftp.stat(sftp.FOLDER + "/original.txt").st_size == 9 finally: try: - sftp.remove(FOLDER + '/link.txt') + sftp.remove(sftp.FOLDER + "/link.txt") except: pass try: - sftp.remove(FOLDER + '/link2.txt') + sftp.remove(sftp.FOLDER + "/link2.txt") except: pass try: - sftp.remove(FOLDER + '/original.txt') + sftp.remove(sftp.FOLDER + "/original.txt") except: pass - def test_D_flush_seek(self): + def test_D_flush_seek(self, sftp): """ verify that buffered writes are automatically flushed on seek. """ try: - with sftp.open(FOLDER + '/happy.txt', 'w', 1) as f: - f.write('full line.\n') - f.write('partial') + with sftp.open(sftp.FOLDER + "/happy.txt", "w", 1) as f: + f.write("full line.\n") + f.write("partial") f.seek(9, f.SEEK_SET) - f.write('?\n') + f.write("?\n") - with sftp.open(FOLDER + '/happy.txt', 'r') as f: - self.assertEqual(f.readline(), u('full line?\n')) - self.assertEqual(f.read(7), b'partial') + with sftp.open(sftp.FOLDER + "/happy.txt", "r") as f: + assert f.readline() == u("full line?\n") + assert f.read(7) == b"partial" finally: try: - sftp.remove(FOLDER + '/happy.txt') + sftp.remove(sftp.FOLDER + "/happy.txt") except: pass - def test_E_realpath(self): + def test_E_realpath(self, sftp): """ test that realpath is returning something non-empty and not an error. """ - pwd = sftp.normalize('.') - self.assertTrue(len(pwd) > 0) - f = sftp.normalize('./' + FOLDER) - self.assertTrue(len(f) > 0) - self.assertEqual(os.path.join(pwd, FOLDER), f) + pwd = sftp.normalize(".") + assert len(pwd) > 0 + f = sftp.normalize("./" + sftp.FOLDER) + assert len(f) > 0 + assert os.path.join(pwd, sftp.FOLDER) == f - def test_F_mkdir(self): + def test_F_mkdir(self, sftp): """ verify that mkdir/rmdir work. """ - try: - sftp.mkdir(FOLDER + '/subfolder') - except: - self.assertTrue(False, 'exception creating subfolder') - try: - sftp.mkdir(FOLDER + '/subfolder') - self.assertTrue(False, 'no exception overwriting subfolder') - except IOError: - pass - try: - sftp.rmdir(FOLDER + '/subfolder') - except: - self.assertTrue(False, 'exception removing subfolder') - try: - sftp.rmdir(FOLDER + '/subfolder') - self.assertTrue(False, 'no exception removing nonexistent subfolder') - except IOError: - pass + sftp.mkdir(sftp.FOLDER + "/subfolder") + with pytest.raises(IOError): # generic msg only + sftp.mkdir(sftp.FOLDER + "/subfolder") + sftp.rmdir(sftp.FOLDER + "/subfolder") + with pytest.raises(IOError, match="No such file"): + sftp.rmdir(sftp.FOLDER + "/subfolder") - def test_G_chdir(self): + def test_G_chdir(self, sftp): """ verify that chdir/getcwd work. """ - root = sftp.normalize('.') - if root[-1] != '/': - root += '/' - try: - sftp.mkdir(FOLDER + '/alpha') - sftp.chdir(FOLDER + '/alpha') - sftp.mkdir('beta') - self.assertEqual(root + FOLDER + '/alpha', sftp.getcwd()) - self.assertEqual(['beta'], sftp.listdir('.')) - - sftp.chdir('beta') - with sftp.open('fish', 'w') as f: - f.write('hello\n') - sftp.chdir('..') - self.assertEqual(['fish'], sftp.listdir('beta')) - sftp.chdir('..') - self.assertEqual(['fish'], sftp.listdir('alpha/beta')) + root = sftp.normalize(".") + if root[-1] != "/": + root += "/" + try: + sftp.mkdir(sftp.FOLDER + "/alpha") + sftp.chdir(sftp.FOLDER + "/alpha") + sftp.mkdir("beta") + assert root + sftp.FOLDER + "/alpha" == sftp.getcwd() + assert ["beta"] == sftp.listdir(".") + + sftp.chdir("beta") + with sftp.open("fish", "w") as f: + f.write("hello\n") + sftp.chdir("..") + assert ["fish"] == sftp.listdir("beta") + sftp.chdir("..") + assert ["fish"] == sftp.listdir("alpha/beta") finally: sftp.chdir(root) try: - sftp.unlink(FOLDER + '/alpha/beta/fish') + sftp.unlink(sftp.FOLDER + "/alpha/beta/fish") except: pass try: - sftp.rmdir(FOLDER + '/alpha/beta') + sftp.rmdir(sftp.FOLDER + "/alpha/beta") except: pass try: - sftp.rmdir(FOLDER + '/alpha') + sftp.rmdir(sftp.FOLDER + "/alpha") except: pass - def test_H_get_put(self): + def test_H_get_put(self, sftp): """ verify that get/put work. """ - warnings.filterwarnings('ignore', 'tempnam.*') + warnings.filterwarnings("ignore", "tempnam.*") fd, localname = mkstemp() os.close(fd) - text = b'All I wanted was a plastic bunny rabbit.\n' - with open(localname, 'wb') as f: + text = b"All I wanted was a plastic bunny rabbit.\n" + with open(localname, "wb") as f: f.write(text) saved_progress = [] def progress_callback(x, y): saved_progress.append((x, y)) - sftp.put(localname, FOLDER + '/bunny.txt', progress_callback) - with sftp.open(FOLDER + '/bunny.txt', 'rb') as f: - self.assertEqual(text, f.read(128)) - self.assertEqual([(41, 41)], saved_progress) + sftp.put(localname, sftp.FOLDER + "/bunny.txt", progress_callback) + + with sftp.open(sftp.FOLDER + "/bunny.txt", "rb") as f: + assert text == f.read(128) + assert [(41, 41)] == saved_progress os.unlink(localname) fd, localname = mkstemp() os.close(fd) saved_progress = [] - sftp.get(FOLDER + '/bunny.txt', localname, progress_callback) + sftp.get(sftp.FOLDER + "/bunny.txt", localname, progress_callback) - with open(localname, 'rb') as f: - self.assertEqual(text, f.read(128)) - self.assertEqual([(41, 41)], saved_progress) + with open(localname, "rb") as f: + assert text == f.read(128) + assert [(41, 41)] == saved_progress os.unlink(localname) - sftp.unlink(FOLDER + '/bunny.txt') + sftp.unlink(sftp.FOLDER + "/bunny.txt") - def test_I_check(self): + def test_I_check(self, sftp): """ verify that file.check() works against our own server. (it's an sftp extension that we support, and may be the only ones who support it.) """ - with sftp.open(FOLDER + '/kitty.txt', 'w') as f: - f.write('here kitty kitty' * 64) - - try: - with sftp.open(FOLDER + '/kitty.txt', 'r') as f: - sum = f.check('sha1') - self.assertEqual('91059CFC6615941378D413CB5ADAF4C5EB293402', u(hexlify(sum)).upper()) - sum = f.check('md5', 0, 512) - self.assertEqual('93DE4788FCA28D471516963A1FE3856A', u(hexlify(sum)).upper()) - sum = f.check('md5', 0, 0, 510) - self.assertEqual('EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6', - u(hexlify(sum)).upper()) + with sftp.open(sftp.FOLDER + "/kitty.txt", "w") as f: + f.write("here kitty kitty" * 64) + + try: + with sftp.open(sftp.FOLDER + "/kitty.txt", "r") as f: + sum = f.check("sha1") + assert ( + "91059CFC6615941378D413CB5ADAF4C5EB293402" + == u(hexlify(sum)).upper() + ) + sum = f.check("md5", 0, 512) + assert ( + "93DE4788FCA28D471516963A1FE3856A" + == u(hexlify(sum)).upper() + ) + sum = f.check("md5", 0, 0, 510) + assert ( + u(hexlify(sum)).upper() + == "EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6" + ) # noqa finally: - sftp.unlink(FOLDER + '/kitty.txt') + sftp.unlink(sftp.FOLDER + "/kitty.txt") - def test_J_x_flag(self): + def test_J_x_flag(self, sftp): """ verify that the 'x' flag works when opening a file. """ - sftp.open(FOLDER + '/unusual.txt', 'wx').close() + sftp.open(sftp.FOLDER + "/unusual.txt", "wx").close() try: try: - sftp.open(FOLDER + '/unusual.txt', 'wx') - self.fail('expected exception') + sftp.open(sftp.FOLDER + "/unusual.txt", "wx") + self.fail("expected exception") except IOError: pass finally: - sftp.unlink(FOLDER + '/unusual.txt') + sftp.unlink(sftp.FOLDER + "/unusual.txt") - def test_K_utf8(self): + def test_K_utf8(self, sftp): """ verify that unicode strings are encoded into utf8 correctly. """ - with sftp.open(FOLDER + '/something', 'w') as f: - f.write('okay') + with sftp.open(sftp.FOLDER + "/something", "w") as f: + f.write("okay") try: - sftp.rename(FOLDER + '/something', FOLDER + '/' + unicode_folder) - sftp.open(b(FOLDER) + utf8_folder, 'r') + sftp.rename( + sftp.FOLDER + "/something", sftp.FOLDER + "/" + unicode_folder + ) + sftp.open(b(sftp.FOLDER) + utf8_folder, "r") except Exception as e: - self.fail('exception ' + str(e)) - sftp.unlink(b(FOLDER) + utf8_folder) + self.fail("exception " + str(e)) + sftp.unlink(b(sftp.FOLDER) + utf8_folder) - def test_L_utf8_chdir(self): - sftp.mkdir(FOLDER + '/' + unicode_folder) + def test_L_utf8_chdir(self, sftp): + sftp.mkdir(sftp.FOLDER + "/" + unicode_folder) try: - sftp.chdir(FOLDER + '/' + unicode_folder) - with sftp.open('something', 'w') as f: - f.write('okay') - sftp.unlink('something') + sftp.chdir(sftp.FOLDER + "/" + unicode_folder) + with sftp.open("something", "w") as f: + f.write("okay") + sftp.unlink("something") finally: sftp.chdir() - sftp.rmdir(FOLDER + '/' + unicode_folder) + sftp.rmdir(sftp.FOLDER + "/" + unicode_folder) - def test_M_bad_readv(self): + def test_M_bad_readv(self, sftp): """ verify that readv at the end of the file doesn't essplode. """ - sftp.open(FOLDER + '/zero', 'w').close() + sftp.open(sftp.FOLDER + "/zero", "w").close() try: - with sftp.open(FOLDER + '/zero', 'r') as f: + with sftp.open(sftp.FOLDER + "/zero", "r") as f: f.readv([(0, 12)]) - with sftp.open(FOLDER + '/zero', 'r') as f: + with sftp.open(sftp.FOLDER + "/zero", "r") as f: file_size = f.stat().st_size f.prefetch(file_size) f.read(100) finally: - sftp.unlink(FOLDER + '/zero') + sftp.unlink(sftp.FOLDER + "/zero") - def test_N_put_without_confirm(self): + def test_N_put_without_confirm(self, sftp): """ verify that get/put work without confirmation. """ - warnings.filterwarnings('ignore', 'tempnam.*') + warnings.filterwarnings("ignore", "tempnam.*") fd, localname = mkstemp() os.close(fd) - text = b'All I wanted was a plastic bunny rabbit.\n' - with open(localname, 'wb') as f: + text = b"All I wanted was a plastic bunny rabbit.\n" + with open(localname, "wb") as f: f.write(text) saved_progress = [] def progress_callback(x, y): saved_progress.append((x, y)) - res = sftp.put(localname, FOLDER + '/bunny.txt', progress_callback, False) - self.assertEqual(SFTPAttributes().attr, res.attr) + res = sftp.put( + localname, sftp.FOLDER + "/bunny.txt", progress_callback, False + ) + + assert SFTPAttributes().attr == res.attr - with sftp.open(FOLDER + '/bunny.txt', 'r') as f: - self.assertEqual(text, f.read(128)) - self.assertEqual((41, 41), saved_progress[-1]) + with sftp.open(sftp.FOLDER + "/bunny.txt", "r") as f: + assert text == f.read(128) + assert (41, 41) == saved_progress[-1] os.unlink(localname) - sftp.unlink(FOLDER + '/bunny.txt') + sftp.unlink(sftp.FOLDER + "/bunny.txt") - def test_O_getcwd(self): + def test_O_getcwd(self, sftp): """ verify that chdir/getcwd work. """ - self.assertEqual(None, sftp.getcwd()) - root = sftp.normalize('.') - if root[-1] != '/': - root += '/' + assert sftp.getcwd() == None + root = sftp.normalize(".") + if root[-1] != "/": + root += "/" try: - sftp.mkdir(FOLDER + '/alpha') - sftp.chdir(FOLDER + '/alpha') - self.assertEqual('/' + FOLDER + '/alpha', sftp.getcwd()) + sftp.mkdir(sftp.FOLDER + "/alpha") + sftp.chdir(sftp.FOLDER + "/alpha") + assert sftp.getcwd() == "/" + sftp.FOLDER + "/alpha" finally: sftp.chdir(root) try: - sftp.rmdir(FOLDER + '/alpha') + sftp.rmdir(sftp.FOLDER + "/alpha") except: pass - def XXX_test_M_seek_append(self): + def XXX_test_M_seek_append(self, sftp): """ verify that seek does't affect writes during append. does not work except through paramiko. :( openssh fails. """ try: - with sftp.open(FOLDER + '/append.txt', 'a') as f: - f.write('first line\nsecond line\n') + with sftp.open(sftp.FOLDER + "/append.txt", "a") as f: + f.write("first line\nsecond line\n") f.seek(11, f.SEEK_SET) - f.write('third line\n') + f.write("third line\n") - with sftp.open(FOLDER + '/append.txt', 'r') as f: - self.assertEqual(f.stat().st_size, 34) - self.assertEqual(f.readline(), 'first line\n') - self.assertEqual(f.readline(), 'second line\n') - self.assertEqual(f.readline(), 'third line\n') + with sftp.open(sftp.FOLDER + "/append.txt", "r") as f: + assert f.stat().st_size == 34 + assert f.readline() == "first line\n" + assert f.readline() == "second line\n" + assert f.readline() == "third line\n" finally: - sftp.remove(FOLDER + '/append.txt') + sftp.remove(sftp.FOLDER + "/append.txt") - def test_putfo_empty_file(self): + def test_putfo_empty_file(self, sftp): """ Send an empty file and confirm it is sent. """ - target = FOLDER + '/empty file.txt' + target = sftp.FOLDER + "/empty file.txt" stream = StringIO() try: attrs = sftp.putfo(stream, target) # the returned attributes should not be null - self.assertNotEqual(attrs, None) + assert attrs is not None finally: sftp.remove(target) - - def test_N_file_with_percent(self): + # TODO: this test doesn't actually fail if the regression (removing '%' + # expansion to '%%' within sftp.py's def _log()) is removed - stacktraces + # appear but they're clearly emitted from subthreads that have no error + # handling. No point running it until that is fixed somehow. + @pytest.mark.skip("Doesn't prove anything right now") + def test_N_file_with_percent(self, sftp): """ verify that we can create a file with a '%' in the filename. ( it needs to be properly escaped by _log() ) """ - self.assertTrue( paramiko.util.get_logger("paramiko").handlers, "This unit test requires logging to be enabled" ) - f = sftp.open(FOLDER + '/test%file', 'w') + f = sftp.open(sftp.FOLDER + "/test%file", "w") try: - self.assertEqual(f.stat().st_size, 0) + assert f.stat().st_size == 0 finally: f.close() - sftp.remove(FOLDER + '/test%file') - + sftp.remove(sftp.FOLDER + "/test%file") - def test_O_non_utf8_data(self): + def test_O_non_utf8_data(self, sftp): """Test write() and read() of non utf8 data""" try: - with sftp.open('%s/nonutf8data' % FOLDER, 'w') as f: + with sftp.open("%s/nonutf8data" % sftp.FOLDER, "w") as f: f.write(NON_UTF8_DATA) - with sftp.open('%s/nonutf8data' % FOLDER, 'r') as f: + with sftp.open("%s/nonutf8data" % sftp.FOLDER, "r") as f: data = f.read() - self.assertEqual(data, NON_UTF8_DATA) - with sftp.open('%s/nonutf8data' % FOLDER, 'wb') as f: + assert data == NON_UTF8_DATA + with sftp.open("%s/nonutf8data" % sftp.FOLDER, "wb") as f: f.write(NON_UTF8_DATA) - with sftp.open('%s/nonutf8data' % FOLDER, 'rb') as f: + with sftp.open("%s/nonutf8data" % sftp.FOLDER, "rb") as f: data = f.read() - self.assertEqual(data, NON_UTF8_DATA) + assert data == NON_UTF8_DATA finally: - sftp.remove('%s/nonutf8data' % FOLDER) - + sftp.remove("%s/nonutf8data" % sftp.FOLDER) - def test_sftp_attributes_empty_str(self): + def test_sftp_attributes_empty_str(self, sftp): sftp_attributes = SFTPAttributes() - self.assertEqual(str(sftp_attributes), "?--------- 1 0 0 0 (unknown date) ?") + assert ( + str(sftp_attributes) + == "?--------- 1 0 0 0 (unknown date) ?" + ) - @skipUnlessBuiltin('buffer') - def test_write_buffer(self): + @needs_builtin("buffer") + def test_write_buffer(self, sftp): """Test write() using a buffer instance.""" - data = 3 * b'A potentially large block of data to chunk up.\n' + data = 3 * b"A potentially large block of data to chunk up.\n" try: - with sftp.open('%s/write_buffer' % FOLDER, 'wb') as f: + with sftp.open("%s/write_buffer" % sftp.FOLDER, "wb") as f: for offset in range(0, len(data), 8): f.write(buffer(data, offset, 8)) - with sftp.open('%s/write_buffer' % FOLDER, 'rb') as f: - self.assertEqual(f.read(), data) + with sftp.open("%s/write_buffer" % sftp.FOLDER, "rb") as f: + assert f.read() == data finally: - sftp.remove('%s/write_buffer' % FOLDER) + sftp.remove("%s/write_buffer" % sftp.FOLDER) - @skipUnlessBuiltin('memoryview') - def test_write_memoryview(self): + @needs_builtin("memoryview") + def test_write_memoryview(self, sftp): """Test write() using a memoryview instance.""" - data = 3 * b'A potentially large block of data to chunk up.\n' + data = 3 * b"A potentially large block of data to chunk up.\n" try: - with sftp.open('%s/write_memoryview' % FOLDER, 'wb') as f: + with sftp.open("%s/write_memoryview" % sftp.FOLDER, "wb") as f: view = memoryview(data) for offset in range(0, len(data), 8): - f.write(view[offset:offset+8]) + f.write(view[offset : offset + 8]) - with sftp.open('%s/write_memoryview' % FOLDER, 'rb') as f: - self.assertEqual(f.read(), data) + with sftp.open("%s/write_memoryview" % sftp.FOLDER, "rb") as f: + assert f.read() == data finally: - sftp.remove('%s/write_memoryview' % FOLDER) - - -if __name__ == '__main__': - SFTPTest.init_loopback() - # logging is required by test_N_file_with_percent - paramiko.util.log_to_file('test_sftp.log') - from unittest import main - main() + sftp.remove("%s/write_memoryview" % sftp.FOLDER) |