summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rwxr-xr-xtest.py2
-rw-r--r--tests/test_kex.py177
-rwxr-xr-xtests/test_sftp.py25
3 files changed, 204 insertions, 0 deletions
diff --git a/test.py b/test.py
index 2decf412..b8b76d42 100755
--- a/test.py
+++ b/test.py
@@ -31,6 +31,7 @@ sys.path.append('tests/')
from test_message import MessageTest
from test_file import BufferedFileTest
from test_pkey import KeyTest
+from test_kex import KexTest
from test_transport import TransportTest
from test_sftp import SFTPTest
@@ -72,6 +73,7 @@ suite.addTest(unittest.makeSuite(MessageTest))
suite.addTest(unittest.makeSuite(BufferedFileTest))
if options.use_pkey:
suite.addTest(unittest.makeSuite(KeyTest))
+suite.addTest(unittest.makeSuite(KexTest))
suite.addTest(unittest.makeSuite(TransportTest))
if options.use_sftp:
suite.addTest(unittest.makeSuite(SFTPTest))
diff --git a/tests/test_kex.py b/tests/test_kex.py
new file mode 100644
index 00000000..c7a86899
--- /dev/null
+++ b/tests/test_kex.py
@@ -0,0 +1,177 @@
+#!/usr/bin/python
+
+# Copyright (C) 2003-2004 Robey Pointer <robey@lag.net>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+Some unit tests for the key exchange protocols.
+"""
+
+import unittest
+import paramiko.util
+from paramiko.kex_group1 import KexGroup1
+from paramiko.kex_gex import KexGex
+from paramiko import Message
+
+
+class FakeRandpool (object):
+ def stir(self):
+ pass
+ def get_bytes(self, n):
+ return chr(0xcc) * n
+
+class FakeKey (object):
+ def __str__(self):
+ return 'fake-key'
+ def sign_ssh_data(self, randpool, H):
+ return 'fake-sig'
+
+class FakeModulusPack (object):
+ P = 0xFFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFFL
+ G = 2
+ def get_modulus(self, min, ask, max):
+ return self.G, self.P
+
+class FakeTransport (object):
+ randpool = FakeRandpool()
+ local_version = 'SSH-2.0-paramiko_1.0'
+ remote_version = 'SSH-2.0-lame'
+ local_kex_init = 'local-kex-init'
+ remote_kex_init = 'remote-kex-init'
+
+ def _send_message(self, m):
+ self._message = m
+ def _expect_packet(self, t):
+ self._expect = t
+ def _set_K_H(self, K, H):
+ self._K = K
+ self._H = H
+ def _verify_key(self, host_key, sig):
+ self._verify = (host_key, sig)
+ def _activate_outbound(self):
+ self._activated = True
+ def _log(self, level, s):
+ pass
+ def get_server_key(self):
+ return FakeKey()
+ def _get_modulus_pack(self):
+ return FakeModulusPack()
+
+
+class KexTest (unittest.TestCase):
+
+ K = 14730343317708716439807310032871972459448364195094179797249681733965528989482751523943515690110179031004049109375612685505881911274101441415545039654102474376472240501616988799699744135291070488314748284283496055223852115360852283821334858541043710301057312858051901453919067023103730011648890038847384890504L
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_1_group1_client(self):
+ transport = FakeTransport()
+ transport.server_mode = False
+ kex = KexGroup1(transport)
+ kex.start_kex()
+ x = '1E000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4'
+ self.assertEquals(x, paramiko.util.hexify(str(transport._message)))
+ self.assertEquals(paramiko.kex_group1._MSG_KEXDH_REPLY, transport._expect)
+
+ # fake "reply"
+ msg = Message()
+ msg.add_string('fake-host-key')
+ msg.add_mpint(69)
+ msg.add_string('fake-sig')
+ kex.parse_next(paramiko.kex_group1._MSG_KEXDH_REPLY, msg)
+ H = '0C39EDB98E9853B85D4527DA940EB03301925329'
+ self.assertEquals(self.K, transport._K)
+ self.assertEquals(H, paramiko.util.hexify(transport._H))
+ self.assertEquals(('fake-host-key', 'fake-sig'), transport._verify)
+ self.assert_(transport._activated)
+
+ def test_2_group1_server(self):
+ transport = FakeTransport()
+ transport.server_mode = True
+ kex = KexGroup1(transport)
+ kex.start_kex()
+ self.assertEquals(paramiko.kex_group1._MSG_KEXDH_INIT, transport._expect)
+
+ msg = Message()
+ msg.add_mpint(69)
+ kex.parse_next(paramiko.kex_group1._MSG_KEXDH_INIT, msg)
+ H = '77FE6F0094FB8DB3270106A77F88D66E09EEF8AF'
+ x = '1F0000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967'
+ self.assertEquals(self.K, transport._K)
+ self.assertEquals(H, paramiko.util.hexify(transport._H))
+ self.assertEquals(x, paramiko.util.hexify(str(transport._message)))
+ self.assert_(transport._activated)
+
+ def test_3_gex_client(self):
+ transport = FakeTransport()
+ transport.server_mode = False
+ kex = KexGex(transport)
+ kex.start_kex()
+ x = '22000004000000080000002000'
+ self.assertEquals(x, paramiko.util.hexify(str(transport._message)))
+ self.assertEquals(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, transport._expect)
+
+ msg = Message()
+ msg.add_mpint(FakeModulusPack.P)
+ msg.add_mpint(FakeModulusPack.G)
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
+ x = '20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4'
+ self.assertEquals(x, paramiko.util.hexify(str(transport._message)))
+ self.assertEquals(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, transport._expect)
+
+ msg = Message()
+ msg.add_string('fake-host-key')
+ msg.add_mpint(69)
+ msg.add_string('fake-sig')
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg)
+ H = 'A265563F2FA87F1A89BF007EE90D58BE2E4A4BD0'
+ self.assertEquals(self.K, transport._K)
+ self.assertEquals(H, paramiko.util.hexify(transport._H))
+ self.assertEquals(('fake-host-key', 'fake-sig'), transport._verify)
+ self.assert_(transport._activated)
+
+ def test_4_gex_server(self):
+ transport = FakeTransport()
+ transport.server_mode = True
+ kex = KexGex(transport)
+ kex.start_kex()
+ self.assertEquals(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, transport._expect)
+
+ msg = Message()
+ msg.add_int(1024)
+ msg.add_int(2048)
+ msg.add_int(4096)
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg)
+ x = '1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102'
+ self.assertEquals(x, paramiko.util.hexify(str(transport._message)))
+ self.assertEquals(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, transport._expect)
+
+ msg = Message()
+ msg.add_mpint(12345)
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg)
+ K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581L
+ H = 'CE754197C21BF3452863B4F44D0B3951F12516EF'
+ x = '210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967'
+ self.assertEquals(K, transport._K)
+ self.assertEquals(H, paramiko.util.hexify(transport._H))
+ self.assertEquals(x, paramiko.util.hexify(str(transport._message)))
+ self.assert_(transport._activated)
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index f112561b..64334fbc 100755
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -385,3 +385,28 @@ class SFTPTest (unittest.TestCase):
f = sftp.normalize('./' + FOLDER)
self.assert_(len(f) > 0)
self.assert_(f == pwd + '/' + FOLDER)
+
+ def test_E_mkdir(self):
+ """
+ verify that mkdir/rmdir work.
+ """
+ try:
+ sftp.mkdir(FOLDER + '/subfolder')
+ except:
+ self.assert_(False, 'exception creating subfolder')
+ try:
+ sftp.mkdir(FOLDER + '/subfolder')
+ self.assert_(False, 'no exception overwriting subfolder')
+ except:
+ pass
+ try:
+ sftp.rmdir(FOLDER + '/subfolder')
+ except:
+ self.assert_(False, 'exception removing subfolder')
+ try:
+ sftp.rmdir(FOLDER + '/subfolder')
+ self.assert_(False, 'no exception removing nonexistent subfolder')
+ except:
+ pass
+
+