From 811f2bf30f011f911fa72b29e7a566e6f6c2acab Mon Sep 17 00:00:00 2001
From: Robey Pointer <robey@lag.net>
Date: Sun, 19 Dec 2004 19:56:48 +0000
Subject: [project @
 Arch-1:robey@lag.net--2003-public%secsh--dev--1.0--patch-136] loopback sftp
 test add ability to turn off more tests, and a secret (for now) -X option to
 do the sftp tests via loopback socket.  added another symlink sftp test to
 see what happens with absolute symlinks.

---
 tests/test_sftp.py | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 54 insertions(+), 2 deletions(-)

(limited to 'tests')

diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index eebb2184..ef8adbc3 100755
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -27,9 +27,11 @@ do test file operations in (so no existing files will be harmed).
 
 import sys, os
 import random
-import logging
+import logging, threading
 
 import paramiko, unittest
+from stub_sftp import StubServer, StubSFTPServer
+from loop import LoopSocket
 
 ARTICLE = '''
 Insulin sensitivity and liver insulin receptor structure in ducks from two
@@ -61,6 +63,7 @@ decreased compared with chicken.
 FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing')
 
 sftp = None
+g_big_file_test = True
 
 
 class SFTPTest (unittest.TestCase):
@@ -97,6 +100,33 @@ class SFTPTest (unittest.TestCase):
         sftp = paramiko.SFTP.from_transport(t)
     init = staticmethod(init)
 
+    def init_loopback():
+        global sftp
+
+        socks = LoopSocket()
+        sockc = LoopSocket()
+        sockc.link(socks)
+        tc = paramiko.Transport(sockc)
+        ts = paramiko.Transport(socks)
+
+        host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
+        ts.add_server_key(host_key)
+        event = threading.Event()
+        server = StubServer()
+        ts.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer)
+        ts.start_server(event, server)
+        tc.connect(username='slowdive', password='pygmalion')
+        event.wait(1.0)
+#        self.assert_(self.ts.is_active())
+
+        sftp = paramiko.SFTP.from_transport(tc)
+    init_loopback = staticmethod(init_loopback)
+
+    def set_big_file_test(onoff):
+        global g_big_file_test
+        g_big_file_test = onoff
+    set_big_file_test = staticmethod(set_big_file_test)
+        
     def setUp(self):
         sftp.mkdir(FOLDER)
 
@@ -294,14 +324,30 @@ class SFTPTest (unittest.TestCase):
             f = sftp.open(FOLDER + '/link.txt', 'r')
             self.assertEqual(f.readlines(), [ 'original\n' ])
             f.close()
+
+            cwd = sftp.normalize('.')
+            if cwd[-1] == '/':
+                cwd = cwd[:-1]
+            abs_path = cwd + '/' + FOLDER + '/original.txt'
+            sftp.symlink(abs_path, FOLDER + '/link2.txt')
+            self.assertEqual(abs_path, sftp.readlink(FOLDER + '/link2.txt'))
+
             self.assertEqual(sftp.lstat(FOLDER + '/link.txt').st_size, 12)
             self.assertEqual(sftp.stat(FOLDER + '/link.txt').st_size, 9)
+            # the sftp server may be hiding extra path members from us, so the
+            # length may be longer than we expect:
+            self.assert_(sftp.lstat(FOLDER + '/link2.txt').st_size >= len(abs_path))
+            self.assertEqual(sftp.stat(FOLDER + '/link2.txt').st_size, 9)
             self.assertEqual(sftp.stat(FOLDER + '/original.txt').st_size, 9)
         finally:
             try:
                 sftp.remove(FOLDER + '/link.txt')
             except:
                 pass
+            try:
+                sftp.remove(FOLDER + '/link2.txt')
+            except:
+                pass
             try:
                 sftp.remove(FOLDER + '/original.txt')
             except:
@@ -333,6 +379,9 @@ class SFTPTest (unittest.TestCase):
         """
         create a bunch of files over the same session.
         """
+        global g_big_file_test
+        if not g_big_file_test:
+            return
         numfiles = 100
         try:
             for i in range(numfiles):
@@ -362,6 +411,9 @@ class SFTPTest (unittest.TestCase):
         write a 1MB file, with no linefeeds, using line buffering.
         FIXME: this is slow!  what causes the slowness?
         """
+        global g_big_file_test
+        if not g_big_file_test:
+            return
         kblob = (1024 * 'x')
         try:
             f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 1)
@@ -385,7 +437,7 @@ class SFTPTest (unittest.TestCase):
         self.assert_(len(pwd) > 0)
         f = sftp.normalize('./' + FOLDER)
         self.assert_(len(f) > 0)
-        self.assert_(f == pwd + '/' + FOLDER)
+        self.assertEquals(os.path.join(pwd, FOLDER), f)
 
     def test_E_mkdir(self):
         """
-- 
cgit v1.2.3