From 14f2193d3979c910d205282a43ca2da8f85f3915 Mon Sep 17 00:00:00 2001 From: Robey Pointer Date: Mon, 8 Mar 2004 17:54:19 +0000 Subject: [project @ Arch-1:robey@lag.net--2003-public%secsh--dev--1.0--patch-32] add unit tests add unit tests for BufferedFile and SFTP (it's a start). remove the demo sftp client because it was 99% copied from the other demos, which makes it kinda confusing. the unit tests are a much better example. --- tests/test_file.py | 139 ++++++++++++++++++++++++ tests/test_sftp.py | 303 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 442 insertions(+) create mode 100755 tests/test_file.py create mode 100755 tests/test_sftp.py (limited to 'tests') diff --git a/tests/test_file.py b/tests/test_file.py new file mode 100755 index 00000000..222b6818 --- /dev/null +++ b/tests/test_file.py @@ -0,0 +1,139 @@ +#!/usr/bin/python + +# Copyright (C) 2003-2004 Robey Pointer +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Foobar; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +Some unit tests for the BufferedFile abstraction. +""" + +import unittest +from paramiko.file import BufferedFile + + +class LoopbackFile (BufferedFile): + """ + BufferedFile object that you can write data into, and then read it back. + """ + def __init__(self, mode='r', bufsize=-1): + BufferedFile.__init__(self) + self._set_mode(mode, bufsize) + self.buffer = '' + + def _read(self, size): + if len(self.buffer) == 0: + return None + if size > len(self.buffer): + size = len(self.buffer) + data = self.buffer[:size] + self.buffer = self.buffer[size:] + return data + + def _write(self, data): + self.buffer += data + return len(data) + + +class BufferedFileTest (unittest.TestCase): + + def test_1_simple(self): + f = LoopbackFile('r') + try: + f.write('hi') + self.assert_(False, 'no exception on write to read-only file') + except: + pass + f.close() + + f = LoopbackFile('w') + try: + f.read(1) + self.assert_(False, 'no exception to read from write-only file') + except: + pass + f.close() + + def test_2_readline(self): + f = LoopbackFile('r+U') + f.write('First line.\nSecond line.\r\nFinal line non-terminated.') + self.assertEqual(f.readline(), 'First line.\n') + # universal newline mode should convert this linefeed: + self.assertEqual(f.readline(), 'Second line.\n') + self.assertEqual(f.readline(), 'Final line non-terminated.') + self.assertEqual(f.readline(), '') + f.close() + try: + f.readline() + self.assert_(False, 'no exception on readline of closed file') + except IOError: + pass + self.assert_('\n' in f.newlines) + self.assert_('\r\n' in f.newlines) + self.assert_('\r' not in f.newlines) + + def test_3_lf(self): + """ + try to trick the linefeed detector. + """ + f = LoopbackFile('r+U') + f.write('First line.\r') + self.assertEqual(f.readline(), 'First line.\n') + f.write('\nSecond.\r\n') + self.assertEqual(f.readline(), 'Second.\n') + f.close() + self.assertEqual(f.newlines, '\r\n') + + def test_4_write(self): + """ + verify that write buffering is on. + """ + f = LoopbackFile('r+', 1) + f.write('Complete line.\nIncomplete line.') + self.assertEqual(f.readline(), 'Complete line.\n') + self.assertEqual(f.readline(), '') + f.write('..\n') + self.assertEqual(f.readline(), 'Incomplete line...\n') + f.close() + + def test_5_flush(self): + """ + verify that flush will force a write. + """ + f = LoopbackFile('r+', 512) + f.write('Not\nquite\n512 bytes.\n') + self.assertEqual(f.read(1), '') + f.flush() + self.assertEqual(f.read(5), 'Not\nq') + self.assertEqual(f.read(10), 'uite\n512 b') + self.assertEqual(f.read(9), 'ytes.\n') + self.assertEqual(f.read(3), '') + f.close() + + def test_6_buffering(self): + """ + verify that flushing happens automatically on buffer crossing. + """ + f = LoopbackFile('r+', 16) + f.write('Too small.') + self.assertEqual(f.read(4), '') + f.write(' ') + self.assertEqual(f.read(4), '') + f.write('Enough.') + self.assertEqual(f.read(20), 'Too small. Enough.') + f.close() + diff --git a/tests/test_sftp.py b/tests/test_sftp.py new file mode 100755 index 00000000..798853f6 --- /dev/null +++ b/tests/test_sftp.py @@ -0,0 +1,303 @@ +#!/usr/bin/python + +# Copyright (C) 2003-2004 Robey Pointer +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Foobar; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +some unit tests to make sure sftp works. + +a real actual sftp server is contacted, and a new folder is created there to +do test file operations in (so no existing files will be harmed). +""" + +import sys, os + +# need a host and private-key where we have acecss +HOST = os.environ.get('TEST_HOST', 'localhost') +USER = os.environ.get('TEST_USER', os.environ.get('USER', 'nobody')) +PKEY = os.environ.get('TEST_PKEY', os.path.join(os.environ.get('HOME', '/'), '.ssh/id_rsa')) +FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing') + +import paramiko, logging, unittest + +ARTICLE = ''' +Insulin sensitivity and liver insulin receptor structure in ducks from two +genera + +T. Constans, B. Chevalier, M. Derouet and J. Simon +Station de Recherches Avicoles, Institut National de la Recherche Agronomique, +Nouzilly, France. + +Insulin sensitivity and liver insulin receptor structure were studied in +5-wk-old ducks from two genera (Muscovy and Pekin). In the fasting state, both +duck types were equally resistant to exogenous insulin compared with chicken. +Despite the low potency of duck insulin, the number of insulin receptors was +lower in Muscovy duck and similar in Pekin duck and chicken liver membranes. +After 125I-insulin cross-linking, the size of the alpha-subunit of the +receptors from the three species was 135,000. Wheat germ agglutinin-purified +receptors from the three species were contaminated by an active and unusual +adenosinetriphosphatase (ATPase) contaminant (highest activity in Muscovy +duck). Sequential purification of solubilized receptor from both duck types on +lentil and then wheat germ agglutinin lectins led to a fraction of receptors +very poor in ATPase activity that exhibited a beta-subunit size (95,000) and +tyrosine kinase activity similar to those of ATPase-free chicken insulin +receptors. Therefore the ducks from the two genera exhibit an alpha-beta- +structure for liver insulin receptors and a clear difference in the number of +liver insulin receptors. Their sensitivity to insulin is, however, similarly +decreased compared with chicken. +''' + + +# setup logging +l = logging.getLogger('paramiko') +l.setLevel(logging.DEBUG) +if len(l.handlers) == 0: + f = open('test.log', 'w') + lh = logging.StreamHandler(f) + lh.setFormatter(logging.Formatter('%(levelname)-.3s [%(asctime)s] %(name)s: %(message)s', '%Y%m%d:%H%M%S')) + l.addHandler(lh) +t = paramiko.Transport(HOST) +key = paramiko.RSAKey() +key.read_private_key_file(PKEY) +try: + t.connect(username=USER, pkey=key) +except paramiko.SSHException: + t.close() + sys.stderr.write('\n\nparamiko.Transport.connect FAILED.\n') + sys.stderr.write('There are several possible reasons why it might fail so quickly:\n\n') + sys.stderr.write('* The host to connect to (%s) is not a valid SSH server.\n' % HOST) + sys.stderr.write(' (Override the SSH host with environment variable TEST_HOST.)\n') + sys.stderr.write('* The username to auth as (%s) is invalid.\n' % USER) + sys.stderr.write(' (Override the auth user with environment variable TEST_USER.)\n') + sys.stderr.write('* The private key given (%s) is not accepted by the server.\n' % PKEY) + sys.stderr.write(' (Override the private key location with environment variable TEST_PKEY.)\n') + sys.stderr.write('\n') + sys.exit(1) +sftp = paramiko.SFTP.from_transport(t) + + +class SFTPTest (unittest.TestCase): + + def setUp(self): + sftp.mkdir(FOLDER) + + def tearDown(self): + sftp.rmdir(FOLDER) + + def test_1_folder(self): + """ + create a temporary folder, verify that we can create a file in it, then + remove the folder and verify that we can't create a file in it anymore. + """ + f = sftp.open(FOLDER + '/test', 'w') + try: + self.assertEqual(f.stat().st_size, 0) + f.close() + try: + f = sftp.open(FOLDER + '/test', 'w') + # shouldn't be able to create that file + self.assert_(False, 'no exception at dummy file creation') + except: + pass + finally: + sftp.remove(FOLDER + '/test') + + def test_2_write(self): + """ + verify that a file can be created and written, and the size is correct. + """ + f = sftp.open(FOLDER + '/duck.txt', 'w') + try: + f.write(ARTICLE) + f.close() + self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483) + finally: + sftp.remove(FOLDER + '/duck.txt') + + def test_3_append(self): + """ + verify that a file can be opened for append, and tell() still works. + """ + f = sftp.open(FOLDER + '/append.txt', 'w') + try: + f.write('first line\nsecond line\n') + self.assertEqual(f.tell(), 23) + f.close() + + f = sftp.open(FOLDER + '/append.txt', 'a+') + f.write('third line!!!\n') + self.assertEqual(f.tell(), 37) + self.assertEqual(f.stat().st_size, 37) + f.seek(-26, f.SEEK_CUR) + self.assertEqual(f.readline(), 'second line\n') + f.close() + finally: + sftp.remove(FOLDER + '/append.txt') + + def test_4_rename(self): + """ + verify that renaming a file works. + """ + f = sftp.open(FOLDER + '/first.txt', 'w') + try: + f.write('content!\n'); + f.close() + sftp.rename(FOLDER + '/first.txt', FOLDER + '/second.txt') + try: + f = sftp.open(FOLDER + '/first.txt', 'r') + self.assert_(False, 'no exception on reading nonexistent file') + except: + pass + f = sftp.open(FOLDER + '/second.txt', 'r') + f.seek(-6, f.SEEK_END) + self.assertEqual(f.read(4), 'tent') + f.close() + finally: + try: + sftp.remove(FOLDER + '/first.txt') + except: + pass + try: + sftp.remove(FOLDER + '/second.txt') + except: + pass + + def test_5_listdir(self): + """ + verify that a folder can be created, a bunch of files can be placed in it, + and those files show up in sftp.listdir. + """ + try: + f = sftp.open(FOLDER + '/duck.txt', 'w') + f.close() + + f = sftp.open(FOLDER + '/fish.txt', 'w') + f.close() + + f = sftp.open(FOLDER + '/tertiary.py', 'w') + f.close() + + x = sftp.listdir(FOLDER) + self.assertEqual(len(x), 3) + self.assert_('duck.txt' in x) + self.assert_('fish.txt' in x) + self.assert_('tertiary.py' in x) + self.assert_('random' not in x) + finally: + sftp.remove(FOLDER + '/duck.txt') + sftp.remove(FOLDER + '/fish.txt') + sftp.remove(FOLDER + '/tertiary.py') + + def test_6_setstat(self): + """ + verify that the setstat functions (chown, chmod, utime) work. + """ + f = sftp.open(FOLDER + '/special', 'w') + try: + f.close() + + stat = sftp.stat(FOLDER + '/special') + sftp.chmod(FOLDER + '/special', (stat.st_mode & ~0777) | 0600) + self.assertEqual(sftp.stat(FOLDER + '/special').st_mode & 0777, 0600) + + mtime = stat.st_mtime - 3600 + atime = stat.st_atime - 1800 + sftp.utime(FOLDER + '/special', (atime, mtime)) + nstat = sftp.stat(FOLDER + '/special') + self.assertEqual(nstat.st_mtime, mtime) + self.assertEqual(nstat.st_atime, atime) + + # can't really test chown, since we'd have to know a valid uid. + finally: + sftp.remove(FOLDER + '/special') + + def test_7_readline_seek(self): + """ + create a text file and write a bunch of text into it. then count the lines + in the file, and seek around to retreive particular lines. this should + verify that read buffering and 'tell' work well together, and that read + buffering is reset on 'seek'. + """ + try: + f = sftp.open(FOLDER + '/duck.txt', 'w') + f.write(ARTICLE) + f.close() + + f = sftp.open(FOLDER + '/duck.txt', 'r+') + line_number = 0 + loc = 0 + pos_list = [] + for line in f: + line_number += 1 + pos_list.append(loc) + loc = f.tell() + f.seek(pos_list[6], f.SEEK_SET) + self.assertEqual(f.readline(), 'Nouzilly, France.\n') + f.seek(pos_list[17], f.SEEK_SET) + self.assertEqual(f.readline()[:4], 'duck') + f.seek(pos_list[10], f.SEEK_SET) + self.assertEqual(f.readline(), 'duck types were equally resistant to exogenous insulin compared with chicken.\n') + f.close() + finally: + sftp.remove(FOLDER + '/duck.txt') + + def test_8_write_seek(self): + """ + create a text file, seek back and change part of it, and verify that the + changes worked. + """ + f = sftp.open(FOLDER + '/testing.txt', 'w') + try: + f.write('hello kitty.\n') + f.seek(-5, f.SEEK_CUR) + f.write('dd') + f.close() + + self.assertEqual(sftp.stat(FOLDER + '/testing.txt').st_size, 13) + f = sftp.open(FOLDER + '/testing.txt', 'r') + data = f.read(20) + f.close() + self.assertEqual(data, 'hello kiddy.\n') + finally: + sftp.remove(FOLDER + '/testing.txt') + + def test_9_symlink(self): + """ + create a symlink and then check that lstat doesn't follow it. + """ + f = sftp.open(FOLDER + '/original.txt', 'w') + try: + f.write('original\n') + f.close() + sftp.symlink('original.txt', FOLDER + '/link.txt') + self.assertEqual(sftp.readlink(FOLDER + '/link.txt'), 'original.txt') + + f = sftp.open(FOLDER + '/link.txt', 'r') + self.assertEqual(f.readlines(), [ 'original\n' ]) + f.close() + self.assertEqual(sftp.lstat(FOLDER + '/link.txt').st_size, 12) + self.assertEqual(sftp.stat(FOLDER + '/original.txt').st_size, 9) + finally: + try: + sftp.remove(FOLDER + '/link.txt') + except: + pass + try: + sftp.remove(FOLDER + '/original.txt') + except: + pass -- cgit v1.2.3