summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorRobey Pointer <robey@lag.net>2004-03-08 17:54:19 +0000
committerRobey Pointer <robey@lag.net>2004-03-08 17:54:19 +0000
commit14f2193d3979c910d205282a43ca2da8f85f3915 (patch)
tree554749824e6282f1173e62df779930bb3c35e8d6
parent7cd7fced6e89b92ceecb2209a6351f22d602abbf (diff)
[project @ Arch-1:robey@lag.net--2003-public%secsh--dev--1.0--patch-32]
add unit tests add unit tests for BufferedFile and SFTP (it's a start). remove the demo sftp client because it was 99% copied from the other demos, which makes it kinda confusing. the unit tests are a much better example.
-rwxr-xr-xdemo_sftp.py157
-rwxr-xr-xtest.py35
-rwxr-xr-xtests/test_file.py139
-rwxr-xr-xtests/test_sftp.py303
4 files changed, 477 insertions, 157 deletions
diff --git a/demo_sftp.py b/demo_sftp.py
deleted file mode 100755
index 1f6cd3a6..00000000
--- a/demo_sftp.py
+++ /dev/null
@@ -1,157 +0,0 @@
-#!/usr/bin/python
-
-import sys, os, socket, threading, getpass, logging, time, base64, select, termios, tty, traceback
-import paramiko
-
-
-##### utility functions
-
-def load_host_keys():
- filename = os.environ['HOME'] + '/.ssh/known_hosts'
- keys = {}
- try:
- f = open(filename, 'r')
- except Exception, e:
- print '*** Unable to open host keys file (%s)' % filename
- return
- for line in f:
- keylist = line.split(' ')
- if len(keylist) != 3:
- continue
- hostlist, keytype, key = keylist
- hosts = hostlist.split(',')
- for host in hosts:
- if not keys.has_key(host):
- keys[host] = {}
- keys[host][keytype] = base64.decodestring(key)
- f.close()
- return keys
-
-
-##### main demo
-
-# setup logging
-l = logging.getLogger("paramiko")
-l.setLevel(logging.DEBUG)
-if len(l.handlers) == 0:
- f = open('demo.log', 'w')
- lh = logging.StreamHandler(f)
- lh.setFormatter(logging.Formatter('%(levelname)-.3s [%(asctime)s] %(name)s: %(message)s', '%Y%m%d:%H%M%S'))
- l.addHandler(lh)
-
-
-username = ''
-if len(sys.argv) > 1:
- hostname = sys.argv[1]
- if hostname.find('@') >= 0:
- username, hostname = hostname.split('@')
-else:
- hostname = raw_input('Hostname: ')
-if len(hostname) == 0:
- print '*** Hostname required.'
- sys.exit(1)
-port = 22
-if hostname.find(':') >= 0:
- hostname, portstr = hostname.split(':')
- port = int(portstr)
-
-# now connect
-try:
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect((hostname, port))
-except Exception, e:
- print '*** Connect failed: ' + str(e)
- traceback.print_exc()
- sys.exit(1)
-
-try:
- event = threading.Event()
- t = paramiko.Transport(sock)
- t.start_client(event)
- # print repr(t)
- event.wait(15)
- if not t.is_active():
- print '*** SSH negotiation failed.'
- sys.exit(1)
- # print repr(t)
-
- keys = load_host_keys()
- keytype, hostkey = t.get_remote_server_key()
- if not keys.has_key(hostname):
- print '*** WARNING: Unknown host key!'
- elif not keys[hostname].has_key(keytype):
- print '*** WARNING: Unknown host key!'
- elif keys[hostname][keytype] != hostkey:
- print '*** WARNING: Host key has changed!!!'
- sys.exit(1)
- else:
- print '*** Host key OK.'
-
- event.clear()
-
- # get username
- if username == '':
- default_username = getpass.getuser()
- username = raw_input('Username [%s]: ' % default_username)
- if len(username) == 0:
- username = default_username
-
- # ask for what kind of authentication to try
- default_auth = 'p'
- auth = raw_input('Auth by (p)assword, (r)sa key, or (d)ss key? [%s] ' % default_auth)
- if len(auth) == 0:
- auth = default_auth
-
- if auth == 'r':
- key = paramiko.RSAKey()
- default_path = os.environ['HOME'] + '/.ssh/id_rsa'
- path = raw_input('RSA key [%s]: ' % default_path)
- if len(path) == 0:
- path = default_path
- try:
- key.read_private_key_file(path)
- except paramiko.PasswordRequiredException:
- password = getpass.getpass('RSA key password: ')
- key.read_private_key_file(path, password)
- t.auth_publickey(username, key, event)
- elif auth == 'd':
- key = paramiko.DSSKey()
- default_path = os.environ['HOME'] + '/.ssh/id_dsa'
- path = raw_input('DSS key [%s]: ' % default_path)
- if len(path) == 0:
- path = default_path
- try:
- key.read_private_key_file(path)
- except paramiko.PasswordRequiredException:
- password = getpass.getpass('DSS key password: ')
- key.read_private_key_file(path, password)
- t.auth_key(username, key, event)
- else:
- pw = getpass.getpass('Password for %s@%s: ' % (username, hostname))
- t.auth_password(username, pw, event)
-
- event.wait(10)
- # print repr(t)
- if not t.is_authenticated():
- print '*** Authentication failed. :('
- t.close()
- sys.exit(1)
-
- chan = t.open_session()
- chan.invoke_subsystem('sftp')
- print '*** SFTP...'
- sftp = paramiko.SFTP(chan)
- print repr(sftp.listdir('/tmp'))
-
- chan.close()
- t.close()
-
-except Exception, e:
- print '*** Caught exception: ' + str(e.__class__) + ': ' + str(e)
- traceback.print_exc()
- try:
- t.close()
- except:
- pass
- sys.exit(1)
-
diff --git a/test.py b/test.py
new file mode 100755
index 00000000..15a9b779
--- /dev/null
+++ b/test.py
@@ -0,0 +1,35 @@
+#!/usr/bin/python
+
+# Copyright (C) 2003-2004 Robey Pointer <robey@lag.net>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Foobar; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+do the unit tests!
+"""
+
+import sys, os, unittest
+sys.path.append('tests/')
+
+from test_file import BufferedFileTest
+from test_sftp import SFTPTest
+
+suite = unittest.TestSuite()
+suite.addTest(unittest.makeSuite(BufferedFileTest))
+suite.addTest(unittest.makeSuite(SFTPTest))
+unittest.TextTestRunner(verbosity=2).run(suite)
+
diff --git a/tests/test_file.py b/tests/test_file.py
new file mode 100755
index 00000000..222b6818
--- /dev/null
+++ b/tests/test_file.py
@@ -0,0 +1,139 @@
+#!/usr/bin/python
+
+# Copyright (C) 2003-2004 Robey Pointer <robey@lag.net>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Foobar; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+Some unit tests for the BufferedFile abstraction.
+"""
+
+import unittest
+from paramiko.file import BufferedFile
+
+
+class LoopbackFile (BufferedFile):
+ """
+ BufferedFile object that you can write data into, and then read it back.
+ """
+ def __init__(self, mode='r', bufsize=-1):
+ BufferedFile.__init__(self)
+ self._set_mode(mode, bufsize)
+ self.buffer = ''
+
+ def _read(self, size):
+ if len(self.buffer) == 0:
+ return None
+ if size > len(self.buffer):
+ size = len(self.buffer)
+ data = self.buffer[:size]
+ self.buffer = self.buffer[size:]
+ return data
+
+ def _write(self, data):
+ self.buffer += data
+ return len(data)
+
+
+class BufferedFileTest (unittest.TestCase):
+
+ def test_1_simple(self):
+ f = LoopbackFile('r')
+ try:
+ f.write('hi')
+ self.assert_(False, 'no exception on write to read-only file')
+ except:
+ pass
+ f.close()
+
+ f = LoopbackFile('w')
+ try:
+ f.read(1)
+ self.assert_(False, 'no exception to read from write-only file')
+ except:
+ pass
+ f.close()
+
+ def test_2_readline(self):
+ f = LoopbackFile('r+U')
+ f.write('First line.\nSecond line.\r\nFinal line non-terminated.')
+ self.assertEqual(f.readline(), 'First line.\n')
+ # universal newline mode should convert this linefeed:
+ self.assertEqual(f.readline(), 'Second line.\n')
+ self.assertEqual(f.readline(), 'Final line non-terminated.')
+ self.assertEqual(f.readline(), '')
+ f.close()
+ try:
+ f.readline()
+ self.assert_(False, 'no exception on readline of closed file')
+ except IOError:
+ pass
+ self.assert_('\n' in f.newlines)
+ self.assert_('\r\n' in f.newlines)
+ self.assert_('\r' not in f.newlines)
+
+ def test_3_lf(self):
+ """
+ try to trick the linefeed detector.
+ """
+ f = LoopbackFile('r+U')
+ f.write('First line.\r')
+ self.assertEqual(f.readline(), 'First line.\n')
+ f.write('\nSecond.\r\n')
+ self.assertEqual(f.readline(), 'Second.\n')
+ f.close()
+ self.assertEqual(f.newlines, '\r\n')
+
+ def test_4_write(self):
+ """
+ verify that write buffering is on.
+ """
+ f = LoopbackFile('r+', 1)
+ f.write('Complete line.\nIncomplete line.')
+ self.assertEqual(f.readline(), 'Complete line.\n')
+ self.assertEqual(f.readline(), '')
+ f.write('..\n')
+ self.assertEqual(f.readline(), 'Incomplete line...\n')
+ f.close()
+
+ def test_5_flush(self):
+ """
+ verify that flush will force a write.
+ """
+ f = LoopbackFile('r+', 512)
+ f.write('Not\nquite\n512 bytes.\n')
+ self.assertEqual(f.read(1), '')
+ f.flush()
+ self.assertEqual(f.read(5), 'Not\nq')
+ self.assertEqual(f.read(10), 'uite\n512 b')
+ self.assertEqual(f.read(9), 'ytes.\n')
+ self.assertEqual(f.read(3), '')
+ f.close()
+
+ def test_6_buffering(self):
+ """
+ verify that flushing happens automatically on buffer crossing.
+ """
+ f = LoopbackFile('r+', 16)
+ f.write('Too small.')
+ self.assertEqual(f.read(4), '')
+ f.write(' ')
+ self.assertEqual(f.read(4), '')
+ f.write('Enough.')
+ self.assertEqual(f.read(20), 'Too small. Enough.')
+ f.close()
+
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
new file mode 100755
index 00000000..798853f6
--- /dev/null
+++ b/tests/test_sftp.py
@@ -0,0 +1,303 @@
+#!/usr/bin/python
+
+# Copyright (C) 2003-2004 Robey Pointer <robey@lag.net>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Foobar; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+some unit tests to make sure sftp works.
+
+a real actual sftp server is contacted, and a new folder is created there to
+do test file operations in (so no existing files will be harmed).
+"""
+
+import sys, os
+
+# need a host and private-key where we have acecss
+HOST = os.environ.get('TEST_HOST', 'localhost')
+USER = os.environ.get('TEST_USER', os.environ.get('USER', 'nobody'))
+PKEY = os.environ.get('TEST_PKEY', os.path.join(os.environ.get('HOME', '/'), '.ssh/id_rsa'))
+FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing')
+
+import paramiko, logging, unittest
+
+ARTICLE = '''
+Insulin sensitivity and liver insulin receptor structure in ducks from two
+genera
+
+T. Constans, B. Chevalier, M. Derouet and J. Simon
+Station de Recherches Avicoles, Institut National de la Recherche Agronomique,
+Nouzilly, France.
+
+Insulin sensitivity and liver insulin receptor structure were studied in
+5-wk-old ducks from two genera (Muscovy and Pekin). In the fasting state, both
+duck types were equally resistant to exogenous insulin compared with chicken.
+Despite the low potency of duck insulin, the number of insulin receptors was
+lower in Muscovy duck and similar in Pekin duck and chicken liver membranes.
+After 125I-insulin cross-linking, the size of the alpha-subunit of the
+receptors from the three species was 135,000. Wheat germ agglutinin-purified
+receptors from the three species were contaminated by an active and unusual
+adenosinetriphosphatase (ATPase) contaminant (highest activity in Muscovy
+duck). Sequential purification of solubilized receptor from both duck types on
+lentil and then wheat germ agglutinin lectins led to a fraction of receptors
+very poor in ATPase activity that exhibited a beta-subunit size (95,000) and
+tyrosine kinase activity similar to those of ATPase-free chicken insulin
+receptors. Therefore the ducks from the two genera exhibit an alpha-beta-
+structure for liver insulin receptors and a clear difference in the number of
+liver insulin receptors. Their sensitivity to insulin is, however, similarly
+decreased compared with chicken.
+'''
+
+
+# setup logging
+l = logging.getLogger('paramiko')
+l.setLevel(logging.DEBUG)
+if len(l.handlers) == 0:
+ f = open('test.log', 'w')
+ lh = logging.StreamHandler(f)
+ lh.setFormatter(logging.Formatter('%(levelname)-.3s [%(asctime)s] %(name)s: %(message)s', '%Y%m%d:%H%M%S'))
+ l.addHandler(lh)
+t = paramiko.Transport(HOST)
+key = paramiko.RSAKey()
+key.read_private_key_file(PKEY)
+try:
+ t.connect(username=USER, pkey=key)
+except paramiko.SSHException:
+ t.close()
+ sys.stderr.write('\n\nparamiko.Transport.connect FAILED.\n')
+ sys.stderr.write('There are several possible reasons why it might fail so quickly:\n\n')
+ sys.stderr.write('* The host to connect to (%s) is not a valid SSH server.\n' % HOST)
+ sys.stderr.write(' (Override the SSH host with environment variable TEST_HOST.)\n')
+ sys.stderr.write('* The username to auth as (%s) is invalid.\n' % USER)
+ sys.stderr.write(' (Override the auth user with environment variable TEST_USER.)\n')
+ sys.stderr.write('* The private key given (%s) is not accepted by the server.\n' % PKEY)
+ sys.stderr.write(' (Override the private key location with environment variable TEST_PKEY.)\n')
+ sys.stderr.write('\n')
+ sys.exit(1)
+sftp = paramiko.SFTP.from_transport(t)
+
+
+class SFTPTest (unittest.TestCase):
+
+ def setUp(self):
+ sftp.mkdir(FOLDER)
+
+ def tearDown(self):
+ sftp.rmdir(FOLDER)
+
+ def test_1_folder(self):
+ """
+ create a temporary folder, verify that we can create a file in it, then
+ remove the folder and verify that we can't create a file in it anymore.
+ """
+ f = sftp.open(FOLDER + '/test', 'w')
+ try:
+ self.assertEqual(f.stat().st_size, 0)
+ f.close()
+ try:
+ f = sftp.open(FOLDER + '/test', 'w')
+ # shouldn't be able to create that file
+ self.assert_(False, 'no exception at dummy file creation')
+ except:
+ pass
+ finally:
+ sftp.remove(FOLDER + '/test')
+
+ def test_2_write(self):
+ """
+ verify that a file can be created and written, and the size is correct.
+ """
+ f = sftp.open(FOLDER + '/duck.txt', 'w')
+ try:
+ f.write(ARTICLE)
+ f.close()
+ self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483)
+ finally:
+ sftp.remove(FOLDER + '/duck.txt')
+
+ def test_3_append(self):
+ """
+ verify that a file can be opened for append, and tell() still works.
+ """
+ f = sftp.open(FOLDER + '/append.txt', 'w')
+ try:
+ f.write('first line\nsecond line\n')
+ self.assertEqual(f.tell(), 23)
+ f.close()
+
+ f = sftp.open(FOLDER + '/append.txt', 'a+')
+ f.write('third line!!!\n')
+ self.assertEqual(f.tell(), 37)
+ self.assertEqual(f.stat().st_size, 37)
+ f.seek(-26, f.SEEK_CUR)
+ self.assertEqual(f.readline(), 'second line\n')
+ f.close()
+ finally:
+ sftp.remove(FOLDER + '/append.txt')
+
+ def test_4_rename(self):
+ """
+ verify that renaming a file works.
+ """
+ f = sftp.open(FOLDER + '/first.txt', 'w')
+ try:
+ f.write('content!\n');
+ f.close()
+ sftp.rename(FOLDER + '/first.txt', FOLDER + '/second.txt')
+ try:
+ f = sftp.open(FOLDER + '/first.txt', 'r')
+ self.assert_(False, 'no exception on reading nonexistent file')
+ except:
+ pass
+ f = sftp.open(FOLDER + '/second.txt', 'r')
+ f.seek(-6, f.SEEK_END)
+ self.assertEqual(f.read(4), 'tent')
+ f.close()
+ finally:
+ try:
+ sftp.remove(FOLDER + '/first.txt')
+ except:
+ pass
+ try:
+ sftp.remove(FOLDER + '/second.txt')
+ except:
+ pass
+
+ def test_5_listdir(self):
+ """
+ verify that a folder can be created, a bunch of files can be placed in it,
+ and those files show up in sftp.listdir.
+ """
+ try:
+ f = sftp.open(FOLDER + '/duck.txt', 'w')
+ f.close()
+
+ f = sftp.open(FOLDER + '/fish.txt', 'w')
+ f.close()
+
+ f = sftp.open(FOLDER + '/tertiary.py', 'w')
+ f.close()
+
+ x = sftp.listdir(FOLDER)
+ self.assertEqual(len(x), 3)
+ self.assert_('duck.txt' in x)
+ self.assert_('fish.txt' in x)
+ self.assert_('tertiary.py' in x)
+ self.assert_('random' not in x)
+ finally:
+ sftp.remove(FOLDER + '/duck.txt')
+ sftp.remove(FOLDER + '/fish.txt')
+ sftp.remove(FOLDER + '/tertiary.py')
+
+ def test_6_setstat(self):
+ """
+ verify that the setstat functions (chown, chmod, utime) work.
+ """
+ f = sftp.open(FOLDER + '/special', 'w')
+ try:
+ f.close()
+
+ stat = sftp.stat(FOLDER + '/special')
+ sftp.chmod(FOLDER + '/special', (stat.st_mode & ~0777) | 0600)
+ self.assertEqual(sftp.stat(FOLDER + '/special').st_mode & 0777, 0600)
+
+ mtime = stat.st_mtime - 3600
+ atime = stat.st_atime - 1800
+ sftp.utime(FOLDER + '/special', (atime, mtime))
+ nstat = sftp.stat(FOLDER + '/special')
+ self.assertEqual(nstat.st_mtime, mtime)
+ self.assertEqual(nstat.st_atime, atime)
+
+ # can't really test chown, since we'd have to know a valid uid.
+ finally:
+ sftp.remove(FOLDER + '/special')
+
+ def test_7_readline_seek(self):
+ """
+ create a text file and write a bunch of text into it. then count the lines
+ in the file, and seek around to retreive particular lines. this should
+ verify that read buffering and 'tell' work well together, and that read
+ buffering is reset on 'seek'.
+ """
+ try:
+ f = sftp.open(FOLDER + '/duck.txt', 'w')
+ f.write(ARTICLE)
+ f.close()
+
+ f = sftp.open(FOLDER + '/duck.txt', 'r+')
+ line_number = 0
+ loc = 0
+ pos_list = []
+ for line in f:
+ line_number += 1
+ pos_list.append(loc)
+ loc = f.tell()
+ f.seek(pos_list[6], f.SEEK_SET)
+ self.assertEqual(f.readline(), 'Nouzilly, France.\n')
+ f.seek(pos_list[17], f.SEEK_SET)
+ self.assertEqual(f.readline()[:4], 'duck')
+ f.seek(pos_list[10], f.SEEK_SET)
+ self.assertEqual(f.readline(), 'duck types were equally resistant to exogenous insulin compared with chicken.\n')
+ f.close()
+ finally:
+ sftp.remove(FOLDER + '/duck.txt')
+
+ def test_8_write_seek(self):
+ """
+ create a text file, seek back and change part of it, and verify that the
+ changes worked.
+ """
+ f = sftp.open(FOLDER + '/testing.txt', 'w')
+ try:
+ f.write('hello kitty.\n')
+ f.seek(-5, f.SEEK_CUR)
+ f.write('dd')
+ f.close()
+
+ self.assertEqual(sftp.stat(FOLDER + '/testing.txt').st_size, 13)
+ f = sftp.open(FOLDER + '/testing.txt', 'r')
+ data = f.read(20)
+ f.close()
+ self.assertEqual(data, 'hello kiddy.\n')
+ finally:
+ sftp.remove(FOLDER + '/testing.txt')
+
+ def test_9_symlink(self):
+ """
+ create a symlink and then check that lstat doesn't follow it.
+ """
+ f = sftp.open(FOLDER + '/original.txt', 'w')
+ try:
+ f.write('original\n')
+ f.close()
+ sftp.symlink('original.txt', FOLDER + '/link.txt')
+ self.assertEqual(sftp.readlink(FOLDER + '/link.txt'), 'original.txt')
+
+ f = sftp.open(FOLDER + '/link.txt', 'r')
+ self.assertEqual(f.readlines(), [ 'original\n' ])
+ f.close()
+ self.assertEqual(sftp.lstat(FOLDER + '/link.txt').st_size, 12)
+ self.assertEqual(sftp.stat(FOLDER + '/original.txt').st_size, 9)
+ finally:
+ try:
+ sftp.remove(FOLDER + '/link.txt')
+ except:
+ pass
+ try:
+ sftp.remove(FOLDER + '/original.txt')
+ except:
+ pass