diff options
Diffstat (limited to 'tests/test_sftp.py')
-rwxr-xr-x | tests/test_sftp.py | 523 |
1 files changed, 294 insertions, 229 deletions
diff --git a/tests/test_sftp.py b/tests/test_sftp.py index 2eadabcd..2b6aa3b6 100755 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -7,7 +7,7 @@ # Software Foundation; either version 2.1 of the License, or (at your option) # any later version. # -# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more # details. @@ -24,18 +24,20 @@ do test file operations in (so no existing files will be harmed). """ from binascii import hexlify -import logging import os -import random -import struct import sys +import warnings import threading -import time import unittest +from tempfile import mkstemp import paramiko -from stub_sftp import StubServer, StubSFTPServer -from loop import LoopSocket +from paramiko.py3compat import PY2, b, u, StringIO +from paramiko.common import o777, o600, o666, o644 +from tests.stub_sftp import StubServer, StubSFTPServer +from tests.loop import LoopSocket +from tests.util import test_path +import paramiko.util from paramiko.sftp_attr import SFTPAttributes ARTICLE = ''' @@ -65,11 +67,27 @@ liver insulin receptors. Their sensitivity to insulin is, however, similarly decreased compared with chicken. ''' + +# Here is how unicode characters are encoded over 1 to 6 bytes in utf-8 +# U-00000000 - U-0000007F: 0xxxxxxx +# U-00000080 - U-000007FF: 110xxxxx 10xxxxxx +# U-00000800 - U-0000FFFF: 1110xxxx 10xxxxxx 10xxxxxx +# U-00010000 - U-001FFFFF: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx +# U-00200000 - U-03FFFFFF: 111110xx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx +# U-04000000 - U-7FFFFFFF: 1111110x 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx +# Note that: hex(int('11000011',2)) == '0xc3' +# Thus, the following 2-bytes sequence is not valid utf8: "invalid continuation byte" +NON_UTF8_DATA = b'\xC3\xC3' + FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000') sftp = None tc = None g_big_file_test = True +# we need to use eval(compile()) here because Py3.2 doesn't support the 'u' marker for unicode +# this test is the only line in the entire program that has to be treated specially to support Py3.2 +unicode_folder = eval(compile(r"u'\u00fcnic\u00f8de'" if PY2 else r"'\u00fcnic\u00f8de'", 'test_sftp.py', 'eval')) +utf8_folder = b'/\xc3\xbcnic\xc3\xb8\x64\x65' def get_sftp(): @@ -121,7 +139,7 @@ class SFTPTest (unittest.TestCase): tc = paramiko.Transport(sockc) ts = paramiko.Transport(socks) - host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key') + host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) ts.add_server_key(host_key) event = threading.Event() server = StubServer() @@ -140,7 +158,7 @@ class SFTPTest (unittest.TestCase): def setUp(self): global FOLDER - for i in xrange(1000): + for i in range(1000): FOLDER = FOLDER[:-3] + '%03d' % i try: sftp.mkdir(FOLDER) @@ -149,6 +167,7 @@ class SFTPTest (unittest.TestCase): pass def tearDown(self): + #sftp.chdir() sftp.rmdir(FOLDER) def test_1_file(self): @@ -158,8 +177,8 @@ class SFTPTest (unittest.TestCase): f = sftp.open(FOLDER + '/test', 'w') try: self.assertEqual(f.stat().st_size, 0) - f.close() finally: + f.close() sftp.remove(FOLDER + '/test') def test_2_close(self): @@ -180,10 +199,20 @@ class SFTPTest (unittest.TestCase): """ verify that a file can be created and written, and the size is correct. """ - f = sftp.open(FOLDER + '/duck.txt', 'w') try: - f.write(ARTICLE) - f.close() + with sftp.open(FOLDER + '/duck.txt', 'w') as f: + f.write(ARTICLE) + self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483) + finally: + sftp.remove(FOLDER + '/duck.txt') + + def test_3_sftp_file_can_be_used_as_context_manager(self): + """ + verify that an opened file can be used as a context manager + """ + try: + with sftp.open(FOLDER + '/duck.txt', 'w') as f: + f.write(ARTICLE) self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483) finally: sftp.remove(FOLDER + '/duck.txt') @@ -192,19 +221,17 @@ class SFTPTest (unittest.TestCase): """ verify that a file can be opened for append, and tell() still works. """ - f = sftp.open(FOLDER + '/append.txt', 'w') try: - f.write('first line\nsecond line\n') - self.assertEqual(f.tell(), 23) - f.close() - - f = sftp.open(FOLDER + '/append.txt', 'a+') - f.write('third line!!!\n') - self.assertEqual(f.tell(), 37) - self.assertEqual(f.stat().st_size, 37) - f.seek(-26, f.SEEK_CUR) - self.assertEqual(f.readline(), 'second line\n') - f.close() + with sftp.open(FOLDER + '/append.txt', 'w') as f: + f.write('first line\nsecond line\n') + self.assertEqual(f.tell(), 23) + + with sftp.open(FOLDER + '/append.txt', 'a+') as f: + f.write('third line!!!\n') + self.assertEqual(f.tell(), 37) + self.assertEqual(f.stat().st_size, 37) + f.seek(-26, f.SEEK_CUR) + self.assertEqual(f.readline(), 'second line\n') finally: sftp.remove(FOLDER + '/append.txt') @@ -212,20 +239,18 @@ class SFTPTest (unittest.TestCase): """ verify that renaming a file works. """ - f = sftp.open(FOLDER + '/first.txt', 'w') try: - f.write('content!\n'); - f.close() + with sftp.open(FOLDER + '/first.txt', 'w') as f: + f.write('content!\n') sftp.rename(FOLDER + '/first.txt', FOLDER + '/second.txt') try: - f = sftp.open(FOLDER + '/first.txt', 'r') - self.assert_(False, 'no exception on reading nonexistent file') + sftp.open(FOLDER + '/first.txt', 'r') + self.assertTrue(False, 'no exception on reading nonexistent file') except IOError: pass - f = sftp.open(FOLDER + '/second.txt', 'r') - f.seek(-6, f.SEEK_END) - self.assertEqual(f.read(4), 'tent') - f.close() + with sftp.open(FOLDER + '/second.txt', 'r') as f: + f.seek(-6, f.SEEK_END) + self.assertEqual(u(f.read(4)), 'tent') finally: try: sftp.remove(FOLDER + '/first.txt') @@ -242,14 +267,13 @@ class SFTPTest (unittest.TestCase): remove the folder and verify that we can't create a file in it anymore. """ sftp.mkdir(FOLDER + '/subfolder') - f = sftp.open(FOLDER + '/subfolder/test', 'w') - f.close() + sftp.open(FOLDER + '/subfolder/test', 'w').close() sftp.remove(FOLDER + '/subfolder/test') sftp.rmdir(FOLDER + '/subfolder') try: - f = sftp.open(FOLDER + '/subfolder/test') + sftp.open(FOLDER + '/subfolder/test') # shouldn't be able to create that file - self.assert_(False, 'no exception at dummy file creation') + self.assertTrue(False, 'no exception at dummy file creation') except IOError: pass @@ -259,21 +283,16 @@ class SFTPTest (unittest.TestCase): and those files show up in sftp.listdir. """ try: - f = sftp.open(FOLDER + '/duck.txt', 'w') - f.close() - - f = sftp.open(FOLDER + '/fish.txt', 'w') - f.close() - - f = sftp.open(FOLDER + '/tertiary.py', 'w') - f.close() + sftp.open(FOLDER + '/duck.txt', 'w').close() + sftp.open(FOLDER + '/fish.txt', 'w').close() + sftp.open(FOLDER + '/tertiary.py', 'w').close() x = sftp.listdir(FOLDER) self.assertEqual(len(x), 3) - self.assert_('duck.txt' in x) - self.assert_('fish.txt' in x) - self.assert_('tertiary.py' in x) - self.assert_('random' not in x) + self.assertTrue('duck.txt' in x) + self.assertTrue('fish.txt' in x) + self.assertTrue('tertiary.py' in x) + self.assertTrue('random' not in x) finally: sftp.remove(FOLDER + '/duck.txt') sftp.remove(FOLDER + '/fish.txt') @@ -283,22 +302,21 @@ class SFTPTest (unittest.TestCase): """ verify that the setstat functions (chown, chmod, utime, truncate) work. """ - f = sftp.open(FOLDER + '/special', 'w') try: - f.write('x' * 1024) - f.close() + with sftp.open(FOLDER + '/special', 'w') as f: + f.write('x' * 1024) stat = sftp.stat(FOLDER + '/special') - sftp.chmod(FOLDER + '/special', (stat.st_mode & ~0777) | 0600) + sftp.chmod(FOLDER + '/special', (stat.st_mode & ~o777) | o600) stat = sftp.stat(FOLDER + '/special') - expected_mode = 0600 + expected_mode = o600 if sys.platform == 'win32': # chmod not really functional on windows - expected_mode = 0666 + expected_mode = o666 if sys.platform == 'cygwin': # even worse. - expected_mode = 0644 - self.assertEqual(stat.st_mode & 0777, expected_mode) + expected_mode = o644 + self.assertEqual(stat.st_mode & o777, expected_mode) self.assertEqual(stat.st_size, 1024) mtime = stat.st_mtime - 3600 @@ -322,40 +340,38 @@ class SFTPTest (unittest.TestCase): verify that the fsetstat functions (chown, chmod, utime, truncate) work on open files. """ - f = sftp.open(FOLDER + '/special', 'w') try: - f.write('x' * 1024) - f.close() - - f = sftp.open(FOLDER + '/special', 'r+') - stat = f.stat() - f.chmod((stat.st_mode & ~0777) | 0600) - stat = f.stat() - - expected_mode = 0600 - if sys.platform == 'win32': - # chmod not really functional on windows - expected_mode = 0666 - if sys.platform == 'cygwin': - # even worse. - expected_mode = 0644 - self.assertEqual(stat.st_mode & 0777, expected_mode) - self.assertEqual(stat.st_size, 1024) - - mtime = stat.st_mtime - 3600 - atime = stat.st_atime - 1800 - f.utime((atime, mtime)) - stat = f.stat() - self.assertEqual(stat.st_mtime, mtime) - if sys.platform not in ('win32', 'cygwin'): - self.assertEqual(stat.st_atime, atime) - - # can't really test chown, since we'd have to know a valid uid. - - f.truncate(512) - stat = f.stat() - self.assertEqual(stat.st_size, 512) - f.close() + with sftp.open(FOLDER + '/special', 'w') as f: + f.write('x' * 1024) + + with sftp.open(FOLDER + '/special', 'r+') as f: + stat = f.stat() + f.chmod((stat.st_mode & ~o777) | o600) + stat = f.stat() + + expected_mode = o600 + if sys.platform == 'win32': + # chmod not really functional on windows + expected_mode = o666 + if sys.platform == 'cygwin': + # even worse. + expected_mode = o644 + self.assertEqual(stat.st_mode & o777, expected_mode) + self.assertEqual(stat.st_size, 1024) + + mtime = stat.st_mtime - 3600 + atime = stat.st_atime - 1800 + f.utime((atime, mtime)) + stat = f.stat() + self.assertEqual(stat.st_mtime, mtime) + if sys.platform not in ('win32', 'cygwin'): + self.assertEqual(stat.st_atime, atime) + + # can't really test chown, since we'd have to know a valid uid. + + f.truncate(512) + stat = f.stat() + self.assertEqual(stat.st_size, 512) finally: sftp.remove(FOLDER + '/special') @@ -367,25 +383,23 @@ class SFTPTest (unittest.TestCase): buffering is reset on 'seek'. """ try: - f = sftp.open(FOLDER + '/duck.txt', 'w') - f.write(ARTICLE) - f.close() - - f = sftp.open(FOLDER + '/duck.txt', 'r+') - line_number = 0 - loc = 0 - pos_list = [] - for line in f: - line_number += 1 - pos_list.append(loc) - loc = f.tell() - f.seek(pos_list[6], f.SEEK_SET) - self.assertEqual(f.readline(), 'Nouzilly, France.\n') - f.seek(pos_list[17], f.SEEK_SET) - self.assertEqual(f.readline()[:4], 'duck') - f.seek(pos_list[10], f.SEEK_SET) - self.assertEqual(f.readline(), 'duck types were equally resistant to exogenous insulin compared with chicken.\n') - f.close() + with sftp.open(FOLDER + '/duck.txt', 'w') as f: + f.write(ARTICLE) + + with sftp.open(FOLDER + '/duck.txt', 'r+') as f: + line_number = 0 + loc = 0 + pos_list = [] + for line in f: + line_number += 1 + pos_list.append(loc) + loc = f.tell() + f.seek(pos_list[6], f.SEEK_SET) + self.assertEqual(f.readline(), 'Nouzilly, France.\n') + f.seek(pos_list[17], f.SEEK_SET) + self.assertEqual(f.readline()[:4], 'duck') + f.seek(pos_list[10], f.SEEK_SET) + self.assertEqual(f.readline(), 'duck types were equally resistant to exogenous insulin compared with chicken.\n') finally: sftp.remove(FOLDER + '/duck.txt') @@ -394,18 +408,16 @@ class SFTPTest (unittest.TestCase): create a text file, seek back and change part of it, and verify that the changes worked. """ - f = sftp.open(FOLDER + '/testing.txt', 'w') try: - f.write('hello kitty.\n') - f.seek(-5, f.SEEK_CUR) - f.write('dd') - f.close() + with sftp.open(FOLDER + '/testing.txt', 'w') as f: + f.write('hello kitty.\n') + f.seek(-5, f.SEEK_CUR) + f.write('dd') self.assertEqual(sftp.stat(FOLDER + '/testing.txt').st_size, 13) - f = sftp.open(FOLDER + '/testing.txt', 'r') - data = f.read(20) - f.close() - self.assertEqual(data, 'hello kiddy.\n') + with sftp.open(FOLDER + '/testing.txt', 'r') as f: + data = f.read(20) + self.assertEqual(data, b'hello kiddy.\n') finally: sftp.remove(FOLDER + '/testing.txt') @@ -417,16 +429,14 @@ class SFTPTest (unittest.TestCase): # skip symlink tests on windows return - f = sftp.open(FOLDER + '/original.txt', 'w') try: - f.write('original\n') - f.close() + with sftp.open(FOLDER + '/original.txt', 'w') as f: + f.write('original\n') sftp.symlink('original.txt', FOLDER + '/link.txt') self.assertEqual(sftp.readlink(FOLDER + '/link.txt'), 'original.txt') - f = sftp.open(FOLDER + '/link.txt', 'r') - self.assertEqual(f.readlines(), [ 'original\n' ]) - f.close() + with sftp.open(FOLDER + '/link.txt', 'r') as f: + self.assertEqual(f.readlines(), ['original\n']) cwd = sftp.normalize('.') if cwd[-1] == '/': @@ -439,7 +449,7 @@ class SFTPTest (unittest.TestCase): self.assertEqual(sftp.stat(FOLDER + '/link.txt').st_size, 9) # the sftp server may be hiding extra path members from us, so the # length may be longer than we expect: - self.assert_(sftp.lstat(FOLDER + '/link2.txt').st_size >= len(abs_path)) + self.assertTrue(sftp.lstat(FOLDER + '/link2.txt').st_size >= len(abs_path)) self.assertEqual(sftp.stat(FOLDER + '/link2.txt').st_size, 9) self.assertEqual(sftp.stat(FOLDER + '/original.txt').st_size, 9) finally: @@ -460,18 +470,16 @@ class SFTPTest (unittest.TestCase): """ verify that buffered writes are automatically flushed on seek. """ - f = sftp.open(FOLDER + '/happy.txt', 'w', 1) try: - f.write('full line.\n') - f.write('partial') - f.seek(9, f.SEEK_SET) - f.write('?\n') - f.close() - - f = sftp.open(FOLDER + '/happy.txt', 'r') - self.assertEqual(f.readline(), 'full line?\n') - self.assertEqual(f.read(7), 'partial') - f.close() + with sftp.open(FOLDER + '/happy.txt', 'w', 1) as f: + f.write('full line.\n') + f.write('partial') + f.seek(9, f.SEEK_SET) + f.write('?\n') + + with sftp.open(FOLDER + '/happy.txt', 'r') as f: + self.assertEqual(f.readline(), u('full line?\n')) + self.assertEqual(f.read(7), b'partial') finally: try: sftp.remove(FOLDER + '/happy.txt') @@ -484,10 +492,10 @@ class SFTPTest (unittest.TestCase): error. """ pwd = sftp.normalize('.') - self.assert_(len(pwd) > 0) + self.assertTrue(len(pwd) > 0) f = sftp.normalize('./' + FOLDER) - self.assert_(len(f) > 0) - self.assertEquals(os.path.join(pwd, FOLDER), f) + self.assertTrue(len(f) > 0) + self.assertEqual(os.path.join(pwd, FOLDER), f) def test_F_mkdir(self): """ @@ -496,19 +504,19 @@ class SFTPTest (unittest.TestCase): try: sftp.mkdir(FOLDER + '/subfolder') except: - self.assert_(False, 'exception creating subfolder') + self.assertTrue(False, 'exception creating subfolder') try: sftp.mkdir(FOLDER + '/subfolder') - self.assert_(False, 'no exception overwriting subfolder') + self.assertTrue(False, 'no exception overwriting subfolder') except IOError: pass try: sftp.rmdir(FOLDER + '/subfolder') except: - self.assert_(False, 'exception removing subfolder') + self.assertTrue(False, 'exception removing subfolder') try: sftp.rmdir(FOLDER + '/subfolder') - self.assert_(False, 'no exception removing nonexistent subfolder') + self.assertTrue(False, 'no exception removing nonexistent subfolder') except IOError: pass @@ -523,17 +531,16 @@ class SFTPTest (unittest.TestCase): sftp.mkdir(FOLDER + '/alpha') sftp.chdir(FOLDER + '/alpha') sftp.mkdir('beta') - self.assertEquals(root + FOLDER + '/alpha', sftp.getcwd()) - self.assertEquals(['beta'], sftp.listdir('.')) + self.assertEqual(root + FOLDER + '/alpha', sftp.getcwd()) + self.assertEqual(['beta'], sftp.listdir('.')) sftp.chdir('beta') - f = sftp.open('fish', 'w') - f.write('hello\n') - f.close() + with sftp.open('fish', 'w') as f: + f.write('hello\n') sftp.chdir('..') - self.assertEquals(['fish'], sftp.listdir('beta')) + self.assertEqual(['fish'], sftp.listdir('beta')) sftp.chdir('..') - self.assertEquals(['fish'], sftp.listdir('alpha/beta')) + self.assertEqual(['fish'], sftp.listdir('alpha/beta')) finally: sftp.chdir(root) try: @@ -553,33 +560,32 @@ class SFTPTest (unittest.TestCase): """ verify that get/put work. """ - import os, warnings warnings.filterwarnings('ignore', 'tempnam.*') - localname = os.tempnam() - text = 'All I wanted was a plastic bunny rabbit.\n' - f = open(localname, 'wb') - f.write(text) - f.close() + fd, localname = mkstemp() + os.close(fd) + text = b'All I wanted was a plastic bunny rabbit.\n' + with open(localname, 'wb') as f: + f.write(text) saved_progress = [] + def progress_callback(x, y): saved_progress.append((x, y)) sftp.put(localname, FOLDER + '/bunny.txt', progress_callback) - f = sftp.open(FOLDER + '/bunny.txt', 'r') - self.assertEquals(text, f.read(128)) - f.close() - self.assertEquals((41, 41), saved_progress[-1]) + with sftp.open(FOLDER + '/bunny.txt', 'rb') as f: + self.assertEqual(text, f.read(128)) + self.assertEqual((41, 41), saved_progress[-1]) os.unlink(localname) - localname = os.tempnam() + fd, localname = mkstemp() + os.close(fd) saved_progress = [] sftp.get(FOLDER + '/bunny.txt', localname, progress_callback) - f = open(localname, 'rb') - self.assertEquals(text, f.read(128)) - f.close() - self.assertEquals((41, 41), saved_progress[-1]) + with open(localname, 'rb') as f: + self.assertEqual(text, f.read(128)) + self.assertEqual((41, 41), saved_progress[-1]) os.unlink(localname) sftp.unlink(FOLDER + '/bunny.txt') @@ -590,20 +596,18 @@ class SFTPTest (unittest.TestCase): (it's an sftp extension that we support, and may be the only ones who support it.) """ - f = sftp.open(FOLDER + '/kitty.txt', 'w') - f.write('here kitty kitty' * 64) - f.close() + with sftp.open(FOLDER + '/kitty.txt', 'w') as f: + f.write('here kitty kitty' * 64) try: - f = sftp.open(FOLDER + '/kitty.txt', 'r') - sum = f.check('sha1') - self.assertEquals('91059CFC6615941378D413CB5ADAF4C5EB293402', hexlify(sum).upper()) - sum = f.check('md5', 0, 512) - self.assertEquals('93DE4788FCA28D471516963A1FE3856A', hexlify(sum).upper()) - sum = f.check('md5', 0, 0, 510) - self.assertEquals('EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6', - hexlify(sum).upper()) - f.close() + with sftp.open(FOLDER + '/kitty.txt', 'r') as f: + sum = f.check('sha1') + self.assertEqual('91059CFC6615941378D413CB5ADAF4C5EB293402', u(hexlify(sum)).upper()) + sum = f.check('md5', 0, 512) + self.assertEqual('93DE4788FCA28D471516963A1FE3856A', u(hexlify(sum)).upper()) + sum = f.check('md5', 0, 0, 510) + self.assertEqual('EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6', + u(hexlify(sum)).upper()) finally: sftp.unlink(FOLDER + '/kitty.txt') @@ -611,14 +615,13 @@ class SFTPTest (unittest.TestCase): """ verify that the 'x' flag works when opening a file. """ - f = sftp.open(FOLDER + '/unusual.txt', 'wx') - f.close() + sftp.open(FOLDER + '/unusual.txt', 'wx').close() try: try: - f = sftp.open(FOLDER + '/unusual.txt', 'wx') + sftp.open(FOLDER + '/unusual.txt', 'wx') self.fail('expected exception') - except IOError, x: + except IOError: pass finally: sftp.unlink(FOLDER + '/unusual.txt') @@ -627,44 +630,39 @@ class SFTPTest (unittest.TestCase): """ verify that unicode strings are encoded into utf8 correctly. """ - f = sftp.open(FOLDER + '/something', 'w') - f.write('okay') - f.close() + with sftp.open(FOLDER + '/something', 'w') as f: + f.write('okay') try: - sftp.rename(FOLDER + '/something', FOLDER + u'/\u00fcnic\u00f8de') - sftp.open(FOLDER + '/\xc3\xbcnic\xc3\xb8\x64\x65', 'r') - except Exception, e: - self.fail('exception ' + e) - sftp.unlink(FOLDER + '/\xc3\xbcnic\xc3\xb8\x64\x65') + sftp.rename(FOLDER + '/something', FOLDER + '/' + unicode_folder) + sftp.open(b(FOLDER) + utf8_folder, 'r') + except Exception as e: + self.fail('exception ' + str(e)) + sftp.unlink(b(FOLDER) + utf8_folder) def test_L_utf8_chdir(self): - sftp.mkdir(FOLDER + u'\u00fcnic\u00f8de') + sftp.mkdir(FOLDER + '/' + unicode_folder) try: - sftp.chdir(FOLDER + u'\u00fcnic\u00f8de') - f = sftp.open('something', 'w') - f.write('okay') - f.close() + sftp.chdir(FOLDER + '/' + unicode_folder) + with sftp.open('something', 'w') as f: + f.write('okay') sftp.unlink('something') finally: - sftp.chdir(None) - sftp.rmdir(FOLDER + u'\u00fcnic\u00f8de') + sftp.chdir() + sftp.rmdir(FOLDER + '/' + unicode_folder) def test_M_bad_readv(self): """ verify that readv at the end of the file doesn't essplode. """ - f = sftp.open(FOLDER + '/zero', 'w') - f.close() + sftp.open(FOLDER + '/zero', 'w').close() try: - f = sftp.open(FOLDER + '/zero', 'r') - data = f.readv([(0, 12)]) - f.close() + with sftp.open(FOLDER + '/zero', 'r') as f: + f.readv([(0, 12)]) - f = sftp.open(FOLDER + '/zero', 'r') - f.prefetch() - data = f.read(100) - f.close() + with sftp.open(FOLDER + '/zero', 'r') as f: + f.prefetch() + f.read(100) finally: sftp.unlink(FOLDER + '/zero') @@ -672,48 +670,115 @@ class SFTPTest (unittest.TestCase): """ verify that get/put work without confirmation. """ - import os, warnings warnings.filterwarnings('ignore', 'tempnam.*') - localname = os.tempnam() - text = 'All I wanted was a plastic bunny rabbit.\n' - f = open(localname, 'wb') - f.write(text) - f.close() + fd, localname = mkstemp() + os.close(fd) + text = b'All I wanted was a plastic bunny rabbit.\n' + with open(localname, 'wb') as f: + f.write(text) saved_progress = [] + def progress_callback(x, y): saved_progress.append((x, y)) res = sftp.put(localname, FOLDER + '/bunny.txt', progress_callback, False) - - self.assertEquals(SFTPAttributes().attr, res.attr) - f = sftp.open(FOLDER + '/bunny.txt', 'r') - self.assertEquals(text, f.read(128)) - f.close() - self.assertEquals((41, 41), saved_progress[-1]) + self.assertEqual(SFTPAttributes().attr, res.attr) + + with sftp.open(FOLDER + '/bunny.txt', 'r') as f: + self.assertEqual(text, f.read(128)) + self.assertEqual((41, 41), saved_progress[-1]) os.unlink(localname) sftp.unlink(FOLDER + '/bunny.txt') + def test_O_getcwd(self): + """ + verify that chdir/getcwd work. + """ + self.assertEqual(None, sftp.getcwd()) + root = sftp.normalize('.') + if root[-1] != '/': + root += '/' + try: + sftp.mkdir(FOLDER + '/alpha') + sftp.chdir(FOLDER + '/alpha') + self.assertEqual('/' + FOLDER + '/alpha', sftp.getcwd()) + finally: + sftp.chdir(root) + try: + sftp.rmdir(FOLDER + '/alpha') + except: + pass + def XXX_test_M_seek_append(self): """ verify that seek does't affect writes during append. does not work except through paramiko. :( openssh fails. """ - f = sftp.open(FOLDER + '/append.txt', 'a') try: - f.write('first line\nsecond line\n') - f.seek(11, f.SEEK_SET) - f.write('third line\n') - f.close() + with sftp.open(FOLDER + '/append.txt', 'a') as f: + f.write('first line\nsecond line\n') + f.seek(11, f.SEEK_SET) + f.write('third line\n') + + with sftp.open(FOLDER + '/append.txt', 'r') as f: + self.assertEqual(f.stat().st_size, 34) + self.assertEqual(f.readline(), 'first line\n') + self.assertEqual(f.readline(), 'second line\n') + self.assertEqual(f.readline(), 'third line\n') + finally: + sftp.remove(FOLDER + '/append.txt') + + def test_putfo_empty_file(self): + """ + Send an empty file and confirm it is sent. + """ + target = FOLDER + '/empty file.txt' + stream = StringIO() + try: + attrs = sftp.putfo(stream, target) + # the returned attributes should not be null + self.assertNotEqual(attrs, None) + finally: + sftp.remove(target) + - f = sftp.open(FOLDER + '/append.txt', 'r') - self.assertEqual(f.stat().st_size, 34) - self.assertEqual(f.readline(), 'first line\n') - self.assertEqual(f.readline(), 'second line\n') - self.assertEqual(f.readline(), 'third line\n') + def test_N_file_with_percent(self): + """ + verify that we can create a file with a '%' in the filename. + ( it needs to be properly escaped by _log() ) + """ + self.assertTrue( paramiko.util.get_logger("paramiko").handlers, "This unit test requires logging to be enabled" ) + f = sftp.open(FOLDER + '/test%file', 'w') + try: + self.assertEqual(f.stat().st_size, 0) + finally: f.close() + sftp.remove(FOLDER + '/test%file') + + + def test_O_non_utf8_data(self): + """Test write() and read() of non utf8 data""" + try: + with sftp.open('%s/nonutf8data' % FOLDER, 'w') as f: + f.write(NON_UTF8_DATA) + with sftp.open('%s/nonutf8data' % FOLDER, 'r') as f: + data = f.read() + self.assertEqual(data, NON_UTF8_DATA) + with sftp.open('%s/nonutf8data' % FOLDER, 'wb') as f: + f.write(NON_UTF8_DATA) + with sftp.open('%s/nonutf8data' % FOLDER, 'rb') as f: + data = f.read() + self.assertEqual(data, NON_UTF8_DATA) finally: - sftp.remove(FOLDER + '/append.txt') + sftp.remove('%s/nonutf8data' % FOLDER) + +if __name__ == '__main__': + SFTPTest.init_loopback() + # logging is required by test_N_file_with_percent + paramiko.util.log_to_file('test_sftp.log') + from unittest import main + main() |