diff options
Diffstat (limited to 'tests')
-rwxr-xr-x | tests/test_sftp.py | 48 | ||||
-rw-r--r-- | tests/test_transport.py | 36 |
2 files changed, 74 insertions, 10 deletions
diff --git a/tests/test_sftp.py b/tests/test_sftp.py index 4c5065e8..94e9dea5 100755 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -62,7 +62,7 @@ liver insulin receptors. Their sensitivity to insulin is, however, similarly decreased compared with chicken. ''' -FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing') +FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000') sftp = None tc = None @@ -121,7 +121,6 @@ class SFTPTest (unittest.TestCase): ts.start_server(event, server) tc.connect(username='slowdive', password='pygmalion') event.wait(1.0) -# self.assert_(self.ts.is_active()) sftp = paramiko.SFTP.from_transport(tc) init_loopback = staticmethod(init_loopback) @@ -132,7 +131,14 @@ class SFTPTest (unittest.TestCase): set_big_file_test = staticmethod(set_big_file_test) def setUp(self): - sftp.mkdir(FOLDER) + global FOLDER + for i in xrange(1000): + FOLDER = FOLDER[:-3] + '%03d' % i + try: + sftp.mkdir(FOLDER) + break + except (IOError, OSError): + pass def tearDown(self): sftp.rmdir(FOLDER) @@ -511,17 +517,39 @@ class SFTPTest (unittest.TestCase): global g_big_file_test if not g_big_file_test: return - kblob = (1024 * 1024 * 'x') + mblob = (1024 * 1024 * 'x') try: f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) - f.write(kblob) + f.write(mblob) f.close() self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) finally: sftp.remove('%s/hongry.txt' % FOLDER) + + def test_H_big_file_renegotiate(self): + """ + write a 1MB file, forcing key renegotiation in the middle. + """ + global g_big_file_test + if not g_big_file_test: + return + t = sftp.sock.get_transport() + t.packetizer.REKEY_BYTES = 512 * 1024 + k32blob = (32 * 1024 * 'x') + try: + f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) + for i in xrange(32): + f.write(k32blob) + f.close() - def test_H_realpath(self): + self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + self.assertNotEquals(t.H, t.session_id) + finally: + sftp.remove('%s/hongry.txt' % FOLDER) + t.packetizer.REKEY_BYTES = pow(2, 30) + + def test_I_realpath(self): """ test that realpath is returning something non-empty and not an error. @@ -532,7 +560,7 @@ class SFTPTest (unittest.TestCase): self.assert_(len(f) > 0) self.assertEquals(os.path.join(pwd, FOLDER), f) - def test_I_mkdir(self): + def test_J_mkdir(self): """ verify that mkdir/rmdir work. """ @@ -555,7 +583,7 @@ class SFTPTest (unittest.TestCase): except IOError: pass - def test_J_chdir(self): + def test_K_chdir(self): """ verify that chdir/getcwd work. """ @@ -592,7 +620,7 @@ class SFTPTest (unittest.TestCase): except: pass - def test_K_get_put(self): + def test_L_get_put(self): """ verify that get/put work. """ @@ -621,7 +649,7 @@ class SFTPTest (unittest.TestCase): os.unlink(localname) sftp.unlink(FOLDER + '/bunny.txt') - def test_L_check(self): + def test_M_check(self): """ verify that file.check() works against our own server. (it's an sftp extension that we support, and may be the only ones who diff --git a/tests/test_transport.py b/tests/test_transport.py index 81254b48..c97fb160 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -504,3 +504,39 @@ class TransportTest (unittest.TestCase): self.assertEquals('', chan.recv(16)) chan.close() + + def test_F_renegotiate(self): + """ + verify that a transport can correctly renegotiate mid-stream. + """ + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.ts.start_server(event, server) + self.tc.connect(hostkey=public_host_key, + username='slowdive', password='pygmalion') + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + + self.tc.packetizer.REKEY_BYTES = 16384 + + chan = self.tc.open_session() + self.assert_(chan.exec_command('yes')) + schan = self.ts.accept(1.0) + + self.assertEquals(self.tc.H, self.tc.session_id) + for i in range(20): + chan.send('x' * 1024) + chan.close() + + # allow a few seconds for the rekeying to complete + for i in xrange(50): + if self.tc.H != self.tc.session_id: + break + time.sleep(0.1) + self.assertNotEquals(self.tc.H, self.tc.session_id) + + schan.close() |