summaryrefslogtreecommitdiffhomepage
path: root/tests/test_sftp.py
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2014-09-05 10:39:00 -0700
committerJeff Forcier <jeff@bitprophet.org>2014-09-05 10:39:00 -0700
commit0e0460f85942e79c94b7db9d93abdf60bf1dbac4 (patch)
tree152b80b4a2822abcb85136f7fbc7f9263279eb14 /tests/test_sftp.py
parent683b3c22893be899d2fdbf1ada35e61fba8a3d70 (diff)
parenteb7da84bee49f6e7b568ee00fd5f3db0bb29f36a (diff)
Merge branch 'master' into 131-int
Diffstat (limited to 'tests/test_sftp.py')
-rwxr-xr-xtests/test_sftp.py523
1 files changed, 294 insertions, 229 deletions
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index 2eadabcd..2b6aa3b6 100755
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -7,7 +7,7 @@
# Software Foundation; either version 2.1 of the License, or (at your option)
# any later version.
#
-# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
@@ -24,18 +24,20 @@ do test file operations in (so no existing files will be harmed).
"""
from binascii import hexlify
-import logging
import os
-import random
-import struct
import sys
+import warnings
import threading
-import time
import unittest
+from tempfile import mkstemp
import paramiko
-from stub_sftp import StubServer, StubSFTPServer
-from loop import LoopSocket
+from paramiko.py3compat import PY2, b, u, StringIO
+from paramiko.common import o777, o600, o666, o644
+from tests.stub_sftp import StubServer, StubSFTPServer
+from tests.loop import LoopSocket
+from tests.util import test_path
+import paramiko.util
from paramiko.sftp_attr import SFTPAttributes
ARTICLE = '''
@@ -65,11 +67,27 @@ liver insulin receptors. Their sensitivity to insulin is, however, similarly
decreased compared with chicken.
'''
+
+# Here is how unicode characters are encoded over 1 to 6 bytes in utf-8
+# U-00000000 - U-0000007F: 0xxxxxxx
+# U-00000080 - U-000007FF: 110xxxxx 10xxxxxx
+# U-00000800 - U-0000FFFF: 1110xxxx 10xxxxxx 10xxxxxx
+# U-00010000 - U-001FFFFF: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
+# U-00200000 - U-03FFFFFF: 111110xx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx
+# U-04000000 - U-7FFFFFFF: 1111110x 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx
+# Note that: hex(int('11000011',2)) == '0xc3'
+# Thus, the following 2-bytes sequence is not valid utf8: "invalid continuation byte"
+NON_UTF8_DATA = b'\xC3\xC3'
+
FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000')
sftp = None
tc = None
g_big_file_test = True
+# we need to use eval(compile()) here because Py3.2 doesn't support the 'u' marker for unicode
+# this test is the only line in the entire program that has to be treated specially to support Py3.2
+unicode_folder = eval(compile(r"u'\u00fcnic\u00f8de'" if PY2 else r"'\u00fcnic\u00f8de'", 'test_sftp.py', 'eval'))
+utf8_folder = b'/\xc3\xbcnic\xc3\xb8\x64\x65'
def get_sftp():
@@ -121,7 +139,7 @@ class SFTPTest (unittest.TestCase):
tc = paramiko.Transport(sockc)
ts = paramiko.Transport(socks)
- host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
+ host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
ts.add_server_key(host_key)
event = threading.Event()
server = StubServer()
@@ -140,7 +158,7 @@ class SFTPTest (unittest.TestCase):
def setUp(self):
global FOLDER
- for i in xrange(1000):
+ for i in range(1000):
FOLDER = FOLDER[:-3] + '%03d' % i
try:
sftp.mkdir(FOLDER)
@@ -149,6 +167,7 @@ class SFTPTest (unittest.TestCase):
pass
def tearDown(self):
+ #sftp.chdir()
sftp.rmdir(FOLDER)
def test_1_file(self):
@@ -158,8 +177,8 @@ class SFTPTest (unittest.TestCase):
f = sftp.open(FOLDER + '/test', 'w')
try:
self.assertEqual(f.stat().st_size, 0)
- f.close()
finally:
+ f.close()
sftp.remove(FOLDER + '/test')
def test_2_close(self):
@@ -180,10 +199,20 @@ class SFTPTest (unittest.TestCase):
"""
verify that a file can be created and written, and the size is correct.
"""
- f = sftp.open(FOLDER + '/duck.txt', 'w')
try:
- f.write(ARTICLE)
- f.close()
+ with sftp.open(FOLDER + '/duck.txt', 'w') as f:
+ f.write(ARTICLE)
+ self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483)
+ finally:
+ sftp.remove(FOLDER + '/duck.txt')
+
+ def test_3_sftp_file_can_be_used_as_context_manager(self):
+ """
+ verify that an opened file can be used as a context manager
+ """
+ try:
+ with sftp.open(FOLDER + '/duck.txt', 'w') as f:
+ f.write(ARTICLE)
self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483)
finally:
sftp.remove(FOLDER + '/duck.txt')
@@ -192,19 +221,17 @@ class SFTPTest (unittest.TestCase):
"""
verify that a file can be opened for append, and tell() still works.
"""
- f = sftp.open(FOLDER + '/append.txt', 'w')
try:
- f.write('first line\nsecond line\n')
- self.assertEqual(f.tell(), 23)
- f.close()
-
- f = sftp.open(FOLDER + '/append.txt', 'a+')
- f.write('third line!!!\n')
- self.assertEqual(f.tell(), 37)
- self.assertEqual(f.stat().st_size, 37)
- f.seek(-26, f.SEEK_CUR)
- self.assertEqual(f.readline(), 'second line\n')
- f.close()
+ with sftp.open(FOLDER + '/append.txt', 'w') as f:
+ f.write('first line\nsecond line\n')
+ self.assertEqual(f.tell(), 23)
+
+ with sftp.open(FOLDER + '/append.txt', 'a+') as f:
+ f.write('third line!!!\n')
+ self.assertEqual(f.tell(), 37)
+ self.assertEqual(f.stat().st_size, 37)
+ f.seek(-26, f.SEEK_CUR)
+ self.assertEqual(f.readline(), 'second line\n')
finally:
sftp.remove(FOLDER + '/append.txt')
@@ -212,20 +239,18 @@ class SFTPTest (unittest.TestCase):
"""
verify that renaming a file works.
"""
- f = sftp.open(FOLDER + '/first.txt', 'w')
try:
- f.write('content!\n');
- f.close()
+ with sftp.open(FOLDER + '/first.txt', 'w') as f:
+ f.write('content!\n')
sftp.rename(FOLDER + '/first.txt', FOLDER + '/second.txt')
try:
- f = sftp.open(FOLDER + '/first.txt', 'r')
- self.assert_(False, 'no exception on reading nonexistent file')
+ sftp.open(FOLDER + '/first.txt', 'r')
+ self.assertTrue(False, 'no exception on reading nonexistent file')
except IOError:
pass
- f = sftp.open(FOLDER + '/second.txt', 'r')
- f.seek(-6, f.SEEK_END)
- self.assertEqual(f.read(4), 'tent')
- f.close()
+ with sftp.open(FOLDER + '/second.txt', 'r') as f:
+ f.seek(-6, f.SEEK_END)
+ self.assertEqual(u(f.read(4)), 'tent')
finally:
try:
sftp.remove(FOLDER + '/first.txt')
@@ -242,14 +267,13 @@ class SFTPTest (unittest.TestCase):
remove the folder and verify that we can't create a file in it anymore.
"""
sftp.mkdir(FOLDER + '/subfolder')
- f = sftp.open(FOLDER + '/subfolder/test', 'w')
- f.close()
+ sftp.open(FOLDER + '/subfolder/test', 'w').close()
sftp.remove(FOLDER + '/subfolder/test')
sftp.rmdir(FOLDER + '/subfolder')
try:
- f = sftp.open(FOLDER + '/subfolder/test')
+ sftp.open(FOLDER + '/subfolder/test')
# shouldn't be able to create that file
- self.assert_(False, 'no exception at dummy file creation')
+ self.assertTrue(False, 'no exception at dummy file creation')
except IOError:
pass
@@ -259,21 +283,16 @@ class SFTPTest (unittest.TestCase):
and those files show up in sftp.listdir.
"""
try:
- f = sftp.open(FOLDER + '/duck.txt', 'w')
- f.close()
-
- f = sftp.open(FOLDER + '/fish.txt', 'w')
- f.close()
-
- f = sftp.open(FOLDER + '/tertiary.py', 'w')
- f.close()
+ sftp.open(FOLDER + '/duck.txt', 'w').close()
+ sftp.open(FOLDER + '/fish.txt', 'w').close()
+ sftp.open(FOLDER + '/tertiary.py', 'w').close()
x = sftp.listdir(FOLDER)
self.assertEqual(len(x), 3)
- self.assert_('duck.txt' in x)
- self.assert_('fish.txt' in x)
- self.assert_('tertiary.py' in x)
- self.assert_('random' not in x)
+ self.assertTrue('duck.txt' in x)
+ self.assertTrue('fish.txt' in x)
+ self.assertTrue('tertiary.py' in x)
+ self.assertTrue('random' not in x)
finally:
sftp.remove(FOLDER + '/duck.txt')
sftp.remove(FOLDER + '/fish.txt')
@@ -283,22 +302,21 @@ class SFTPTest (unittest.TestCase):
"""
verify that the setstat functions (chown, chmod, utime, truncate) work.
"""
- f = sftp.open(FOLDER + '/special', 'w')
try:
- f.write('x' * 1024)
- f.close()
+ with sftp.open(FOLDER + '/special', 'w') as f:
+ f.write('x' * 1024)
stat = sftp.stat(FOLDER + '/special')
- sftp.chmod(FOLDER + '/special', (stat.st_mode & ~0777) | 0600)
+ sftp.chmod(FOLDER + '/special', (stat.st_mode & ~o777) | o600)
stat = sftp.stat(FOLDER + '/special')
- expected_mode = 0600
+ expected_mode = o600
if sys.platform == 'win32':
# chmod not really functional on windows
- expected_mode = 0666
+ expected_mode = o666
if sys.platform == 'cygwin':
# even worse.
- expected_mode = 0644
- self.assertEqual(stat.st_mode & 0777, expected_mode)
+ expected_mode = o644
+ self.assertEqual(stat.st_mode & o777, expected_mode)
self.assertEqual(stat.st_size, 1024)
mtime = stat.st_mtime - 3600
@@ -322,40 +340,38 @@ class SFTPTest (unittest.TestCase):
verify that the fsetstat functions (chown, chmod, utime, truncate)
work on open files.
"""
- f = sftp.open(FOLDER + '/special', 'w')
try:
- f.write('x' * 1024)
- f.close()
-
- f = sftp.open(FOLDER + '/special', 'r+')
- stat = f.stat()
- f.chmod((stat.st_mode & ~0777) | 0600)
- stat = f.stat()
-
- expected_mode = 0600
- if sys.platform == 'win32':
- # chmod not really functional on windows
- expected_mode = 0666
- if sys.platform == 'cygwin':
- # even worse.
- expected_mode = 0644
- self.assertEqual(stat.st_mode & 0777, expected_mode)
- self.assertEqual(stat.st_size, 1024)
-
- mtime = stat.st_mtime - 3600
- atime = stat.st_atime - 1800
- f.utime((atime, mtime))
- stat = f.stat()
- self.assertEqual(stat.st_mtime, mtime)
- if sys.platform not in ('win32', 'cygwin'):
- self.assertEqual(stat.st_atime, atime)
-
- # can't really test chown, since we'd have to know a valid uid.
-
- f.truncate(512)
- stat = f.stat()
- self.assertEqual(stat.st_size, 512)
- f.close()
+ with sftp.open(FOLDER + '/special', 'w') as f:
+ f.write('x' * 1024)
+
+ with sftp.open(FOLDER + '/special', 'r+') as f:
+ stat = f.stat()
+ f.chmod((stat.st_mode & ~o777) | o600)
+ stat = f.stat()
+
+ expected_mode = o600
+ if sys.platform == 'win32':
+ # chmod not really functional on windows
+ expected_mode = o666
+ if sys.platform == 'cygwin':
+ # even worse.
+ expected_mode = o644
+ self.assertEqual(stat.st_mode & o777, expected_mode)
+ self.assertEqual(stat.st_size, 1024)
+
+ mtime = stat.st_mtime - 3600
+ atime = stat.st_atime - 1800
+ f.utime((atime, mtime))
+ stat = f.stat()
+ self.assertEqual(stat.st_mtime, mtime)
+ if sys.platform not in ('win32', 'cygwin'):
+ self.assertEqual(stat.st_atime, atime)
+
+ # can't really test chown, since we'd have to know a valid uid.
+
+ f.truncate(512)
+ stat = f.stat()
+ self.assertEqual(stat.st_size, 512)
finally:
sftp.remove(FOLDER + '/special')
@@ -367,25 +383,23 @@ class SFTPTest (unittest.TestCase):
buffering is reset on 'seek'.
"""
try:
- f = sftp.open(FOLDER + '/duck.txt', 'w')
- f.write(ARTICLE)
- f.close()
-
- f = sftp.open(FOLDER + '/duck.txt', 'r+')
- line_number = 0
- loc = 0
- pos_list = []
- for line in f:
- line_number += 1
- pos_list.append(loc)
- loc = f.tell()
- f.seek(pos_list[6], f.SEEK_SET)
- self.assertEqual(f.readline(), 'Nouzilly, France.\n')
- f.seek(pos_list[17], f.SEEK_SET)
- self.assertEqual(f.readline()[:4], 'duck')
- f.seek(pos_list[10], f.SEEK_SET)
- self.assertEqual(f.readline(), 'duck types were equally resistant to exogenous insulin compared with chicken.\n')
- f.close()
+ with sftp.open(FOLDER + '/duck.txt', 'w') as f:
+ f.write(ARTICLE)
+
+ with sftp.open(FOLDER + '/duck.txt', 'r+') as f:
+ line_number = 0
+ loc = 0
+ pos_list = []
+ for line in f:
+ line_number += 1
+ pos_list.append(loc)
+ loc = f.tell()
+ f.seek(pos_list[6], f.SEEK_SET)
+ self.assertEqual(f.readline(), 'Nouzilly, France.\n')
+ f.seek(pos_list[17], f.SEEK_SET)
+ self.assertEqual(f.readline()[:4], 'duck')
+ f.seek(pos_list[10], f.SEEK_SET)
+ self.assertEqual(f.readline(), 'duck types were equally resistant to exogenous insulin compared with chicken.\n')
finally:
sftp.remove(FOLDER + '/duck.txt')
@@ -394,18 +408,16 @@ class SFTPTest (unittest.TestCase):
create a text file, seek back and change part of it, and verify that the
changes worked.
"""
- f = sftp.open(FOLDER + '/testing.txt', 'w')
try:
- f.write('hello kitty.\n')
- f.seek(-5, f.SEEK_CUR)
- f.write('dd')
- f.close()
+ with sftp.open(FOLDER + '/testing.txt', 'w') as f:
+ f.write('hello kitty.\n')
+ f.seek(-5, f.SEEK_CUR)
+ f.write('dd')
self.assertEqual(sftp.stat(FOLDER + '/testing.txt').st_size, 13)
- f = sftp.open(FOLDER + '/testing.txt', 'r')
- data = f.read(20)
- f.close()
- self.assertEqual(data, 'hello kiddy.\n')
+ with sftp.open(FOLDER + '/testing.txt', 'r') as f:
+ data = f.read(20)
+ self.assertEqual(data, b'hello kiddy.\n')
finally:
sftp.remove(FOLDER + '/testing.txt')
@@ -417,16 +429,14 @@ class SFTPTest (unittest.TestCase):
# skip symlink tests on windows
return
- f = sftp.open(FOLDER + '/original.txt', 'w')
try:
- f.write('original\n')
- f.close()
+ with sftp.open(FOLDER + '/original.txt', 'w') as f:
+ f.write('original\n')
sftp.symlink('original.txt', FOLDER + '/link.txt')
self.assertEqual(sftp.readlink(FOLDER + '/link.txt'), 'original.txt')
- f = sftp.open(FOLDER + '/link.txt', 'r')
- self.assertEqual(f.readlines(), [ 'original\n' ])
- f.close()
+ with sftp.open(FOLDER + '/link.txt', 'r') as f:
+ self.assertEqual(f.readlines(), ['original\n'])
cwd = sftp.normalize('.')
if cwd[-1] == '/':
@@ -439,7 +449,7 @@ class SFTPTest (unittest.TestCase):
self.assertEqual(sftp.stat(FOLDER + '/link.txt').st_size, 9)
# the sftp server may be hiding extra path members from us, so the
# length may be longer than we expect:
- self.assert_(sftp.lstat(FOLDER + '/link2.txt').st_size >= len(abs_path))
+ self.assertTrue(sftp.lstat(FOLDER + '/link2.txt').st_size >= len(abs_path))
self.assertEqual(sftp.stat(FOLDER + '/link2.txt').st_size, 9)
self.assertEqual(sftp.stat(FOLDER + '/original.txt').st_size, 9)
finally:
@@ -460,18 +470,16 @@ class SFTPTest (unittest.TestCase):
"""
verify that buffered writes are automatically flushed on seek.
"""
- f = sftp.open(FOLDER + '/happy.txt', 'w', 1)
try:
- f.write('full line.\n')
- f.write('partial')
- f.seek(9, f.SEEK_SET)
- f.write('?\n')
- f.close()
-
- f = sftp.open(FOLDER + '/happy.txt', 'r')
- self.assertEqual(f.readline(), 'full line?\n')
- self.assertEqual(f.read(7), 'partial')
- f.close()
+ with sftp.open(FOLDER + '/happy.txt', 'w', 1) as f:
+ f.write('full line.\n')
+ f.write('partial')
+ f.seek(9, f.SEEK_SET)
+ f.write('?\n')
+
+ with sftp.open(FOLDER + '/happy.txt', 'r') as f:
+ self.assertEqual(f.readline(), u('full line?\n'))
+ self.assertEqual(f.read(7), b'partial')
finally:
try:
sftp.remove(FOLDER + '/happy.txt')
@@ -484,10 +492,10 @@ class SFTPTest (unittest.TestCase):
error.
"""
pwd = sftp.normalize('.')
- self.assert_(len(pwd) > 0)
+ self.assertTrue(len(pwd) > 0)
f = sftp.normalize('./' + FOLDER)
- self.assert_(len(f) > 0)
- self.assertEquals(os.path.join(pwd, FOLDER), f)
+ self.assertTrue(len(f) > 0)
+ self.assertEqual(os.path.join(pwd, FOLDER), f)
def test_F_mkdir(self):
"""
@@ -496,19 +504,19 @@ class SFTPTest (unittest.TestCase):
try:
sftp.mkdir(FOLDER + '/subfolder')
except:
- self.assert_(False, 'exception creating subfolder')
+ self.assertTrue(False, 'exception creating subfolder')
try:
sftp.mkdir(FOLDER + '/subfolder')
- self.assert_(False, 'no exception overwriting subfolder')
+ self.assertTrue(False, 'no exception overwriting subfolder')
except IOError:
pass
try:
sftp.rmdir(FOLDER + '/subfolder')
except:
- self.assert_(False, 'exception removing subfolder')
+ self.assertTrue(False, 'exception removing subfolder')
try:
sftp.rmdir(FOLDER + '/subfolder')
- self.assert_(False, 'no exception removing nonexistent subfolder')
+ self.assertTrue(False, 'no exception removing nonexistent subfolder')
except IOError:
pass
@@ -523,17 +531,16 @@ class SFTPTest (unittest.TestCase):
sftp.mkdir(FOLDER + '/alpha')
sftp.chdir(FOLDER + '/alpha')
sftp.mkdir('beta')
- self.assertEquals(root + FOLDER + '/alpha', sftp.getcwd())
- self.assertEquals(['beta'], sftp.listdir('.'))
+ self.assertEqual(root + FOLDER + '/alpha', sftp.getcwd())
+ self.assertEqual(['beta'], sftp.listdir('.'))
sftp.chdir('beta')
- f = sftp.open('fish', 'w')
- f.write('hello\n')
- f.close()
+ with sftp.open('fish', 'w') as f:
+ f.write('hello\n')
sftp.chdir('..')
- self.assertEquals(['fish'], sftp.listdir('beta'))
+ self.assertEqual(['fish'], sftp.listdir('beta'))
sftp.chdir('..')
- self.assertEquals(['fish'], sftp.listdir('alpha/beta'))
+ self.assertEqual(['fish'], sftp.listdir('alpha/beta'))
finally:
sftp.chdir(root)
try:
@@ -553,33 +560,32 @@ class SFTPTest (unittest.TestCase):
"""
verify that get/put work.
"""
- import os, warnings
warnings.filterwarnings('ignore', 'tempnam.*')
- localname = os.tempnam()
- text = 'All I wanted was a plastic bunny rabbit.\n'
- f = open(localname, 'wb')
- f.write(text)
- f.close()
+ fd, localname = mkstemp()
+ os.close(fd)
+ text = b'All I wanted was a plastic bunny rabbit.\n'
+ with open(localname, 'wb') as f:
+ f.write(text)
saved_progress = []
+
def progress_callback(x, y):
saved_progress.append((x, y))
sftp.put(localname, FOLDER + '/bunny.txt', progress_callback)
- f = sftp.open(FOLDER + '/bunny.txt', 'r')
- self.assertEquals(text, f.read(128))
- f.close()
- self.assertEquals((41, 41), saved_progress[-1])
+ with sftp.open(FOLDER + '/bunny.txt', 'rb') as f:
+ self.assertEqual(text, f.read(128))
+ self.assertEqual((41, 41), saved_progress[-1])
os.unlink(localname)
- localname = os.tempnam()
+ fd, localname = mkstemp()
+ os.close(fd)
saved_progress = []
sftp.get(FOLDER + '/bunny.txt', localname, progress_callback)
- f = open(localname, 'rb')
- self.assertEquals(text, f.read(128))
- f.close()
- self.assertEquals((41, 41), saved_progress[-1])
+ with open(localname, 'rb') as f:
+ self.assertEqual(text, f.read(128))
+ self.assertEqual((41, 41), saved_progress[-1])
os.unlink(localname)
sftp.unlink(FOLDER + '/bunny.txt')
@@ -590,20 +596,18 @@ class SFTPTest (unittest.TestCase):
(it's an sftp extension that we support, and may be the only ones who
support it.)
"""
- f = sftp.open(FOLDER + '/kitty.txt', 'w')
- f.write('here kitty kitty' * 64)
- f.close()
+ with sftp.open(FOLDER + '/kitty.txt', 'w') as f:
+ f.write('here kitty kitty' * 64)
try:
- f = sftp.open(FOLDER + '/kitty.txt', 'r')
- sum = f.check('sha1')
- self.assertEquals('91059CFC6615941378D413CB5ADAF4C5EB293402', hexlify(sum).upper())
- sum = f.check('md5', 0, 512)
- self.assertEquals('93DE4788FCA28D471516963A1FE3856A', hexlify(sum).upper())
- sum = f.check('md5', 0, 0, 510)
- self.assertEquals('EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6',
- hexlify(sum).upper())
- f.close()
+ with sftp.open(FOLDER + '/kitty.txt', 'r') as f:
+ sum = f.check('sha1')
+ self.assertEqual('91059CFC6615941378D413CB5ADAF4C5EB293402', u(hexlify(sum)).upper())
+ sum = f.check('md5', 0, 512)
+ self.assertEqual('93DE4788FCA28D471516963A1FE3856A', u(hexlify(sum)).upper())
+ sum = f.check('md5', 0, 0, 510)
+ self.assertEqual('EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6',
+ u(hexlify(sum)).upper())
finally:
sftp.unlink(FOLDER + '/kitty.txt')
@@ -611,14 +615,13 @@ class SFTPTest (unittest.TestCase):
"""
verify that the 'x' flag works when opening a file.
"""
- f = sftp.open(FOLDER + '/unusual.txt', 'wx')
- f.close()
+ sftp.open(FOLDER + '/unusual.txt', 'wx').close()
try:
try:
- f = sftp.open(FOLDER + '/unusual.txt', 'wx')
+ sftp.open(FOLDER + '/unusual.txt', 'wx')
self.fail('expected exception')
- except IOError, x:
+ except IOError:
pass
finally:
sftp.unlink(FOLDER + '/unusual.txt')
@@ -627,44 +630,39 @@ class SFTPTest (unittest.TestCase):
"""
verify that unicode strings are encoded into utf8 correctly.
"""
- f = sftp.open(FOLDER + '/something', 'w')
- f.write('okay')
- f.close()
+ with sftp.open(FOLDER + '/something', 'w') as f:
+ f.write('okay')
try:
- sftp.rename(FOLDER + '/something', FOLDER + u'/\u00fcnic\u00f8de')
- sftp.open(FOLDER + '/\xc3\xbcnic\xc3\xb8\x64\x65', 'r')
- except Exception, e:
- self.fail('exception ' + e)
- sftp.unlink(FOLDER + '/\xc3\xbcnic\xc3\xb8\x64\x65')
+ sftp.rename(FOLDER + '/something', FOLDER + '/' + unicode_folder)
+ sftp.open(b(FOLDER) + utf8_folder, 'r')
+ except Exception as e:
+ self.fail('exception ' + str(e))
+ sftp.unlink(b(FOLDER) + utf8_folder)
def test_L_utf8_chdir(self):
- sftp.mkdir(FOLDER + u'\u00fcnic\u00f8de')
+ sftp.mkdir(FOLDER + '/' + unicode_folder)
try:
- sftp.chdir(FOLDER + u'\u00fcnic\u00f8de')
- f = sftp.open('something', 'w')
- f.write('okay')
- f.close()
+ sftp.chdir(FOLDER + '/' + unicode_folder)
+ with sftp.open('something', 'w') as f:
+ f.write('okay')
sftp.unlink('something')
finally:
- sftp.chdir(None)
- sftp.rmdir(FOLDER + u'\u00fcnic\u00f8de')
+ sftp.chdir()
+ sftp.rmdir(FOLDER + '/' + unicode_folder)
def test_M_bad_readv(self):
"""
verify that readv at the end of the file doesn't essplode.
"""
- f = sftp.open(FOLDER + '/zero', 'w')
- f.close()
+ sftp.open(FOLDER + '/zero', 'w').close()
try:
- f = sftp.open(FOLDER + '/zero', 'r')
- data = f.readv([(0, 12)])
- f.close()
+ with sftp.open(FOLDER + '/zero', 'r') as f:
+ f.readv([(0, 12)])
- f = sftp.open(FOLDER + '/zero', 'r')
- f.prefetch()
- data = f.read(100)
- f.close()
+ with sftp.open(FOLDER + '/zero', 'r') as f:
+ f.prefetch()
+ f.read(100)
finally:
sftp.unlink(FOLDER + '/zero')
@@ -672,48 +670,115 @@ class SFTPTest (unittest.TestCase):
"""
verify that get/put work without confirmation.
"""
- import os, warnings
warnings.filterwarnings('ignore', 'tempnam.*')
- localname = os.tempnam()
- text = 'All I wanted was a plastic bunny rabbit.\n'
- f = open(localname, 'wb')
- f.write(text)
- f.close()
+ fd, localname = mkstemp()
+ os.close(fd)
+ text = b'All I wanted was a plastic bunny rabbit.\n'
+ with open(localname, 'wb') as f:
+ f.write(text)
saved_progress = []
+
def progress_callback(x, y):
saved_progress.append((x, y))
res = sftp.put(localname, FOLDER + '/bunny.txt', progress_callback, False)
-
- self.assertEquals(SFTPAttributes().attr, res.attr)
- f = sftp.open(FOLDER + '/bunny.txt', 'r')
- self.assertEquals(text, f.read(128))
- f.close()
- self.assertEquals((41, 41), saved_progress[-1])
+ self.assertEqual(SFTPAttributes().attr, res.attr)
+
+ with sftp.open(FOLDER + '/bunny.txt', 'r') as f:
+ self.assertEqual(text, f.read(128))
+ self.assertEqual((41, 41), saved_progress[-1])
os.unlink(localname)
sftp.unlink(FOLDER + '/bunny.txt')
+ def test_O_getcwd(self):
+ """
+ verify that chdir/getcwd work.
+ """
+ self.assertEqual(None, sftp.getcwd())
+ root = sftp.normalize('.')
+ if root[-1] != '/':
+ root += '/'
+ try:
+ sftp.mkdir(FOLDER + '/alpha')
+ sftp.chdir(FOLDER + '/alpha')
+ self.assertEqual('/' + FOLDER + '/alpha', sftp.getcwd())
+ finally:
+ sftp.chdir(root)
+ try:
+ sftp.rmdir(FOLDER + '/alpha')
+ except:
+ pass
+
def XXX_test_M_seek_append(self):
"""
verify that seek does't affect writes during append.
does not work except through paramiko. :( openssh fails.
"""
- f = sftp.open(FOLDER + '/append.txt', 'a')
try:
- f.write('first line\nsecond line\n')
- f.seek(11, f.SEEK_SET)
- f.write('third line\n')
- f.close()
+ with sftp.open(FOLDER + '/append.txt', 'a') as f:
+ f.write('first line\nsecond line\n')
+ f.seek(11, f.SEEK_SET)
+ f.write('third line\n')
+
+ with sftp.open(FOLDER + '/append.txt', 'r') as f:
+ self.assertEqual(f.stat().st_size, 34)
+ self.assertEqual(f.readline(), 'first line\n')
+ self.assertEqual(f.readline(), 'second line\n')
+ self.assertEqual(f.readline(), 'third line\n')
+ finally:
+ sftp.remove(FOLDER + '/append.txt')
+
+ def test_putfo_empty_file(self):
+ """
+ Send an empty file and confirm it is sent.
+ """
+ target = FOLDER + '/empty file.txt'
+ stream = StringIO()
+ try:
+ attrs = sftp.putfo(stream, target)
+ # the returned attributes should not be null
+ self.assertNotEqual(attrs, None)
+ finally:
+ sftp.remove(target)
+
- f = sftp.open(FOLDER + '/append.txt', 'r')
- self.assertEqual(f.stat().st_size, 34)
- self.assertEqual(f.readline(), 'first line\n')
- self.assertEqual(f.readline(), 'second line\n')
- self.assertEqual(f.readline(), 'third line\n')
+ def test_N_file_with_percent(self):
+ """
+ verify that we can create a file with a '%' in the filename.
+ ( it needs to be properly escaped by _log() )
+ """
+ self.assertTrue( paramiko.util.get_logger("paramiko").handlers, "This unit test requires logging to be enabled" )
+ f = sftp.open(FOLDER + '/test%file', 'w')
+ try:
+ self.assertEqual(f.stat().st_size, 0)
+ finally:
f.close()
+ sftp.remove(FOLDER + '/test%file')
+
+
+ def test_O_non_utf8_data(self):
+ """Test write() and read() of non utf8 data"""
+ try:
+ with sftp.open('%s/nonutf8data' % FOLDER, 'w') as f:
+ f.write(NON_UTF8_DATA)
+ with sftp.open('%s/nonutf8data' % FOLDER, 'r') as f:
+ data = f.read()
+ self.assertEqual(data, NON_UTF8_DATA)
+ with sftp.open('%s/nonutf8data' % FOLDER, 'wb') as f:
+ f.write(NON_UTF8_DATA)
+ with sftp.open('%s/nonutf8data' % FOLDER, 'rb') as f:
+ data = f.read()
+ self.assertEqual(data, NON_UTF8_DATA)
finally:
- sftp.remove(FOLDER + '/append.txt')
+ sftp.remove('%s/nonutf8data' % FOLDER)
+
+if __name__ == '__main__':
+ SFTPTest.init_loopback()
+ # logging is required by test_N_file_with_percent
+ paramiko.util.log_to_file('test_sftp.log')
+ from unittest import main
+ main()