summaryrefslogtreecommitdiffhomepage
path: root/tests/test_sftp.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_sftp.py')
-rw-r--r--[-rwxr-xr-x]tests/test_sftp.py656
1 files changed, 269 insertions, 387 deletions
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index b3c7bf98..09a50453 100755..100644
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -32,16 +32,19 @@ import warnings
from binascii import hexlify
from tempfile import mkstemp
+import pytest
+
import paramiko
+import paramiko.util
from paramiko.py3compat import PY2, b, u, StringIO
from paramiko.common import o777, o600, o666, o644
-from tests import skipUnlessBuiltin
-from tests.stub_sftp import StubServer, StubSFTPServer
-from tests.loop import LoopSocket
-from tests.util import test_path
-import paramiko.util
from paramiko.sftp_attr import SFTPAttributes
+from .util import needs_builtin
+from .stub_sftp import StubServer, StubSFTPServer
+from .util import _support, slow
+
+
ARTICLE = '''
Insulin sensitivity and liver insulin receptor structure in ducks from two
genera
@@ -81,303 +84,201 @@ decreased compared with chicken.
# Thus, the following 2-bytes sequence is not valid utf8: "invalid continuation byte"
NON_UTF8_DATA = b'\xC3\xC3'
-FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000')
-
-sftp = None
-tc = None
-g_big_file_test = True
-# we need to use eval(compile()) here because Py3.2 doesn't support the 'u' marker for unicode
-# this test is the only line in the entire program that has to be treated specially to support Py3.2
-unicode_folder = eval(compile(r"u'\u00fcnic\u00f8de'" if PY2 else r"'\u00fcnic\u00f8de'", 'test_sftp.py', 'eval'))
+unicode_folder = u'\u00fcnic\u00f8de' if PY2 else '\u00fcnic\u00f8de'
utf8_folder = b'/\xc3\xbcnic\xc3\xb8\x64\x65'
-def get_sftp():
- global sftp
- return sftp
-
-
-class SFTPTest (unittest.TestCase):
- @staticmethod
- def init(hostname, username, keyfile, passwd):
- global sftp, tc
-
- t = paramiko.Transport(hostname)
- tc = t
- try:
- key = paramiko.RSAKey.from_private_key_file(keyfile, passwd)
- except paramiko.PasswordRequiredException:
- sys.stderr.write('\n\nparamiko.RSAKey.from_private_key_file REQUIRES PASSWORD.\n')
- sys.stderr.write('You have two options:\n')
- sys.stderr.write('* Use the "-K" option to point to a different (non-password-protected)\n')
- sys.stderr.write(' private key file.\n')
- sys.stderr.write('* Use the "-P" option to provide the password needed to unlock this private\n')
- sys.stderr.write(' key.\n')
- sys.stderr.write('\n')
- sys.exit(1)
- try:
- t.connect(username=username, pkey=key)
- except paramiko.SSHException:
- t.close()
- sys.stderr.write('\n\nparamiko.Transport.connect FAILED.\n')
- sys.stderr.write('There are several possible reasons why it might fail so quickly:\n\n')
- sys.stderr.write('* The host to connect to (%s) is not a valid SSH server.\n' % hostname)
- sys.stderr.write(' (Use the "-H" option to change the host.)\n')
- sys.stderr.write('* The username to auth as (%s) is invalid.\n' % username)
- sys.stderr.write(' (Use the "-U" option to change the username.)\n')
- sys.stderr.write('* The private key given (%s) is not accepted by the server.\n' % keyfile)
- sys.stderr.write(' (Use the "-K" option to provide a different key file.)\n')
- sys.stderr.write('\n')
- sys.exit(1)
- sftp = paramiko.SFTP.from_transport(t)
-
- @staticmethod
- def init_loopback():
- global sftp, tc
-
- socks = LoopSocket()
- sockc = LoopSocket()
- sockc.link(socks)
- tc = paramiko.Transport(sockc)
- ts = paramiko.Transport(socks)
-
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
- ts.add_server_key(host_key)
- event = threading.Event()
- server = StubServer()
- ts.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer)
- ts.start_server(event, server)
- tc.connect(username='slowdive', password='pygmalion')
- event.wait(1.0)
-
- sftp = paramiko.SFTP.from_transport(tc)
-
- @staticmethod
- def set_big_file_test(onoff):
- global g_big_file_test
- g_big_file_test = onoff
-
- def setUp(self):
- global FOLDER
- for i in range(1000):
- FOLDER = FOLDER[:-3] + '%03d' % i
- try:
- sftp.mkdir(FOLDER)
- break
- except (IOError, OSError):
- pass
-
- def tearDown(self):
- #sftp.chdir()
- sftp.rmdir(FOLDER)
-
- def test_1_file(self):
+@slow
+class TestSFTP(object):
+ def test_1_file(self, sftp):
"""
verify that we can create a file.
"""
- f = sftp.open(FOLDER + '/test', 'w')
+ f = sftp.open(sftp.FOLDER + '/test', 'w')
try:
- self.assertEqual(f.stat().st_size, 0)
+ assert f.stat().st_size == 0
finally:
f.close()
- sftp.remove(FOLDER + '/test')
+ sftp.remove(sftp.FOLDER + '/test')
- def test_2_close(self):
+ def test_2_close(self, sftp):
"""
- verify that closing the sftp session doesn't do anything bad, and that
- a new one can be opened.
+ Verify that SFTP session close() causes a socket error on next action.
"""
- global sftp
sftp.close()
- try:
- sftp.open(FOLDER + '/test2', 'w')
- self.fail('expected exception')
- except:
- pass
- sftp = paramiko.SFTP.from_transport(tc)
+ with pytest.raises(socket.error, match='Socket is closed'):
+ sftp.open(sftp.FOLDER + '/test2', 'w')
- def test_2_sftp_can_be_used_as_context_manager(self):
+ def test_2_sftp_can_be_used_as_context_manager(self, sftp):
"""
verify that the sftp session is closed when exiting the context manager
"""
- global sftp
with sftp:
pass
- try:
- sftp.open(FOLDER + '/test2', 'w')
- self.fail('expected exception')
- except (EOFError, socket.error):
- pass
- finally:
- sftp = paramiko.SFTP.from_transport(tc)
+ with pytest.raises(socket.error, match='Socket is closed'):
+ sftp.open(sftp.FOLDER + '/test2', 'w')
- def test_3_write(self):
+ def test_3_write(self, sftp):
"""
verify that a file can be created and written, and the size is correct.
"""
try:
- with sftp.open(FOLDER + '/duck.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f:
f.write(ARTICLE)
- self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483)
+ assert sftp.stat(sftp.FOLDER + '/duck.txt').st_size == 1483
finally:
- sftp.remove(FOLDER + '/duck.txt')
+ sftp.remove(sftp.FOLDER + '/duck.txt')
- def test_3_sftp_file_can_be_used_as_context_manager(self):
+ def test_3_sftp_file_can_be_used_as_context_manager(self, sftp):
"""
verify that an opened file can be used as a context manager
"""
try:
- with sftp.open(FOLDER + '/duck.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f:
f.write(ARTICLE)
- self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483)
+ assert sftp.stat(sftp.FOLDER + '/duck.txt').st_size == 1483
finally:
- sftp.remove(FOLDER + '/duck.txt')
+ sftp.remove(sftp.FOLDER + '/duck.txt')
- def test_4_append(self):
+ def test_4_append(self, sftp):
"""
verify that a file can be opened for append, and tell() still works.
"""
try:
- with sftp.open(FOLDER + '/append.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/append.txt', 'w') as f:
f.write('first line\nsecond line\n')
- self.assertEqual(f.tell(), 23)
+ assert f.tell() == 23
- with sftp.open(FOLDER + '/append.txt', 'a+') as f:
+ with sftp.open(sftp.FOLDER + '/append.txt', 'a+') as f:
f.write('third line!!!\n')
- self.assertEqual(f.tell(), 37)
- self.assertEqual(f.stat().st_size, 37)
+ assert f.tell() == 37
+ assert f.stat().st_size == 37
f.seek(-26, f.SEEK_CUR)
- self.assertEqual(f.readline(), 'second line\n')
+ assert f.readline() == 'second line\n'
finally:
- sftp.remove(FOLDER + '/append.txt')
+ sftp.remove(sftp.FOLDER + '/append.txt')
- def test_5_rename(self):
+ def test_5_rename(self, sftp):
"""
verify that renaming a file works.
"""
try:
- with sftp.open(FOLDER + '/first.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/first.txt', 'w') as f:
f.write('content!\n')
- sftp.rename(FOLDER + '/first.txt', FOLDER + '/second.txt')
- try:
- sftp.open(FOLDER + '/first.txt', 'r')
- self.assertTrue(False, 'no exception on reading nonexistent file')
- except IOError:
- pass
- with sftp.open(FOLDER + '/second.txt', 'r') as f:
+ sftp.rename(sftp.FOLDER + '/first.txt', sftp.FOLDER + '/second.txt')
+ with pytest.raises(IOError, match='No such file'):
+ sftp.open(sftp.FOLDER + '/first.txt', 'r')
+ with sftp.open(sftp.FOLDER + '/second.txt', 'r') as f:
f.seek(-6, f.SEEK_END)
- self.assertEqual(u(f.read(4)), 'tent')
+ assert u(f.read(4)) == 'tent'
finally:
+ # TODO: this is gross, make some sort of 'remove if possible' / 'rm
+ # -f' a-like, jeez
try:
- sftp.remove(FOLDER + '/first.txt')
+ sftp.remove(sftp.FOLDER + '/first.txt')
except:
pass
try:
- sftp.remove(FOLDER + '/second.txt')
+ sftp.remove(sftp.FOLDER + '/second.txt')
except:
pass
- def test_5a_posix_rename(self):
+ def test_5a_posix_rename(self, sftp):
"""Test posix-rename@openssh.com protocol extension."""
try:
# first check that the normal rename works as specified
- with sftp.open(FOLDER + '/a', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/a', 'w') as f:
f.write('one')
- sftp.rename(FOLDER + '/a', FOLDER + '/b')
- with sftp.open(FOLDER + '/a', 'w') as f:
+ sftp.rename(sftp.FOLDER + '/a', sftp.FOLDER + '/b')
+ with sftp.open(sftp.FOLDER + '/a', 'w') as f:
f.write('two')
- try:
- sftp.rename(FOLDER + '/a', FOLDER + '/b')
- self.assertTrue(False, 'no exception when rename-ing onto existing file')
- except (OSError, IOError):
- pass
+ with pytest.raises(IOError): # actual message seems generic
+ sftp.rename(sftp.FOLDER + '/a', sftp.FOLDER + '/b')
# now check with the posix_rename
- sftp.posix_rename(FOLDER + '/a', FOLDER + '/b')
- with sftp.open(FOLDER + '/b', 'r') as f:
+ sftp.posix_rename(sftp.FOLDER + '/a', sftp.FOLDER + '/b')
+ with sftp.open(sftp.FOLDER + '/b', 'r') as f:
data = u(f.read())
- self.assertEqual('two', data, "Contents of renamed file not the same as original file")
+ err = "Contents of renamed file not the same as original file"
+ assert 'two' == data, err
finally:
try:
- sftp.remove(FOLDER + '/a')
+ sftp.remove(sftp.FOLDER + '/a')
except:
pass
try:
- sftp.remove(FOLDER + '/b')
+ sftp.remove(sftp.FOLDER + '/b')
except:
pass
- def test_6_folder(self):
+ def test_6_folder(self, sftp):
"""
create a temporary folder, verify that we can create a file in it, then
remove the folder and verify that we can't create a file in it anymore.
"""
- sftp.mkdir(FOLDER + '/subfolder')
- sftp.open(FOLDER + '/subfolder/test', 'w').close()
- sftp.remove(FOLDER + '/subfolder/test')
- sftp.rmdir(FOLDER + '/subfolder')
- try:
- sftp.open(FOLDER + '/subfolder/test')
- # shouldn't be able to create that file
- self.assertTrue(False, 'no exception at dummy file creation')
- except IOError:
- pass
+ sftp.mkdir(sftp.FOLDER + '/subfolder')
+ sftp.open(sftp.FOLDER + '/subfolder/test', 'w').close()
+ sftp.remove(sftp.FOLDER + '/subfolder/test')
+ sftp.rmdir(sftp.FOLDER + '/subfolder')
+ # shouldn't be able to create that file if dir removed
+ with pytest.raises(IOError, match="No such file"):
+ sftp.open(sftp.FOLDER + '/subfolder/test')
- def test_7_listdir(self):
+ def test_7_listdir(self, sftp):
"""
verify that a folder can be created, a bunch of files can be placed in
it, and those files show up in sftp.listdir.
"""
try:
- sftp.open(FOLDER + '/duck.txt', 'w').close()
- sftp.open(FOLDER + '/fish.txt', 'w').close()
- sftp.open(FOLDER + '/tertiary.py', 'w').close()
-
- x = sftp.listdir(FOLDER)
- self.assertEqual(len(x), 3)
- self.assertTrue('duck.txt' in x)
- self.assertTrue('fish.txt' in x)
- self.assertTrue('tertiary.py' in x)
- self.assertTrue('random' not in x)
+ sftp.open(sftp.FOLDER + '/duck.txt', 'w').close()
+ sftp.open(sftp.FOLDER + '/fish.txt', 'w').close()
+ sftp.open(sftp.FOLDER + '/tertiary.py', 'w').close()
+
+ x = sftp.listdir(sftp.FOLDER)
+ assert len(x) == 3
+ assert 'duck.txt' in x
+ assert 'fish.txt' in x
+ assert 'tertiary.py' in x
+ assert 'random' not in x
finally:
- sftp.remove(FOLDER + '/duck.txt')
- sftp.remove(FOLDER + '/fish.txt')
- sftp.remove(FOLDER + '/tertiary.py')
+ sftp.remove(sftp.FOLDER + '/duck.txt')
+ sftp.remove(sftp.FOLDER + '/fish.txt')
+ sftp.remove(sftp.FOLDER + '/tertiary.py')
- def test_7_5_listdir_iter(self):
+ def test_7_5_listdir_iter(self, sftp):
"""
listdir_iter version of above test
"""
try:
- sftp.open(FOLDER + '/duck.txt', 'w').close()
- sftp.open(FOLDER + '/fish.txt', 'w').close()
- sftp.open(FOLDER + '/tertiary.py', 'w').close()
-
- x = [x.filename for x in sftp.listdir_iter(FOLDER)]
- self.assertEqual(len(x), 3)
- self.assertTrue('duck.txt' in x)
- self.assertTrue('fish.txt' in x)
- self.assertTrue('tertiary.py' in x)
- self.assertTrue('random' not in x)
+ sftp.open(sftp.FOLDER + '/duck.txt', 'w').close()
+ sftp.open(sftp.FOLDER + '/fish.txt', 'w').close()
+ sftp.open(sftp.FOLDER + '/tertiary.py', 'w').close()
+
+ x = [x.filename for x in sftp.listdir_iter(sftp.FOLDER)]
+ assert len(x) == 3
+ assert 'duck.txt' in x
+ assert 'fish.txt' in x
+ assert 'tertiary.py' in x
+ assert 'random' not in x
finally:
- sftp.remove(FOLDER + '/duck.txt')
- sftp.remove(FOLDER + '/fish.txt')
- sftp.remove(FOLDER + '/tertiary.py')
+ sftp.remove(sftp.FOLDER + '/duck.txt')
+ sftp.remove(sftp.FOLDER + '/fish.txt')
+ sftp.remove(sftp.FOLDER + '/tertiary.py')
- def test_8_setstat(self):
+ def test_8_setstat(self, sftp):
"""
verify that the setstat functions (chown, chmod, utime, truncate) work.
"""
try:
- with sftp.open(FOLDER + '/special', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/special', 'w') as f:
f.write('x' * 1024)
- stat = sftp.stat(FOLDER + '/special')
- sftp.chmod(FOLDER + '/special', (stat.st_mode & ~o777) | o600)
- stat = sftp.stat(FOLDER + '/special')
+ stat = sftp.stat(sftp.FOLDER + '/special')
+ sftp.chmod(sftp.FOLDER + '/special', (stat.st_mode & ~o777) | o600)
+ stat = sftp.stat(sftp.FOLDER + '/special')
expected_mode = o600
if sys.platform == 'win32':
# chmod not really functional on windows
@@ -385,35 +286,35 @@ class SFTPTest (unittest.TestCase):
if sys.platform == 'cygwin':
# even worse.
expected_mode = o644
- self.assertEqual(stat.st_mode & o777, expected_mode)
- self.assertEqual(stat.st_size, 1024)
+ assert stat.st_mode & o777 == expected_mode
+ assert stat.st_size == 1024
mtime = stat.st_mtime - 3600
atime = stat.st_atime - 1800
- sftp.utime(FOLDER + '/special', (atime, mtime))
- stat = sftp.stat(FOLDER + '/special')
- self.assertEqual(stat.st_mtime, mtime)
+ sftp.utime(sftp.FOLDER + '/special', (atime, mtime))
+ stat = sftp.stat(sftp.FOLDER + '/special')
+ assert stat.st_mtime == mtime
if sys.platform not in ('win32', 'cygwin'):
- self.assertEqual(stat.st_atime, atime)
+ assert stat.st_atime == atime
# can't really test chown, since we'd have to know a valid uid.
- sftp.truncate(FOLDER + '/special', 512)
- stat = sftp.stat(FOLDER + '/special')
- self.assertEqual(stat.st_size, 512)
+ sftp.truncate(sftp.FOLDER + '/special', 512)
+ stat = sftp.stat(sftp.FOLDER + '/special')
+ assert stat.st_size == 512
finally:
- sftp.remove(FOLDER + '/special')
+ sftp.remove(sftp.FOLDER + '/special')
- def test_9_fsetstat(self):
+ def test_9_fsetstat(self, sftp):
"""
verify that the fsetstat functions (chown, chmod, utime, truncate)
work on open files.
"""
try:
- with sftp.open(FOLDER + '/special', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/special', 'w') as f:
f.write('x' * 1024)
- with sftp.open(FOLDER + '/special', 'r+') as f:
+ with sftp.open(sftp.FOLDER + '/special', 'r+') as f:
stat = f.stat()
f.chmod((stat.st_mode & ~o777) | o600)
stat = f.stat()
@@ -425,26 +326,26 @@ class SFTPTest (unittest.TestCase):
if sys.platform == 'cygwin':
# even worse.
expected_mode = o644
- self.assertEqual(stat.st_mode & o777, expected_mode)
- self.assertEqual(stat.st_size, 1024)
+ assert stat.st_mode & o777 == expected_mode
+ assert stat.st_size == 1024
mtime = stat.st_mtime - 3600
atime = stat.st_atime - 1800
f.utime((atime, mtime))
stat = f.stat()
- self.assertEqual(stat.st_mtime, mtime)
+ assert stat.st_mtime == mtime
if sys.platform not in ('win32', 'cygwin'):
- self.assertEqual(stat.st_atime, atime)
+ assert stat.st_atime == atime
# can't really test chown, since we'd have to know a valid uid.
f.truncate(512)
stat = f.stat()
- self.assertEqual(stat.st_size, 512)
+ assert stat.st_size == 512
finally:
- sftp.remove(FOLDER + '/special')
+ sftp.remove(sftp.FOLDER + '/special')
- def test_A_readline_seek(self):
+ def test_A_readline_seek(self, sftp):
"""
create a text file and write a bunch of text into it. then count the lines
in the file, and seek around to retrieve particular lines. this should
@@ -452,10 +353,10 @@ class SFTPTest (unittest.TestCase):
buffering is reset on 'seek'.
"""
try:
- with sftp.open(FOLDER + '/duck.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f:
f.write(ARTICLE)
- with sftp.open(FOLDER + '/duck.txt', 'r+') as f:
+ with sftp.open(sftp.FOLDER + '/duck.txt', 'r+') as f:
line_number = 0
loc = 0
pos_list = []
@@ -463,35 +364,35 @@ class SFTPTest (unittest.TestCase):
line_number += 1
pos_list.append(loc)
loc = f.tell()
- self.assertTrue(f.seekable())
+ assert f.seekable()
f.seek(pos_list[6], f.SEEK_SET)
- self.assertEqual(f.readline(), 'Nouzilly, France.\n')
+ assert f.readline(), 'Nouzilly == France.\n'
f.seek(pos_list[17], f.SEEK_SET)
- self.assertEqual(f.readline()[:4], 'duck')
+ assert f.readline()[:4] == 'duck'
f.seek(pos_list[10], f.SEEK_SET)
- self.assertEqual(f.readline(), 'duck types were equally resistant to exogenous insulin compared with chicken.\n')
+ assert f.readline() == 'duck types were equally resistant to exogenous insulin compared with chicken.\n'
finally:
- sftp.remove(FOLDER + '/duck.txt')
+ sftp.remove(sftp.FOLDER + '/duck.txt')
- def test_B_write_seek(self):
+ def test_B_write_seek(self, sftp):
"""
create a text file, seek back and change part of it, and verify that the
changes worked.
"""
try:
- with sftp.open(FOLDER + '/testing.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/testing.txt', 'w') as f:
f.write('hello kitty.\n')
f.seek(-5, f.SEEK_CUR)
f.write('dd')
- self.assertEqual(sftp.stat(FOLDER + '/testing.txt').st_size, 13)
- with sftp.open(FOLDER + '/testing.txt', 'r') as f:
+ assert sftp.stat(sftp.FOLDER + '/testing.txt').st_size == 13
+ with sftp.open(sftp.FOLDER + '/testing.txt', 'r') as f:
data = f.read(20)
- self.assertEqual(data, b'hello kiddy.\n')
+ assert data == b'hello kiddy.\n'
finally:
- sftp.remove(FOLDER + '/testing.txt')
+ sftp.remove(sftp.FOLDER + '/testing.txt')
- def test_C_symlink(self):
+ def test_C_symlink(self, sftp):
"""
create a symlink and then check that lstat doesn't follow it.
"""
@@ -500,97 +401,85 @@ class SFTPTest (unittest.TestCase):
return
try:
- with sftp.open(FOLDER + '/original.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/original.txt', 'w') as f:
f.write('original\n')
- sftp.symlink('original.txt', FOLDER + '/link.txt')
- self.assertEqual(sftp.readlink(FOLDER + '/link.txt'), 'original.txt')
+ sftp.symlink('original.txt', sftp.FOLDER + '/link.txt')
+ assert sftp.readlink(sftp.FOLDER + '/link.txt') == 'original.txt'
- with sftp.open(FOLDER + '/link.txt', 'r') as f:
- self.assertEqual(f.readlines(), ['original\n'])
+ with sftp.open(sftp.FOLDER + '/link.txt', 'r') as f:
+ assert f.readlines() == ['original\n']
cwd = sftp.normalize('.')
if cwd[-1] == '/':
cwd = cwd[:-1]
- abs_path = cwd + '/' + FOLDER + '/original.txt'
- sftp.symlink(abs_path, FOLDER + '/link2.txt')
- self.assertEqual(abs_path, sftp.readlink(FOLDER + '/link2.txt'))
+ abs_path = cwd + '/' + sftp.FOLDER + '/original.txt'
+ sftp.symlink(abs_path, sftp.FOLDER + '/link2.txt')
+ assert abs_path == sftp.readlink(sftp.FOLDER + '/link2.txt')
- self.assertEqual(sftp.lstat(FOLDER + '/link.txt').st_size, 12)
- self.assertEqual(sftp.stat(FOLDER + '/link.txt').st_size, 9)
+ assert sftp.lstat(sftp.FOLDER + '/link.txt').st_size == 12
+ assert sftp.stat(sftp.FOLDER + '/link.txt').st_size == 9
# the sftp server may be hiding extra path members from us, so the
# length may be longer than we expect:
- self.assertTrue(sftp.lstat(FOLDER + '/link2.txt').st_size >= len(abs_path))
- self.assertEqual(sftp.stat(FOLDER + '/link2.txt').st_size, 9)
- self.assertEqual(sftp.stat(FOLDER + '/original.txt').st_size, 9)
+ assert sftp.lstat(sftp.FOLDER + '/link2.txt').st_size >= len(abs_path)
+ assert sftp.stat(sftp.FOLDER + '/link2.txt').st_size == 9
+ assert sftp.stat(sftp.FOLDER + '/original.txt').st_size == 9
finally:
try:
- sftp.remove(FOLDER + '/link.txt')
+ sftp.remove(sftp.FOLDER + '/link.txt')
except:
pass
try:
- sftp.remove(FOLDER + '/link2.txt')
+ sftp.remove(sftp.FOLDER + '/link2.txt')
except:
pass
try:
- sftp.remove(FOLDER + '/original.txt')
+ sftp.remove(sftp.FOLDER + '/original.txt')
except:
pass
- def test_D_flush_seek(self):
+ def test_D_flush_seek(self, sftp):
"""
verify that buffered writes are automatically flushed on seek.
"""
try:
- with sftp.open(FOLDER + '/happy.txt', 'w', 1) as f:
+ with sftp.open(sftp.FOLDER + '/happy.txt', 'w', 1) as f:
f.write('full line.\n')
f.write('partial')
f.seek(9, f.SEEK_SET)
f.write('?\n')
- with sftp.open(FOLDER + '/happy.txt', 'r') as f:
- self.assertEqual(f.readline(), u('full line?\n'))
- self.assertEqual(f.read(7), b'partial')
+ with sftp.open(sftp.FOLDER + '/happy.txt', 'r') as f:
+ assert f.readline() == u('full line?\n')
+ assert f.read(7) == b'partial'
finally:
try:
- sftp.remove(FOLDER + '/happy.txt')
+ sftp.remove(sftp.FOLDER + '/happy.txt')
except:
pass
- def test_E_realpath(self):
+ def test_E_realpath(self, sftp):
"""
test that realpath is returning something non-empty and not an
error.
"""
pwd = sftp.normalize('.')
- self.assertTrue(len(pwd) > 0)
- f = sftp.normalize('./' + FOLDER)
- self.assertTrue(len(f) > 0)
- self.assertEqual(os.path.join(pwd, FOLDER), f)
+ assert len(pwd) > 0
+ f = sftp.normalize('./' + sftp.FOLDER)
+ assert len(f) > 0
+ assert os.path.join(pwd, sftp.FOLDER) == f
- def test_F_mkdir(self):
+ def test_F_mkdir(self, sftp):
"""
verify that mkdir/rmdir work.
"""
- try:
- sftp.mkdir(FOLDER + '/subfolder')
- except:
- self.assertTrue(False, 'exception creating subfolder')
- try:
- sftp.mkdir(FOLDER + '/subfolder')
- self.assertTrue(False, 'no exception overwriting subfolder')
- except IOError:
- pass
- try:
- sftp.rmdir(FOLDER + '/subfolder')
- except:
- self.assertTrue(False, 'exception removing subfolder')
- try:
- sftp.rmdir(FOLDER + '/subfolder')
- self.assertTrue(False, 'no exception removing nonexistent subfolder')
- except IOError:
- pass
+ sftp.mkdir(sftp.FOLDER + '/subfolder')
+ with pytest.raises(IOError): # generic msg only
+ sftp.mkdir(sftp.FOLDER + '/subfolder')
+ sftp.rmdir(sftp.FOLDER + '/subfolder')
+ with pytest.raises(IOError, match="No such file"):
+ sftp.rmdir(sftp.FOLDER + '/subfolder')
- def test_G_chdir(self):
+ def test_G_chdir(self, sftp):
"""
verify that chdir/getcwd work.
"""
@@ -598,35 +487,35 @@ class SFTPTest (unittest.TestCase):
if root[-1] != '/':
root += '/'
try:
- sftp.mkdir(FOLDER + '/alpha')
- sftp.chdir(FOLDER + '/alpha')
+ sftp.mkdir(sftp.FOLDER + '/alpha')
+ sftp.chdir(sftp.FOLDER + '/alpha')
sftp.mkdir('beta')
- self.assertEqual(root + FOLDER + '/alpha', sftp.getcwd())
- self.assertEqual(['beta'], sftp.listdir('.'))
+ assert root + sftp.FOLDER + '/alpha' == sftp.getcwd()
+ assert ['beta'] == sftp.listdir('.')
sftp.chdir('beta')
with sftp.open('fish', 'w') as f:
f.write('hello\n')
sftp.chdir('..')
- self.assertEqual(['fish'], sftp.listdir('beta'))
+ assert ['fish'] == sftp.listdir('beta')
sftp.chdir('..')
- self.assertEqual(['fish'], sftp.listdir('alpha/beta'))
+ assert ['fish'] == sftp.listdir('alpha/beta')
finally:
sftp.chdir(root)
try:
- sftp.unlink(FOLDER + '/alpha/beta/fish')
+ sftp.unlink(sftp.FOLDER + '/alpha/beta/fish')
except:
pass
try:
- sftp.rmdir(FOLDER + '/alpha/beta')
+ sftp.rmdir(sftp.FOLDER + '/alpha/beta')
except:
pass
try:
- sftp.rmdir(FOLDER + '/alpha')
+ sftp.rmdir(sftp.FOLDER + '/alpha')
except:
pass
- def test_H_get_put(self):
+ def test_H_get_put(self, sftp):
"""
verify that get/put work.
"""
@@ -641,103 +530,102 @@ class SFTPTest (unittest.TestCase):
def progress_callback(x, y):
saved_progress.append((x, y))
- sftp.put(localname, FOLDER + '/bunny.txt', progress_callback)
+ sftp.put(localname, sftp.FOLDER + '/bunny.txt', progress_callback)
- with sftp.open(FOLDER + '/bunny.txt', 'rb') as f:
- self.assertEqual(text, f.read(128))
- self.assertEqual([(41, 41)], saved_progress)
+ with sftp.open(sftp.FOLDER + '/bunny.txt', 'rb') as f:
+ assert text == f.read(128)
+ assert [(41, 41)] == saved_progress
os.unlink(localname)
fd, localname = mkstemp()
os.close(fd)
saved_progress = []
- sftp.get(FOLDER + '/bunny.txt', localname, progress_callback)
+ sftp.get(sftp.FOLDER + '/bunny.txt', localname, progress_callback)
with open(localname, 'rb') as f:
- self.assertEqual(text, f.read(128))
- self.assertEqual([(41, 41)], saved_progress)
+ assert text == f.read(128)
+ assert [(41, 41)] == saved_progress
os.unlink(localname)
- sftp.unlink(FOLDER + '/bunny.txt')
+ sftp.unlink(sftp.FOLDER + '/bunny.txt')
- def test_I_check(self):
+ def test_I_check(self, sftp):
"""
verify that file.check() works against our own server.
(it's an sftp extension that we support, and may be the only ones who
support it.)
"""
- with sftp.open(FOLDER + '/kitty.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/kitty.txt', 'w') as f:
f.write('here kitty kitty' * 64)
try:
- with sftp.open(FOLDER + '/kitty.txt', 'r') as f:
+ with sftp.open(sftp.FOLDER + '/kitty.txt', 'r') as f:
sum = f.check('sha1')
- self.assertEqual('91059CFC6615941378D413CB5ADAF4C5EB293402', u(hexlify(sum)).upper())
+ assert '91059CFC6615941378D413CB5ADAF4C5EB293402' == u(hexlify(sum)).upper()
sum = f.check('md5', 0, 512)
- self.assertEqual('93DE4788FCA28D471516963A1FE3856A', u(hexlify(sum)).upper())
+ assert '93DE4788FCA28D471516963A1FE3856A' == u(hexlify(sum)).upper()
sum = f.check('md5', 0, 0, 510)
- self.assertEqual('EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6',
- u(hexlify(sum)).upper())
+ assert u(hexlify(sum)).upper() == 'EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6' # noqa
finally:
- sftp.unlink(FOLDER + '/kitty.txt')
+ sftp.unlink(sftp.FOLDER + '/kitty.txt')
- def test_J_x_flag(self):
+ def test_J_x_flag(self, sftp):
"""
verify that the 'x' flag works when opening a file.
"""
- sftp.open(FOLDER + '/unusual.txt', 'wx').close()
+ sftp.open(sftp.FOLDER + '/unusual.txt', 'wx').close()
try:
try:
- sftp.open(FOLDER + '/unusual.txt', 'wx')
+ sftp.open(sftp.FOLDER + '/unusual.txt', 'wx')
self.fail('expected exception')
except IOError:
pass
finally:
- sftp.unlink(FOLDER + '/unusual.txt')
+ sftp.unlink(sftp.FOLDER + '/unusual.txt')
- def test_K_utf8(self):
+ def test_K_utf8(self, sftp):
"""
verify that unicode strings are encoded into utf8 correctly.
"""
- with sftp.open(FOLDER + '/something', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/something', 'w') as f:
f.write('okay')
try:
- sftp.rename(FOLDER + '/something', FOLDER + '/' + unicode_folder)
- sftp.open(b(FOLDER) + utf8_folder, 'r')
+ sftp.rename(sftp.FOLDER + '/something', sftp.FOLDER + '/' + unicode_folder)
+ sftp.open(b(sftp.FOLDER) + utf8_folder, 'r')
except Exception as e:
self.fail('exception ' + str(e))
- sftp.unlink(b(FOLDER) + utf8_folder)
+ sftp.unlink(b(sftp.FOLDER) + utf8_folder)
- def test_L_utf8_chdir(self):
- sftp.mkdir(FOLDER + '/' + unicode_folder)
+ def test_L_utf8_chdir(self, sftp):
+ sftp.mkdir(sftp.FOLDER + '/' + unicode_folder)
try:
- sftp.chdir(FOLDER + '/' + unicode_folder)
+ sftp.chdir(sftp.FOLDER + '/' + unicode_folder)
with sftp.open('something', 'w') as f:
f.write('okay')
sftp.unlink('something')
finally:
sftp.chdir()
- sftp.rmdir(FOLDER + '/' + unicode_folder)
+ sftp.rmdir(sftp.FOLDER + '/' + unicode_folder)
- def test_M_bad_readv(self):
+ def test_M_bad_readv(self, sftp):
"""
verify that readv at the end of the file doesn't essplode.
"""
- sftp.open(FOLDER + '/zero', 'w').close()
+ sftp.open(sftp.FOLDER + '/zero', 'w').close()
try:
- with sftp.open(FOLDER + '/zero', 'r') as f:
+ with sftp.open(sftp.FOLDER + '/zero', 'r') as f:
f.readv([(0, 12)])
- with sftp.open(FOLDER + '/zero', 'r') as f:
+ with sftp.open(sftp.FOLDER + '/zero', 'r') as f:
file_size = f.stat().st_size
f.prefetch(file_size)
f.read(100)
finally:
- sftp.unlink(FOLDER + '/zero')
+ sftp.unlink(sftp.FOLDER + '/zero')
- def test_N_put_without_confirm(self):
+ def test_N_put_without_confirm(self, sftp):
"""
verify that get/put work without confirmation.
"""
@@ -752,138 +640,132 @@ class SFTPTest (unittest.TestCase):
def progress_callback(x, y):
saved_progress.append((x, y))
- res = sftp.put(localname, FOLDER + '/bunny.txt', progress_callback, False)
+ res = sftp.put(localname, sftp.FOLDER + '/bunny.txt', progress_callback, False)
- self.assertEqual(SFTPAttributes().attr, res.attr)
+ assert SFTPAttributes().attr == res.attr
- with sftp.open(FOLDER + '/bunny.txt', 'r') as f:
- self.assertEqual(text, f.read(128))
- self.assertEqual((41, 41), saved_progress[-1])
+ with sftp.open(sftp.FOLDER + '/bunny.txt', 'r') as f:
+ assert text == f.read(128)
+ assert (41, 41) == saved_progress[-1]
os.unlink(localname)
- sftp.unlink(FOLDER + '/bunny.txt')
+ sftp.unlink(sftp.FOLDER + '/bunny.txt')
- def test_O_getcwd(self):
+ def test_O_getcwd(self, sftp):
"""
verify that chdir/getcwd work.
"""
- self.assertEqual(None, sftp.getcwd())
+ assert sftp.getcwd() == None
root = sftp.normalize('.')
if root[-1] != '/':
root += '/'
try:
- sftp.mkdir(FOLDER + '/alpha')
- sftp.chdir(FOLDER + '/alpha')
- self.assertEqual('/' + FOLDER + '/alpha', sftp.getcwd())
+ sftp.mkdir(sftp.FOLDER + '/alpha')
+ sftp.chdir(sftp.FOLDER + '/alpha')
+ assert sftp.getcwd() == '/' + sftp.FOLDER + '/alpha'
finally:
sftp.chdir(root)
try:
- sftp.rmdir(FOLDER + '/alpha')
+ sftp.rmdir(sftp.FOLDER + '/alpha')
except:
pass
- def XXX_test_M_seek_append(self):
+ def XXX_test_M_seek_append(self, sftp):
"""
verify that seek does't affect writes during append.
does not work except through paramiko. :( openssh fails.
"""
try:
- with sftp.open(FOLDER + '/append.txt', 'a') as f:
+ with sftp.open(sftp.FOLDER + '/append.txt', 'a') as f:
f.write('first line\nsecond line\n')
f.seek(11, f.SEEK_SET)
f.write('third line\n')
- with sftp.open(FOLDER + '/append.txt', 'r') as f:
- self.assertEqual(f.stat().st_size, 34)
- self.assertEqual(f.readline(), 'first line\n')
- self.assertEqual(f.readline(), 'second line\n')
- self.assertEqual(f.readline(), 'third line\n')
+ with sftp.open(sftp.FOLDER + '/append.txt', 'r') as f:
+ assert f.stat().st_size == 34
+ assert f.readline() == 'first line\n'
+ assert f.readline() == 'second line\n'
+ assert f.readline() == 'third line\n'
finally:
- sftp.remove(FOLDER + '/append.txt')
+ sftp.remove(sftp.FOLDER + '/append.txt')
- def test_putfo_empty_file(self):
+ def test_putfo_empty_file(self, sftp):
"""
Send an empty file and confirm it is sent.
"""
- target = FOLDER + '/empty file.txt'
+ target = sftp.FOLDER + '/empty file.txt'
stream = StringIO()
try:
attrs = sftp.putfo(stream, target)
# the returned attributes should not be null
- self.assertNotEqual(attrs, None)
+ assert attrs is not None
finally:
sftp.remove(target)
-
- def test_N_file_with_percent(self):
+ # TODO: this test doesn't actually fail if the regression (removing '%'
+ # expansion to '%%' within sftp.py's def _log()) is removed - stacktraces
+ # appear but they're clearly emitted from subthreads that have no error
+ # handling. No point running it until that is fixed somehow.
+ @pytest.mark.skip("Doesn't prove anything right now")
+ def test_N_file_with_percent(self, sftp):
"""
verify that we can create a file with a '%' in the filename.
( it needs to be properly escaped by _log() )
"""
- self.assertTrue( paramiko.util.get_logger("paramiko").handlers, "This unit test requires logging to be enabled" )
- f = sftp.open(FOLDER + '/test%file', 'w')
+ f = sftp.open(sftp.FOLDER + '/test%file', 'w')
try:
- self.assertEqual(f.stat().st_size, 0)
+ assert f.stat().st_size == 0
finally:
f.close()
- sftp.remove(FOLDER + '/test%file')
-
+ sftp.remove(sftp.FOLDER + '/test%file')
- def test_O_non_utf8_data(self):
+ def test_O_non_utf8_data(self, sftp):
"""Test write() and read() of non utf8 data"""
try:
- with sftp.open('%s/nonutf8data' % FOLDER, 'w') as f:
+ with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'w') as f:
f.write(NON_UTF8_DATA)
- with sftp.open('%s/nonutf8data' % FOLDER, 'r') as f:
+ with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'r') as f:
data = f.read()
- self.assertEqual(data, NON_UTF8_DATA)
- with sftp.open('%s/nonutf8data' % FOLDER, 'wb') as f:
+ assert data == NON_UTF8_DATA
+ with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'wb') as f:
f.write(NON_UTF8_DATA)
- with sftp.open('%s/nonutf8data' % FOLDER, 'rb') as f:
+ with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'rb') as f:
data = f.read()
- self.assertEqual(data, NON_UTF8_DATA)
+ assert data == NON_UTF8_DATA
finally:
- sftp.remove('%s/nonutf8data' % FOLDER)
+ sftp.remove('%s/nonutf8data' % sftp.FOLDER)
- def test_sftp_attributes_empty_str(self):
+ def test_sftp_attributes_empty_str(self, sftp):
sftp_attributes = SFTPAttributes()
- self.assertEqual(str(sftp_attributes), "?--------- 1 0 0 0 (unknown date) ?")
+ assert str(sftp_attributes) == "?--------- 1 0 0 0 (unknown date) ?"
- @skipUnlessBuiltin('buffer')
- def test_write_buffer(self):
+ @needs_builtin('buffer')
+ def test_write_buffer(self, sftp):
"""Test write() using a buffer instance."""
data = 3 * b'A potentially large block of data to chunk up.\n'
try:
- with sftp.open('%s/write_buffer' % FOLDER, 'wb') as f:
+ with sftp.open('%s/write_buffer' % sftp.FOLDER, 'wb') as f:
for offset in range(0, len(data), 8):
f.write(buffer(data, offset, 8))
- with sftp.open('%s/write_buffer' % FOLDER, 'rb') as f:
- self.assertEqual(f.read(), data)
+ with sftp.open('%s/write_buffer' % sftp.FOLDER, 'rb') as f:
+ assert f.read() == data
finally:
- sftp.remove('%s/write_buffer' % FOLDER)
+ sftp.remove('%s/write_buffer' % sftp.FOLDER)
- @skipUnlessBuiltin('memoryview')
- def test_write_memoryview(self):
+ @needs_builtin('memoryview')
+ def test_write_memoryview(self, sftp):
"""Test write() using a memoryview instance."""
data = 3 * b'A potentially large block of data to chunk up.\n'
try:
- with sftp.open('%s/write_memoryview' % FOLDER, 'wb') as f:
+ with sftp.open('%s/write_memoryview' % sftp.FOLDER, 'wb') as f:
view = memoryview(data)
for offset in range(0, len(data), 8):
f.write(view[offset:offset+8])
- with sftp.open('%s/write_memoryview' % FOLDER, 'rb') as f:
- self.assertEqual(f.read(), data)
+ with sftp.open('%s/write_memoryview' % sftp.FOLDER, 'rb') as f:
+ assert f.read() == data
finally:
- sftp.remove('%s/write_memoryview' % FOLDER)
-
-
-if __name__ == '__main__':
- SFTPTest.init_loopback()
- # logging is required by test_N_file_with_percent
- paramiko.util.log_to_file('test_sftp.log')
- from unittest import main
- main()
+ sftp.remove('%s/write_memoryview' % sftp.FOLDER)