summaryrefslogtreecommitdiffhomepage
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/stub_sftp.py1
-rw-r--r--tests/test_auth.py4
-rw-r--r--tests/test_buffered_pipe.py10
-rw-r--r--tests/test_client.py202
-rw-r--r--tests/test_gssapi.py135
-rw-r--r--tests/test_hostkeys.py1
-rw-r--r--tests/test_kex.py137
-rw-r--r--tests/test_kex_gss.py131
-rw-r--r--tests/test_message.py8
-rw-r--r--tests/test_packetizer.py55
-rw-r--r--tests/test_pkey.py37
-rwxr-xr-xtests/test_sftp.py63
-rw-r--r--tests/test_sftp_big.py18
-rw-r--r--tests/test_ssh_gss.py124
-rw-r--r--tests/test_transport.py218
-rw-r--r--tests/test_util.py162
-rw-r--r--tests/util.py10
17 files changed, 1118 insertions, 198 deletions
diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py
index 47644433..24380ba1 100644
--- a/tests/stub_sftp.py
+++ b/tests/stub_sftp.py
@@ -21,6 +21,7 @@ A stub SFTP server for loopback SFTP testing.
"""
import os
+import sys
from paramiko import ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes, \
SFTPHandle, SFTP_OK, AUTH_SUCCESSFUL, OPEN_SUCCEEDED
from paramiko.common import o666
diff --git a/tests/test_auth.py b/tests/test_auth.py
index 1d972d53..ec78e3ce 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -118,12 +118,12 @@ class AuthTest (unittest.TestCase):
self.ts.add_server_key(host_key)
self.event = threading.Event()
self.server = NullServer()
- self.assertTrue(not self.event.isSet())
+ self.assertTrue(not self.event.is_set())
self.ts.start_server(self.event, self.server)
def verify_finished(self):
self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
+ self.assertTrue(self.event.is_set())
self.assertTrue(self.ts.is_active())
def test_1_bad_auth_type(self):
diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py
index a53081a9..eeb4d0ad 100644
--- a/tests/test_buffered_pipe.py
+++ b/tests/test_buffered_pipe.py
@@ -22,10 +22,10 @@ Some unit tests for BufferedPipe.
import threading
import time
+import unittest
from paramiko.buffered_pipe import BufferedPipe, PipeTimeout
from paramiko import pipe
-
-from tests.util import ParamikoTest
+from paramiko.py3compat import b
def delay_thread(p):
@@ -40,7 +40,7 @@ def close_thread(p):
p.close()
-class BufferedPipeTest(ParamikoTest):
+class BufferedPipeTest(unittest.TestCase):
def test_1_buffered_pipe(self):
p = BufferedPipe()
self.assertTrue(not p.read_ready())
@@ -48,12 +48,12 @@ class BufferedPipeTest(ParamikoTest):
self.assertTrue(p.read_ready())
data = p.read(6)
self.assertEqual(b'hello.', data)
-
+
p.feed('plus/minus')
self.assertEqual(b'plu', p.read(3))
self.assertEqual(b's/m', p.read(3))
self.assertEqual(b'inus', p.read(4))
-
+
p.close()
self.assertTrue(not p.read_ready())
self.assertEqual(b'', p.read(1))
diff --git a/tests/test_client.py b/tests/test_client.py
index 7e5c80b4..f71efd5a 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -20,6 +20,8 @@
Some unit tests for SSHClient.
"""
+from __future__ import with_statement
+
import socket
from tempfile import mkstemp
import threading
@@ -27,12 +29,25 @@ import unittest
import weakref
import warnings
import os
+import time
from tests.util import test_path
import paramiko
-from paramiko.common import PY2
+from paramiko.common import PY2, b
+from paramiko.ssh_exception import SSHException
+
+
+FINGERPRINTS = {
+ 'ssh-dss': b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c',
+ 'ssh-rsa': b'\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5',
+ 'ecdsa-sha2-nistp256': b'\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60',
+}
class NullServer (paramiko.ServerInterface):
+ def __init__(self, *args, **kwargs):
+ # Allow tests to enable/disable specific key types
+ self.__allowed_keys = kwargs.pop('allowed_keys', [])
+ super(NullServer, self).__init__(*args, **kwargs)
def get_allowed_auths(self, username):
if username == 'slowdive':
@@ -45,7 +60,14 @@ class NullServer (paramiko.ServerInterface):
return paramiko.AUTH_FAILED
def check_auth_publickey(self, username, key):
- if (key.get_name() == 'ssh-dss') and key.get_fingerprint() == b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c':
+ try:
+ expected = FINGERPRINTS[key.get_name()]
+ except KeyError:
+ return paramiko.AUTH_FAILED
+ if (
+ key.get_name() in self.__allowed_keys and
+ key.get_fingerprint() == expected
+ ):
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED
@@ -53,7 +75,7 @@ class NullServer (paramiko.ServerInterface):
return paramiko.OPEN_SUCCEEDED
def check_channel_exec_request(self, channel, command):
- if command != 'yes':
+ if command != b'yes':
return False
return True
@@ -72,32 +94,46 @@ class SSHClientTest (unittest.TestCase):
if hasattr(self, attr):
getattr(self, attr).close()
- def _run(self):
+ def _run(self, allowed_keys=None, delay=0):
+ if allowed_keys is None:
+ allowed_keys = FINGERPRINTS.keys()
self.socks, addr = self.sockl.accept()
self.ts = paramiko.Transport(self.socks)
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
self.ts.add_server_key(host_key)
- server = NullServer()
+ server = NullServer(allowed_keys=allowed_keys)
+ if delay:
+ time.sleep(delay)
self.ts.start_server(self.event, server)
- def test_1_client(self):
+ def _test_connection(self, **kwargs):
"""
- verify that the SSHClient stuff works too.
+ (Most) kwargs get passed directly into SSHClient.connect().
+
+ The exception is ``allowed_keys`` which is stripped and handed to the
+ ``NullServer`` used for testing.
"""
- threading.Thread(target=self._run).start()
+ run_kwargs = {'allowed_keys': kwargs.pop('allowed_keys', None)}
+ # Server setup
+ threading.Thread(target=self._run, kwargs=run_kwargs).start()
host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
+ # Client setup
self.tc = paramiko.SSHClient()
self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
- self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
+ # Actual connection
+ self.tc.connect(self.addr, self.port, username='slowdive', **kwargs)
+
+ # Authentication successful?
self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
+ self.assertTrue(self.event.is_set())
self.assertTrue(self.ts.is_active())
self.assertEqual('slowdive', self.ts.get_username())
self.assertEqual(True, self.ts.is_authenticated())
+ # Command execution functions?
stdin, stdout, stderr = self.tc.exec_command('yes')
schan = self.ts.accept(1.0)
@@ -110,61 +146,77 @@ class SSHClientTest (unittest.TestCase):
self.assertEqual('This is on stderr.\n', stderr.readline())
self.assertEqual('', stderr.readline())
+ # Cleanup
stdin.close()
stdout.close()
stderr.close()
+ def test_1_client(self):
+ """
+ verify that the SSHClient stuff works too.
+ """
+ self._test_connection(password='pygmalion')
+
def test_2_client_dsa(self):
"""
verify that SSHClient works with a DSA key.
"""
- threading.Thread(target=self._run).start()
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
- public_host_key = paramiko.RSAKey(data=host_key.asbytes())
+ self._test_connection(key_filename=test_path('test_dss.key'))
- self.tc = paramiko.SSHClient()
- self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
- self.tc.connect(self.addr, self.port, username='slowdive', key_filename=test_path('test_dss.key'))
-
- self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
- self.assertTrue(self.ts.is_active())
- self.assertEqual('slowdive', self.ts.get_username())
- self.assertEqual(True, self.ts.is_authenticated())
-
- stdin, stdout, stderr = self.tc.exec_command('yes')
- schan = self.ts.accept(1.0)
-
- schan.send('Hello there.\n')
- schan.send_stderr('This is on stderr.\n')
- schan.close()
-
- self.assertEqual('Hello there.\n', stdout.readline())
- self.assertEqual('', stdout.readline())
- self.assertEqual('This is on stderr.\n', stderr.readline())
- self.assertEqual('', stderr.readline())
+ def test_client_rsa(self):
+ """
+ verify that SSHClient works with an RSA key.
+ """
+ self._test_connection(key_filename=test_path('test_rsa.key'))
- stdin.close()
- stdout.close()
- stderr.close()
+ def test_2_5_client_ecdsa(self):
+ """
+ verify that SSHClient works with an ECDSA key.
+ """
+ self._test_connection(key_filename=test_path('test_ecdsa.key'))
def test_3_multiple_key_files(self):
"""
verify that SSHClient accepts and tries multiple key files.
"""
- threading.Thread(target=self._run).start()
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
- public_host_key = paramiko.RSAKey(data=host_key.asbytes())
-
- self.tc = paramiko.SSHClient()
- self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
- self.tc.connect(self.addr, self.port, username='slowdive', key_filename=[test_path('test_rsa.key'), test_path('test_dss.key')])
-
- self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
- self.assertTrue(self.ts.is_active())
- self.assertEqual('slowdive', self.ts.get_username())
- self.assertEqual(True, self.ts.is_authenticated())
+ # This is dumb :(
+ types_ = {
+ 'rsa': 'ssh-rsa',
+ 'dss': 'ssh-dss',
+ 'ecdsa': 'ecdsa-sha2-nistp256',
+ }
+ # Various combos of attempted & valid keys
+ # TODO: try every possible combo using itertools functions
+ for attempt, accept in (
+ (['rsa', 'dss'], ['dss']), # Original test #3
+ (['dss', 'rsa'], ['dss']), # Ordering matters sometimes, sadly
+ (['dss', 'rsa', 'ecdsa'], ['dss']), # Try ECDSA but fail
+ (['rsa', 'ecdsa'], ['ecdsa']), # ECDSA success
+ ):
+ try:
+ self._test_connection(
+ key_filename=[
+ test_path('test_{0}.key'.format(x)) for x in attempt
+ ],
+ allowed_keys=[types_[x] for x in accept],
+ )
+ finally:
+ # Clean up to avoid occasional gc-related deadlocks.
+ # TODO: use nose test generators after nose port
+ self.tearDown()
+ self.setUp()
+
+ def test_multiple_key_files_failure(self):
+ """
+ Expect failure when multiple keys in play and none are accepted
+ """
+ # Until #387 is fixed we have to catch a high-up exception since
+ # various platforms trigger different errors here >_<
+ self.assertRaises(SSHException,
+ self._test_connection,
+ key_filename=[test_path('test_rsa.key')],
+ allowed_keys=['ecdsa-sha2-nistp256'],
+ )
def test_4_auto_add_policy(self):
"""
@@ -180,7 +232,7 @@ class SSHClientTest (unittest.TestCase):
self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
+ self.assertTrue(self.event.is_set())
self.assertTrue(self.ts.is_active())
self.assertEqual('slowdive', self.ts.get_username())
self.assertEqual(True, self.ts.is_authenticated())
@@ -221,6 +273,8 @@ class SSHClientTest (unittest.TestCase):
"""
# Unclear why this is borked on Py3, but it is, and does not seem worth
# pursuing at the moment.
+ # XXX: It's the release of the references to e.g packetizer that fails
+ # in py3...
if not PY2:
return
threading.Thread(target=self._run).start()
@@ -233,7 +287,7 @@ class SSHClientTest (unittest.TestCase):
self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
+ self.assertTrue(self.event.is_set())
self.assertTrue(self.ts.is_active())
p = weakref.ref(self.tc._transport.packetizer)
@@ -252,3 +306,47 @@ class SSHClientTest (unittest.TestCase):
gc.collect()
self.assertTrue(p() is None)
+
+ def test_client_can_be_used_as_context_manager(self):
+ """
+ verify that an SSHClient can be used a context manager
+ """
+ threading.Thread(target=self._run).start()
+ host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ public_host_key = paramiko.RSAKey(data=host_key.asbytes())
+
+ with paramiko.SSHClient() as tc:
+ self.tc = tc
+ self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ self.assertEquals(0, len(self.tc.get_host_keys()))
+ self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
+
+ self.event.wait(1.0)
+ self.assertTrue(self.event.is_set())
+ self.assertTrue(self.ts.is_active())
+
+ self.assertTrue(self.tc._transport is not None)
+
+ self.assertTrue(self.tc._transport is None)
+
+ def test_7_banner_timeout(self):
+ """
+ verify that the SSHClient has a configurable banner timeout.
+ """
+ # Start the thread with a 1 second wait.
+ threading.Thread(target=self._run, kwargs={'delay': 1}).start()
+ host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ public_host_key = paramiko.RSAKey(data=host_key.asbytes())
+
+ self.tc = paramiko.SSHClient()
+ self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
+ # Connect with a half second banner timeout.
+ self.assertRaises(
+ paramiko.SSHException,
+ self.tc.connect,
+ self.addr,
+ self.port,
+ username='slowdive',
+ password='pygmalion',
+ banner_timeout=0.5
+ )
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
new file mode 100644
index 00000000..96c268d9
--- /dev/null
+++ b/tests/test_gssapi.py
@@ -0,0 +1,135 @@
+# Copyright (C) 2013-2014 science + computing ag
+# Author: Sebastian Deiss <sebastian.deiss@t-online.de>
+#
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+Test the used APIs for GSS-API / SSPI authentication
+"""
+
+import unittest
+import socket
+
+
+class GSSAPITest(unittest.TestCase):
+ @staticmethod
+ def init(hostname=None, srv_mode=False):
+ global krb5_mech, targ_name, server_mode
+ krb5_mech = "1.2.840.113554.1.2.2"
+ targ_name = hostname
+ server_mode = srv_mode
+
+ def test_1_pyasn1(self):
+ """
+ Test the used methods of pyasn1.
+ """
+ from pyasn1.type.univ import ObjectIdentifier
+ from pyasn1.codec.der import encoder, decoder
+ oid = encoder.encode(ObjectIdentifier(krb5_mech))
+ mech, __ = decoder.decode(oid)
+ self.assertEquals(krb5_mech, mech.__str__())
+
+ def test_2_gssapi_sspi(self):
+ """
+ Test the used methods of python-gssapi or sspi, sspicon from pywin32.
+ """
+ _API = "MIT"
+ try:
+ import gssapi
+ except ImportError:
+ import sspicon
+ import sspi
+ _API = "SSPI"
+
+ c_token = None
+ gss_ctxt_status = False
+ mic_msg = b"G'day Mate!"
+
+ if _API == "MIT":
+ if server_mode:
+ gss_flags = (gssapi.C_PROT_READY_FLAG,
+ gssapi.C_INTEG_FLAG,
+ gssapi.C_MUTUAL_FLAG,
+ gssapi.C_DELEG_FLAG)
+ else:
+ gss_flags = (gssapi.C_PROT_READY_FLAG,
+ gssapi.C_INTEG_FLAG,
+ gssapi.C_DELEG_FLAG)
+ # Initialize a GSS-API context.
+ ctx = gssapi.Context()
+ ctx.flags = gss_flags
+ krb5_oid = gssapi.OID.mech_from_string(krb5_mech)
+ target_name = gssapi.Name("host@" + targ_name,
+ gssapi.C_NT_HOSTBASED_SERVICE)
+ gss_ctxt = gssapi.InitContext(peer_name=target_name,
+ mech_type=krb5_oid,
+ req_flags=ctx.flags)
+ if server_mode:
+ c_token = gss_ctxt.step(c_token)
+ gss_ctxt_status = gss_ctxt.established
+ self.assertEquals(False, gss_ctxt_status)
+ # Accept a GSS-API context.
+ gss_srv_ctxt = gssapi.AcceptContext()
+ s_token = gss_srv_ctxt.step(c_token)
+ gss_ctxt_status = gss_srv_ctxt.established
+ self.assertNotEquals(None, s_token)
+ self.assertEquals(True, gss_ctxt_status)
+ # Establish the client context
+ c_token = gss_ctxt.step(s_token)
+ self.assertEquals(None, c_token)
+ else:
+ while not gss_ctxt.established:
+ c_token = gss_ctxt.step(c_token)
+ self.assertNotEquals(None, c_token)
+ # Build MIC
+ mic_token = gss_ctxt.get_mic(mic_msg)
+
+ if server_mode:
+ # Check MIC
+ status = gss_srv_ctxt.verify_mic(mic_msg, mic_token)
+ self.assertEquals(0, status)
+ else:
+ gss_flags = sspicon.ISC_REQ_INTEGRITY |\
+ sspicon.ISC_REQ_MUTUAL_AUTH |\
+ sspicon.ISC_REQ_DELEGATE
+ # Initialize a GSS-API context.
+ target_name = "host/" + socket.getfqdn(targ_name)
+ gss_ctxt = sspi.ClientAuth("Kerberos",
+ scflags=gss_flags,
+ targetspn=target_name)
+ if server_mode:
+ error, token = gss_ctxt.authorize(c_token)
+ c_token = token[0].Buffer
+ self.assertEquals(0, error)
+ # Accept a GSS-API context.
+ gss_srv_ctxt = sspi.ServerAuth("Kerberos", spn=target_name)
+ error, token = gss_srv_ctxt.authorize(c_token)
+ s_token = token[0].Buffer
+ # Establish the context.
+ error, token = gss_ctxt.authorize(s_token)
+ c_token = token[0].Buffer
+ self.assertEquals(None, c_token)
+ self.assertEquals(0, error)
+ # Build MIC
+ mic_token = gss_ctxt.sign(mic_msg)
+ # Check MIC
+ gss_srv_ctxt.verify(mic_msg, mic_token)
+ else:
+ error, token = gss_ctxt.authorize(c_token)
+ c_token = token[0].Buffer
+ self.assertNotEquals(0, error)
diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py
index 0ee1bbf0..2bdcad9c 100644
--- a/tests/test_hostkeys.py
+++ b/tests/test_hostkeys.py
@@ -31,6 +31,7 @@ test_hosts_file = """\
secure.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA1PD6U2/TVxET6lkpKhOk5r\
9q/kAYG6sP9f5zuUYP8i7FOFp/6ncCEbbtg/lB+A3iidyxoSWl+9jtoyyDOOVX4UIDV9G11Ml8om3\
D+jrpI9cycZHqilK0HmxDeCuxbwyMuaCygU9gS2qoRvNLWZk70OpIKSSpBo0Wl3/XUmz9uhc=
+broken.example.com ssh-rsa AAAA
happy.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA8bP1ZA7DCZDB9J0s50l31M\
BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\
5ymME3bQ4J/k1IKxCtz/bAlAqFgKoc+EolMziDYqWIATtW0rYTJvzGAzTmMj80/QpsFH+Pc2M=
diff --git a/tests/test_kex.py b/tests/test_kex.py
index c522be46..19804fbf 100644
--- a/tests/test_kex.py
+++ b/tests/test_kex.py
@@ -21,17 +21,18 @@ Some unit tests for the key exchange protocols.
"""
from binascii import hexlify
+import os
import unittest
+
import paramiko.util
from paramiko.kex_group1 import KexGroup1
-from paramiko.kex_gex import KexGex
+from paramiko.kex_gex import KexGex, KexGexSHA256
from paramiko import Message
from paramiko.common import byte_chr
-class FakeRng (object):
- def read(self, n):
- return byte_chr(0xcc) * n
+def dummy_urandom(n):
+ return byte_chr(0xcc) * n
class FakeKey (object):
@@ -41,7 +42,7 @@ class FakeKey (object):
def asbytes(self):
return b'fake-key'
- def sign_ssh_data(self, rng, H):
+ def sign_ssh_data(self, H):
return b'fake-sig'
@@ -53,8 +54,7 @@ class FakeModulusPack (object):
return self.G, self.P
-class FakeTransport (object):
- rng = FakeRng()
+class FakeTransport(object):
local_version = 'SSH-2.0-paramiko_1.0'
remote_version = 'SSH-2.0-lame'
local_kex_init = 'local-kex-init'
@@ -91,10 +91,11 @@ class KexTest (unittest.TestCase):
K = 14730343317708716439807310032871972459448364195094179797249681733965528989482751523943515690110179031004049109375612685505881911274101441415545039654102474376472240501616988799699744135291070488314748284283496055223852115360852283821334858541043710301057312858051901453919067023103730011648890038847384890504
def setUp(self):
- pass
+ self._original_urandom = os.urandom
+ os.urandom = dummy_urandom
def tearDown(self):
- pass
+ os.urandom = self._original_urandom
def test_1_group1_client(self):
transport = FakeTransport()
@@ -251,3 +252,121 @@ class KexTest (unittest.TestCase):
self.assertEqual(H, hexlify(transport._H).upper())
self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
self.assertTrue(transport._activated)
+
+ def test_7_gex_sha256_client(self):
+ transport = FakeTransport()
+ transport.server_mode = False
+ kex = KexGexSHA256(transport)
+ kex.start_kex()
+ x = b'22000004000000080000002000'
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect)
+
+ msg = Message()
+ msg.add_mpint(FakeModulusPack.P)
+ msg.add_mpint(FakeModulusPack.G)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
+ x = b'20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4'
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect)
+
+ msg = Message()
+ msg.add_string('fake-host-key')
+ msg.add_mpint(69)
+ msg.add_string('fake-sig')
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg)
+ H = b'AD1A9365A67B4496F05594AD1BF656E3CDA0851289A4C1AFF549FEAE50896DF4'
+ self.assertEqual(self.K, transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual((b'fake-host-key', b'fake-sig'), transport._verify)
+ self.assertTrue(transport._activated)
+
+ def test_8_gex_sha256_old_client(self):
+ transport = FakeTransport()
+ transport.server_mode = False
+ kex = KexGexSHA256(transport)
+ kex.start_kex(_test_old_style=True)
+ x = b'1E00000800'
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect)
+
+ msg = Message()
+ msg.add_mpint(FakeModulusPack.P)
+ msg.add_mpint(FakeModulusPack.G)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
+ x = b'20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4'
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect)
+
+ msg = Message()
+ msg.add_string('fake-host-key')
+ msg.add_mpint(69)
+ msg.add_string('fake-sig')
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg)
+ H = b'518386608B15891AE5237DEE08DCADDE76A0BCEFCE7F6DB3AD66BC41D256DFE5'
+ self.assertEqual(self.K, transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual((b'fake-host-key', b'fake-sig'), transport._verify)
+ self.assertTrue(transport._activated)
+
+ def test_9_gex_sha256_server(self):
+ transport = FakeTransport()
+ transport.server_mode = True
+ kex = KexGexSHA256(transport)
+ kex.start_kex()
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect)
+
+ msg = Message()
+ msg.add_int(1024)
+ msg.add_int(2048)
+ msg.add_int(4096)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg)
+ x = b'1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102'
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect)
+
+ msg = Message()
+ msg.add_mpint(12345)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg)
+ K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581
+ H = b'CCAC0497CF0ABA1DBF55E1A3995D17F4CC31824B0E8D95CDF8A06F169D050D80'
+ x = b'210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967'
+ self.assertEqual(K, transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertTrue(transport._activated)
+
+ def test_10_gex_sha256_server_with_old_client(self):
+ transport = FakeTransport()
+ transport.server_mode = True
+ kex = KexGexSHA256(transport)
+ kex.start_kex()
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect)
+
+ msg = Message()
+ msg.add_int(2048)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, msg)
+ x = b'1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102'
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertEqual((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect)
+
+ msg = Message()
+ msg.add_mpint(12345)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg)
+ K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581
+ H = b'3DDD2AD840AD095E397BA4D0573972DC60F6461FD38A187CACA6615A5BC8ADBB'
+ x = b'210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967'
+ self.assertEqual(K, transport._K)
+ self.assertEqual(H, hexlify(transport._H).upper())
+ self.assertEqual(x, hexlify(transport._message.asbytes()).upper())
+ self.assertTrue(transport._activated)
+
+
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
new file mode 100644
index 00000000..3bf788da
--- /dev/null
+++ b/tests/test_kex_gss.py
@@ -0,0 +1,131 @@
+# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
+# Copyright (C) 2013-2014 science + computing ag
+# Author: Sebastian Deiss <sebastian.deiss@t-online.de>
+#
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+Unit Tests for the GSS-API / SSPI SSHv2 Diffie-Hellman Key Exchange and user
+authentication
+"""
+
+
+import socket
+import threading
+import unittest
+
+import paramiko
+
+
+class NullServer (paramiko.ServerInterface):
+
+ def get_allowed_auths(self, username):
+ return 'gssapi-keyex'
+
+ def check_auth_gssapi_keyex(self, username,
+ gss_authenticated=paramiko.AUTH_FAILED,
+ cc_file=None):
+ if gss_authenticated == paramiko.AUTH_SUCCESSFUL:
+ return paramiko.AUTH_SUCCESSFUL
+ return paramiko.AUTH_FAILED
+
+ def enable_auth_gssapi(self):
+ UseGSSAPI = True
+ return UseGSSAPI
+
+ def check_channel_request(self, kind, chanid):
+ return paramiko.OPEN_SUCCEEDED
+
+ def check_channel_exec_request(self, channel, command):
+ if command != 'yes':
+ return False
+ return True
+
+
+class GSSKexTest(unittest.TestCase):
+ @staticmethod
+ def init(username, hostname):
+ global krb5_principal, targ_name
+ krb5_principal = username
+ targ_name = hostname
+
+ def setUp(self):
+ self.username = krb5_principal
+ self.hostname = socket.getfqdn(targ_name)
+ self.sockl = socket.socket()
+ self.sockl.bind((targ_name, 0))
+ self.sockl.listen(1)
+ self.addr, self.port = self.sockl.getsockname()
+ self.event = threading.Event()
+ thread = threading.Thread(target=self._run)
+ thread.start()
+
+ def tearDown(self):
+ for attr in "tc ts socks sockl".split():
+ if hasattr(self, attr):
+ getattr(self, attr).close()
+
+ def _run(self):
+ self.socks, addr = self.sockl.accept()
+ self.ts = paramiko.Transport(self.socks, gss_kex=True)
+ host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
+ self.ts.add_server_key(host_key)
+ self.ts.set_gss_host(targ_name)
+ try:
+ self.ts.load_server_moduli()
+ except:
+ print ('(Failed to load moduli -- gex will be unsupported.)')
+ server = NullServer()
+ self.ts.start_server(self.event, server)
+
+ def test_1_gsskex_and_auth(self):
+ """
+ Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated
+ Diffie-Hellman Key Exchange and user authentication with the GSS-API
+ context created during key exchange.
+ """
+ host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = paramiko.RSAKey(data=host_key.asbytes())
+
+ self.tc = paramiko.SSHClient()
+ self.tc.get_host_keys().add('[%s]:%d' % (self.hostname, self.port),
+ 'ssh-rsa', public_host_key)
+ self.tc.connect(self.hostname, self.port, username=self.username,
+ gss_auth=True, gss_kex=True)
+
+ self.event.wait(1.0)
+ self.assert_(self.event.is_set())
+ self.assert_(self.ts.is_active())
+ self.assertEquals(self.username, self.ts.get_username())
+ self.assertEquals(True, self.ts.is_authenticated())
+
+ stdin, stdout, stderr = self.tc.exec_command('yes')
+ schan = self.ts.accept(1.0)
+
+ schan.send('Hello there.\n')
+ schan.send_stderr('This is on stderr.\n')
+ schan.close()
+
+ self.assertEquals('Hello there.\n', stdout.readline())
+ self.assertEquals('', stdout.readline())
+ self.assertEquals('This is on stderr.\n', stderr.readline())
+ self.assertEquals('', stderr.readline())
+
+ stdin.close()
+ stdout.close()
+ stderr.close()
diff --git a/tests/test_message.py b/tests/test_message.py
index f308c037..f18cae90 100644
--- a/tests/test_message.py
+++ b/tests/test_message.py
@@ -92,12 +92,12 @@ class MessageTest (unittest.TestCase):
def test_4_misc(self):
msg = Message(self.__d)
- self.assertEqual(msg.get_int(), 5)
- self.assertEqual(msg.get_int(), 0x1122334455)
- self.assertEqual(msg.get_int(), 0xf00000000000000000)
+ self.assertEqual(msg.get_adaptive_int(), 5)
+ self.assertEqual(msg.get_adaptive_int(), 0x1122334455)
+ self.assertEqual(msg.get_adaptive_int(), 0xf00000000000000000)
self.assertEqual(msg.get_so_far(), self.__d[:29])
self.assertEqual(msg.get_remainder(), self.__d[29:])
msg.rewind()
- self.assertEqual(msg.get_int(), 5)
+ self.assertEqual(msg.get_adaptive_int(), 5)
self.assertEqual(msg.get_so_far(), self.__d[:4])
self.assertEqual(msg.get_remainder(), self.__d[4:])
diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py
index d4d5544e..8faec03c 100644
--- a/tests/test_packetizer.py
+++ b/tests/test_packetizer.py
@@ -21,9 +21,12 @@ Some unit tests for the ssh2 protocol in Transport.
"""
import unittest
+from hashlib import sha1
+
from tests.loop import LoopSocket
+
from Crypto.Cipher import AES
-from Crypto.Hash import SHA
+
from paramiko import Message, Packetizer, util
from paramiko.common import byte_chr, zero_byte
@@ -41,7 +44,7 @@ class PacketizerTest (unittest.TestCase):
p.set_log(util.get_logger('paramiko.transport'))
p.set_hexdump(True)
cipher = AES.new(zero_byte * 16, AES.MODE_CBC, x55 * 16)
- p.set_outbound_cipher(cipher, 16, SHA, 12, x1f * 20)
+ p.set_outbound_cipher(cipher, 16, sha1, 12, x1f * 20)
# message has to be at least 16 bytes long, so we'll have at least one
# block of data encrypted that contains zero random padding bytes
@@ -64,10 +67,56 @@ class PacketizerTest (unittest.TestCase):
p.set_log(util.get_logger('paramiko.transport'))
p.set_hexdump(True)
cipher = AES.new(zero_byte * 16, AES.MODE_CBC, x55 * 16)
- p.set_inbound_cipher(cipher, 16, SHA, 12, x1f * 20)
+ p.set_inbound_cipher(cipher, 16, sha1, 12, x1f * 20)
wsock.send(b'\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0\x90\xd2\x16\x56\x0d\x71\x73\x61\x38\x7c\x4c\x3d\xfb\x97\x7d\xe2\x6e\x03\xb1\xa0\xc2\x1c\xd6\x41\x41\x4c\xb4\x59')
cmd, m = p.read_message()
self.assertEqual(100, cmd)
self.assertEqual(100, m.get_int())
self.assertEqual(1, m.get_int())
self.assertEqual(900, m.get_int())
+
+ def test_3_closed(self):
+ rsock = LoopSocket()
+ wsock = LoopSocket()
+ rsock.link(wsock)
+ p = Packetizer(wsock)
+ p.set_log(util.get_logger('paramiko.transport'))
+ p.set_hexdump(True)
+ cipher = AES.new(zero_byte * 16, AES.MODE_CBC, x55 * 16)
+ p.set_outbound_cipher(cipher, 16, sha1, 12, x1f * 20)
+
+ # message has to be at least 16 bytes long, so we'll have at least one
+ # block of data encrypted that contains zero random padding bytes
+ m = Message()
+ m.add_byte(byte_chr(100))
+ m.add_int(100)
+ m.add_int(1)
+ m.add_int(900)
+ wsock.send = lambda x: 0
+ from functools import wraps
+ import errno
+ import os
+ import signal
+
+ class TimeoutError(Exception):
+ pass
+
+ def timeout(seconds=1, error_message=os.strerror(errno.ETIME)):
+ def decorator(func):
+ def _handle_timeout(signum, frame):
+ raise TimeoutError(error_message)
+
+ def wrapper(*args, **kwargs):
+ signal.signal(signal.SIGALRM, _handle_timeout)
+ signal.alarm(seconds)
+ try:
+ result = func(*args, **kwargs)
+ finally:
+ signal.alarm(0)
+ return result
+
+ return wraps(func)(wrapper)
+
+ return decorator
+ send = timeout()(p.send_message)
+ self.assertRaises(EOFError, send, m)
diff --git a/tests/test_pkey.py b/tests/test_pkey.py
index 6ff68fc2..f673254f 100644
--- a/tests/test_pkey.py
+++ b/tests/test_pkey.py
@@ -20,11 +20,14 @@
Some unit tests for public/private key objects.
"""
-from binascii import hexlify
import unittest
+import os
+from binascii import hexlify
+from hashlib import md5
+
from paramiko import RSAKey, DSSKey, ECDSAKey, Message, util
from paramiko.py3compat import StringIO, byte_chr, b, bytes
-from paramiko.common import rng
+
from tests.util import test_path
# from openssh's ssh-keygen
@@ -90,8 +93,7 @@ class KeyTest (unittest.TestCase):
pass
def test_1_generate_key_bytes(self):
- from Crypto.Hash import MD5
- key = util.generate_key_bytes(MD5, x1234, 'happy birthday', 30)
+ key = util.generate_key_bytes(md5, x1234, 'happy birthday', 30)
exp = b'\x61\xE1\xF2\x72\xF4\xC1\xC4\x56\x15\x86\xBD\x32\x24\x98\xC0\xE9\x24\x67\x27\x80\xF4\x7B\xB3\x7D\xDA\x7D\x54\x01\x9E\x64'
self.assertEqual(exp, key)
@@ -166,7 +168,7 @@ class KeyTest (unittest.TestCase):
def test_8_sign_rsa(self):
# verify that the rsa private key can sign and verify
key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
- msg = key.sign_ssh_data(rng, b'ice weasels')
+ msg = key.sign_ssh_data(b'ice weasels')
self.assertTrue(type(msg) is Message)
msg.rewind()
self.assertEqual('ssh-rsa', msg.get_text())
@@ -179,7 +181,7 @@ class KeyTest (unittest.TestCase):
def test_9_sign_dss(self):
# verify that the dss private key can sign and verify
key = DSSKey.from_private_key_file(test_path('test_dss.key'))
- msg = key.sign_ssh_data(rng, b'ice weasels')
+ msg = key.sign_ssh_data(b'ice weasels')
self.assertTrue(type(msg) is Message)
msg.rewind()
self.assertEqual('ssh-dss', msg.get_text())
@@ -193,13 +195,13 @@ class KeyTest (unittest.TestCase):
def test_A_generate_rsa(self):
key = RSAKey.generate(1024)
- msg = key.sign_ssh_data(rng, b'jerri blank')
+ msg = key.sign_ssh_data(b'jerri blank')
msg.rewind()
self.assertTrue(key.verify_ssh_sig(b'jerri blank', msg))
def test_B_generate_dss(self):
key = DSSKey.generate(1024)
- msg = key.sign_ssh_data(rng, b'jerri blank')
+ msg = key.sign_ssh_data(b'jerri blank')
msg.rewind()
self.assertTrue(key.verify_ssh_sig(b'jerri blank', msg))
@@ -240,7 +242,7 @@ class KeyTest (unittest.TestCase):
def test_13_sign_ecdsa(self):
# verify that the rsa private key can sign and verify
key = ECDSAKey.from_private_key_file(test_path('test_ecdsa.key'))
- msg = key.sign_ssh_data(rng, b'ice weasels')
+ msg = key.sign_ssh_data(b'ice weasels')
self.assertTrue(type(msg) is Message)
msg.rewind()
self.assertEqual('ecdsa-sha2-nistp256', msg.get_text())
@@ -252,3 +254,20 @@ class KeyTest (unittest.TestCase):
msg.rewind()
pub = ECDSAKey(data=key.asbytes())
self.assertTrue(pub.verify_ssh_sig(b'ice weasels', msg))
+
+ def test_salt_size(self):
+ # Read an existing encrypted private key
+ file_ = test_path('test_rsa_password.key')
+ password = 'television'
+ newfile = file_ + '.new'
+ newpassword = 'radio'
+ key = RSAKey(filename=file_, password=password)
+ # Write out a newly re-encrypted copy with a new password.
+ # When the bug under test exists, this will ValueError.
+ try:
+ key.write_private_key_file(newfile, password=newpassword)
+ # Verify the inner key data still matches (when no ValueError)
+ key2 = RSAKey(filename=newfile, password=newpassword)
+ self.assertEqual(key, key2)
+ finally:
+ os.remove(newfile)
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index 2b6aa3b6..ff146ade 100755
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -23,12 +23,13 @@ a real actual sftp server is contacted, and a new folder is created there to
do test file operations in (so no existing files will be harmed).
"""
-from binascii import hexlify
import os
+import socket
import sys
-import warnings
import threading
import unittest
+import warnings
+from binascii import hexlify
from tempfile import mkstemp
import paramiko
@@ -96,7 +97,7 @@ def get_sftp():
class SFTPTest (unittest.TestCase):
-
+ @staticmethod
def init(hostname, username, keyfile, passwd):
global sftp, tc
@@ -128,8 +129,8 @@ class SFTPTest (unittest.TestCase):
sys.stderr.write('\n')
sys.exit(1)
sftp = paramiko.SFTP.from_transport(t)
- init = staticmethod(init)
+ @staticmethod
def init_loopback():
global sftp, tc
@@ -149,12 +150,11 @@ class SFTPTest (unittest.TestCase):
event.wait(1.0)
sftp = paramiko.SFTP.from_transport(tc)
- init_loopback = staticmethod(init_loopback)
+ @staticmethod
def set_big_file_test(onoff):
global g_big_file_test
g_big_file_test = onoff
- set_big_file_test = staticmethod(set_big_file_test)
def setUp(self):
global FOLDER
@@ -195,6 +195,21 @@ class SFTPTest (unittest.TestCase):
pass
sftp = paramiko.SFTP.from_transport(tc)
+ def test_2_sftp_can_be_used_as_context_manager(self):
+ """
+ verify that the sftp session is closed when exiting the context manager
+ """
+ global sftp
+ with sftp:
+ pass
+ try:
+ sftp.open(FOLDER + '/test2', 'w')
+ self.fail('expected exception')
+ except (EOFError, socket.error):
+ pass
+ finally:
+ sftp = paramiko.SFTP.from_transport(tc)
+
def test_3_write(self):
"""
verify that a file can be created and written, and the size is correct.
@@ -279,8 +294,8 @@ class SFTPTest (unittest.TestCase):
def test_7_listdir(self):
"""
- verify that a folder can be created, a bunch of files can be placed in it,
- and those files show up in sftp.listdir.
+ verify that a folder can be created, a bunch of files can be placed in
+ it, and those files show up in sftp.listdir.
"""
try:
sftp.open(FOLDER + '/duck.txt', 'w').close()
@@ -298,6 +313,26 @@ class SFTPTest (unittest.TestCase):
sftp.remove(FOLDER + '/fish.txt')
sftp.remove(FOLDER + '/tertiary.py')
+ def test_7_5_listdir_iter(self):
+ """
+ listdir_iter version of above test
+ """
+ try:
+ sftp.open(FOLDER + '/duck.txt', 'w').close()
+ sftp.open(FOLDER + '/fish.txt', 'w').close()
+ sftp.open(FOLDER + '/tertiary.py', 'w').close()
+
+ x = [x.filename for x in sftp.listdir_iter(FOLDER)]
+ self.assertEqual(len(x), 3)
+ self.assertTrue('duck.txt' in x)
+ self.assertTrue('fish.txt' in x)
+ self.assertTrue('tertiary.py' in x)
+ self.assertTrue('random' not in x)
+ finally:
+ sftp.remove(FOLDER + '/duck.txt')
+ sftp.remove(FOLDER + '/fish.txt')
+ sftp.remove(FOLDER + '/tertiary.py')
+
def test_8_setstat(self):
"""
verify that the setstat functions (chown, chmod, utime, truncate) work.
@@ -575,7 +610,7 @@ class SFTPTest (unittest.TestCase):
with sftp.open(FOLDER + '/bunny.txt', 'rb') as f:
self.assertEqual(text, f.read(128))
- self.assertEqual((41, 41), saved_progress[-1])
+ self.assertEqual([(41, 41)], saved_progress)
os.unlink(localname)
fd, localname = mkstemp()
@@ -585,7 +620,7 @@ class SFTPTest (unittest.TestCase):
with open(localname, 'rb') as f:
self.assertEqual(text, f.read(128))
- self.assertEqual((41, 41), saved_progress[-1])
+ self.assertEqual([(41, 41)], saved_progress)
os.unlink(localname)
sftp.unlink(FOLDER + '/bunny.txt')
@@ -661,7 +696,8 @@ class SFTPTest (unittest.TestCase):
f.readv([(0, 12)])
with sftp.open(FOLDER + '/zero', 'r') as f:
- f.prefetch()
+ file_size = f.stat().st_size
+ f.prefetch(file_size)
f.read(100)
finally:
sftp.unlink(FOLDER + '/zero')
@@ -776,6 +812,11 @@ class SFTPTest (unittest.TestCase):
sftp.remove('%s/nonutf8data' % FOLDER)
+ def test_sftp_attributes_empty_str(self):
+ sftp_attributes = SFTPAttributes()
+ self.assertEqual(str(sftp_attributes), "?--------- 1 0 0 0 (unknown date) ?")
+
+
if __name__ == '__main__':
SFTPTest.init_loopback()
# logging is required by test_N_file_with_percent
diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py
index abed27b8..cfad5682 100644
--- a/tests/test_sftp_big.py
+++ b/tests/test_sftp_big.py
@@ -132,7 +132,8 @@ class BigSFTPTest (unittest.TestCase):
start = time.time()
with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
- f.prefetch()
+ file_size = f.stat().st_size
+ f.prefetch(file_size)
# read on odd boundaries to make sure the bytes aren't getting scrambled
n = 0
@@ -171,7 +172,8 @@ class BigSFTPTest (unittest.TestCase):
chunk = 793
for i in range(10):
with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
- f.prefetch()
+ file_size = f.stat().st_size
+ f.prefetch(file_size)
base_offset = (512 * 1024) + 17 * random.randint(1000, 2000)
offsets = [base_offset + j * chunk for j in range(100)]
# randomly seek around and read them out
@@ -245,9 +247,11 @@ class BigSFTPTest (unittest.TestCase):
for i in range(10):
with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f:
- f.prefetch()
+ file_size = f.stat().st_size
+ f.prefetch(file_size)
with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f:
- f.prefetch()
+ file_size = f.stat().st_size
+ f.prefetch(file_size)
for n in range(1024):
data = f.read(1024)
self.assertEqual(data, kblob)
@@ -275,7 +279,8 @@ class BigSFTPTest (unittest.TestCase):
self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
- f.prefetch()
+ file_size = f.stat().st_size
+ f.prefetch(file_size)
data = f.read(1024)
self.assertEqual(data, kblob)
@@ -353,7 +358,8 @@ class BigSFTPTest (unittest.TestCase):
# try to read it too.
with sftp.open('%s/hongry.txt' % FOLDER, 'r', 128 * 1024) as f:
- f.prefetch()
+ file_size = f.stat().st_size
+ f.prefetch(file_size)
total = 0
while total < 1024 * 1024:
total += len(f.read(32 * 1024))
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py
new file mode 100644
index 00000000..e20d348f
--- /dev/null
+++ b/tests/test_ssh_gss.py
@@ -0,0 +1,124 @@
+# Copyright (C) 2003-2007 Robey Pointer <robeypointer@gmail.com>
+# Copyright (C) 2013-2014 science + computing ag
+# Author: Sebastian Deiss <sebastian.deiss@t-online.de>
+#
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+Unit Tests for the GSS-API / SSPI SSHv2 Authentication (gssapi-with-mic)
+"""
+
+import socket
+import threading
+import unittest
+
+import paramiko
+
+
+class NullServer (paramiko.ServerInterface):
+
+ def get_allowed_auths(self, username):
+ return 'gssapi-with-mic'
+
+ def check_auth_gssapi_with_mic(self, username,
+ gss_authenticated=paramiko.AUTH_FAILED,
+ cc_file=None):
+ if gss_authenticated == paramiko.AUTH_SUCCESSFUL:
+ return paramiko.AUTH_SUCCESSFUL
+ return paramiko.AUTH_FAILED
+
+ def enable_auth_gssapi(self):
+ UseGSSAPI = True
+ GSSAPICleanupCredentials = True
+ return UseGSSAPI
+
+ def check_channel_request(self, kind, chanid):
+ return paramiko.OPEN_SUCCEEDED
+
+ def check_channel_exec_request(self, channel, command):
+ if command != 'yes':
+ return False
+ return True
+
+
+class GSSAuthTest(unittest.TestCase):
+ @staticmethod
+ def init(username, hostname):
+ global krb5_principal, targ_name
+ krb5_principal = username
+ targ_name = hostname
+
+ def setUp(self):
+ self.username = krb5_principal
+ self.hostname = socket.getfqdn(targ_name)
+ self.sockl = socket.socket()
+ self.sockl.bind((targ_name, 0))
+ self.sockl.listen(1)
+ self.addr, self.port = self.sockl.getsockname()
+ self.event = threading.Event()
+ thread = threading.Thread(target=self._run)
+ thread.start()
+
+ def tearDown(self):
+ for attr in "tc ts socks sockl".split():
+ if hasattr(self, attr):
+ getattr(self, attr).close()
+
+ def _run(self):
+ self.socks, addr = self.sockl.accept()
+ self.ts = paramiko.Transport(self.socks)
+ host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
+ self.ts.add_server_key(host_key)
+ server = NullServer()
+ self.ts.start_server(self.event, server)
+
+ def test_1_gss_auth(self):
+ """
+ Verify that Paramiko can handle SSHv2 GSS-API / SSPI authentication
+ (gssapi-with-mic) in client and server mode.
+ """
+ host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = paramiko.RSAKey(data=host_key.asbytes())
+
+ self.tc = paramiko.SSHClient()
+ self.tc.get_host_keys().add('[%s]:%d' % (self.hostname, self.port),
+ 'ssh-rsa', public_host_key)
+ self.tc.connect(self.hostname, self.port, username=self.username,
+ gss_auth=True)
+
+ self.event.wait(1.0)
+ self.assert_(self.event.is_set())
+ self.assert_(self.ts.is_active())
+ self.assertEquals(self.username, self.ts.get_username())
+ self.assertEquals(True, self.ts.is_authenticated())
+
+ stdin, stdout, stderr = self.tc.exec_command('yes')
+ schan = self.ts.accept(1.0)
+
+ schan.send('Hello there.\n')
+ schan.send_stderr('This is on stderr.\n')
+ schan.close()
+
+ self.assertEquals('Hello there.\n', stdout.readline())
+ self.assertEquals('', stdout.readline())
+ self.assertEquals('This is on stderr.\n', stderr.readline())
+ self.assertEquals('', stderr.readline())
+
+ stdin.close()
+ stdout.close()
+ stderr.close()
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 485a18e8..5069e5b0 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -20,22 +20,28 @@
Some unit tests for the ssh2 protocol in Transport.
"""
+from __future__ import with_statement
+
from binascii import hexlify
import select
import socket
import time
import threading
import random
+from hashlib import sha1
+import unittest
from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \
- SSHException, ChannelException
+ SSHException, ChannelException, Packetizer
from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL
from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
-from paramiko.common import MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST
+from paramiko.common import MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST, \
+ MIN_PACKET_SIZE, MIN_WINDOW_SIZE, MAX_WINDOW_SIZE, \
+ DEFAULT_WINDOW_SIZE, DEFAULT_MAX_PACKET_SIZE
from paramiko.py3compat import bytes
from paramiko.message import Message
from tests.loop import LoopSocket
-from tests.util import ParamikoTest, test_path
+from tests.util import test_path
LONG_BANNER = """\
@@ -55,7 +61,7 @@ class NullServer (ServerInterface):
paranoid_did_password = False
paranoid_did_public_key = False
paranoid_key = DSSKey.from_private_key_file(test_path('test_dss.key'))
-
+
def get_allowed_auths(self, username):
if username == 'slowdive':
return 'publickey,password'
@@ -72,30 +78,30 @@ class NullServer (ServerInterface):
return OPEN_SUCCEEDED
def check_channel_exec_request(self, channel, command):
- if command != 'yes':
+ if command != b'yes':
return False
return True
def check_channel_shell_request(self, channel):
return True
-
+
def check_global_request(self, kind, msg):
self._global_request = kind
return False
-
+
def check_channel_x11_request(self, channel, single_connection, auth_protocol, auth_cookie, screen_number):
self._x11_single_connection = single_connection
self._x11_auth_protocol = auth_protocol
self._x11_auth_cookie = auth_cookie
self._x11_screen_number = screen_number
return True
-
+
def check_port_forward_request(self, addr, port):
self._listen = socket.socket()
self._listen.bind(('127.0.0.1', 0))
self._listen.listen(1)
return self._listen.getsockname()[1]
-
+
def cancel_port_forward_request(self, addr, port):
self._listen.close()
self._listen = None
@@ -105,7 +111,7 @@ class NullServer (ServerInterface):
return OPEN_SUCCEEDED
-class TransportTest(ParamikoTest):
+class TransportTest(unittest.TestCase):
def setUp(self):
self.socks = LoopSocket()
self.sockc = LoopSocket()
@@ -123,20 +129,20 @@ class TransportTest(ParamikoTest):
host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
-
+
if client_options is not None:
client_options(self.tc.get_security_options())
if server_options is not None:
server_options(self.ts.get_security_options())
-
+
event = threading.Event()
self.server = NullServer()
- self.assertTrue(not event.isSet())
+ self.assertTrue(not event.is_set())
self.ts.start_server(event, self.server)
self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion')
event.wait(1.0)
- self.assertTrue(event.isSet())
+ self.assertTrue(event.is_set())
self.assertTrue(self.ts.is_active())
def test_1_security_options(self):
@@ -155,7 +161,7 @@ class TransportTest(ParamikoTest):
self.assertTrue(False)
except TypeError:
pass
-
+
def test_2_compute_key(self):
self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929
self.tc.H = b'\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3'
@@ -175,7 +181,7 @@ class TransportTest(ParamikoTest):
self.ts.add_server_key(host_key)
event = threading.Event()
server = NullServer()
- self.assertTrue(not event.isSet())
+ self.assertTrue(not event.is_set())
self.assertEqual(None, self.tc.get_username())
self.assertEqual(None, self.ts.get_username())
self.assertEqual(False, self.tc.is_authenticated())
@@ -184,7 +190,7 @@ class TransportTest(ParamikoTest):
self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion')
event.wait(1.0)
- self.assertTrue(event.isSet())
+ self.assertTrue(event.is_set())
self.assertTrue(self.ts.is_active())
self.assertEqual('slowdive', self.tc.get_username())
self.assertEqual('slowdive', self.ts.get_username())
@@ -200,15 +206,15 @@ class TransportTest(ParamikoTest):
self.ts.add_server_key(host_key)
event = threading.Event()
server = NullServer()
- self.assertTrue(not event.isSet())
+ self.assertTrue(not event.is_set())
self.socks.send(LONG_BANNER)
self.ts.start_server(event, server)
self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion')
event.wait(1.0)
- self.assertTrue(event.isSet())
+ self.assertTrue(event.is_set())
self.assertTrue(self.ts.is_active())
-
+
def test_4_special(self):
"""
verify that the client can demand odd handshake settings, and can
@@ -222,7 +228,7 @@ class TransportTest(ParamikoTest):
self.assertEqual('aes256-cbc', self.tc.remote_cipher)
self.assertEqual(12, self.tc.packetizer.get_mac_size_out())
self.assertEqual(12, self.tc.packetizer.get_mac_size_in())
-
+
self.tc.send_ignore(1024)
self.tc.renegotiate_keys()
self.ts.send_ignore(1024)
@@ -236,7 +242,7 @@ class TransportTest(ParamikoTest):
self.tc.set_keepalive(1)
time.sleep(2)
self.assertEqual('keepalive@lag.net', self.server._global_request)
-
+
def test_6_exec_command(self):
"""
verify that exec_command() does something reasonable.
@@ -246,11 +252,11 @@ class TransportTest(ParamikoTest):
chan = self.tc.open_session()
schan = self.ts.accept(1.0)
try:
- chan.exec_command('no')
+ chan.exec_command(b'command contains \xfc and is not a valid UTF-8 string')
self.assertTrue(False)
except SSHException:
pass
-
+
chan = self.tc.open_session()
chan.exec_command('yes')
schan = self.ts.accept(1.0)
@@ -264,7 +270,7 @@ class TransportTest(ParamikoTest):
f = chan.makefile_stderr()
self.assertEqual('This is on stderr.\n', f.readline())
self.assertEqual('', f.readline())
-
+
# now try it with combined stdout/stderr
chan = self.tc.open_session()
chan.exec_command('yes')
@@ -273,11 +279,27 @@ class TransportTest(ParamikoTest):
schan.send_stderr('This is on stderr.\n')
schan.close()
- chan.set_combine_stderr(True)
+ chan.set_combine_stderr(True)
f = chan.makefile()
self.assertEqual('Hello there.\n', f.readline())
self.assertEqual('This is on stderr.\n', f.readline())
self.assertEqual('', f.readline())
+
+ def test_6a_channel_can_be_used_as_context_manager(self):
+ """
+ verify that exec_command() does something reasonable.
+ """
+ self.setup_test_server()
+
+ with self.tc.open_session() as chan:
+ with self.ts.accept(1.0) as schan:
+ chan.exec_command('yes')
+ schan.send('Hello there.\n')
+ schan.close()
+
+ f = chan.makefile()
+ self.assertEqual('Hello there.\n', f.readline())
+ self.assertEqual('', f.readline())
def test_7_invoke_shell(self):
"""
@@ -320,7 +342,7 @@ class TransportTest(ParamikoTest):
schan.shutdown_write()
schan.send_exit_status(23)
schan.close()
-
+
f = chan.makefile()
self.assertEqual('Hello there.\n', f.readline())
self.assertEqual('', f.readline())
@@ -342,14 +364,14 @@ class TransportTest(ParamikoTest):
chan.invoke_shell()
schan = self.ts.accept(1.0)
- # nothing should be ready
+ # nothing should be ready
r, w, e = select.select([chan], [], [], 0.1)
self.assertEqual([], r)
self.assertEqual([], w)
self.assertEqual([], e)
-
+
schan.send('hello\n')
-
+
# something should be ready now (give it 1 second to appear)
for i in range(10):
r, w, e = select.select([chan], [], [], 0.1)
@@ -361,7 +383,7 @@ class TransportTest(ParamikoTest):
self.assertEqual([], e)
self.assertEqual(b'hello\n', chan.recv(6))
-
+
# and, should be dead again now
r, w, e = select.select([chan], [], [], 0.1)
self.assertEqual([], r)
@@ -369,7 +391,7 @@ class TransportTest(ParamikoTest):
self.assertEqual([], e)
schan.close()
-
+
# detect eof?
for i in range(10):
r, w, e = select.select([chan], [], [], 0.1)
@@ -380,14 +402,14 @@ class TransportTest(ParamikoTest):
self.assertEqual([], w)
self.assertEqual([], e)
self.assertEqual(bytes(), chan.recv(16))
-
+
# make sure the pipe is still open for now...
p = chan._pipe
self.assertEqual(False, p._closed)
chan.close()
# ...and now is closed.
self.assertEqual(True, p._closed)
-
+
def test_B_renegotiate(self):
"""
verify that a transport can correctly renegotiate mid-stream.
@@ -402,7 +424,7 @@ class TransportTest(ParamikoTest):
for i in range(20):
chan.send('x' * 1024)
chan.close()
-
+
# allow a few seconds for the rekeying to complete
for i in range(50):
if self.tc.H != self.tc.session_id:
@@ -426,9 +448,11 @@ class TransportTest(ParamikoTest):
bytes = self.tc.packetizer._Packetizer__sent_bytes
chan.send('x' * 1024)
bytes2 = self.tc.packetizer._Packetizer__sent_bytes
+ block_size = self.tc._cipher_info[self.tc.local_cipher]['block-size']
+ mac_size = self.tc._mac_info[self.tc.local_mac]['size']
# tests show this is actually compressed to *52 bytes*! including packet overhead! nice!! :)
self.assertTrue(bytes2 - bytes < 1024)
- self.assertEqual(52, bytes2 - bytes)
+ self.assertEqual(16 + block_size + mac_size, bytes2 - bytes)
chan.close()
schan.close()
@@ -441,28 +465,28 @@ class TransportTest(ParamikoTest):
chan = self.tc.open_session()
chan.exec_command('yes')
schan = self.ts.accept(1.0)
-
+
requested = []
def handler(c, addr_port):
addr, port = addr_port
requested.append((addr, port))
self.tc._queue_incoming_channel(c)
-
+
self.assertEqual(None, getattr(self.server, '_x11_screen_number', None))
cookie = chan.request_x11(0, single_connection=True, handler=handler)
self.assertEqual(0, self.server._x11_screen_number)
self.assertEqual('MIT-MAGIC-COOKIE-1', self.server._x11_auth_protocol)
self.assertEqual(cookie, self.server._x11_auth_cookie)
self.assertEqual(True, self.server._x11_single_connection)
-
+
x11_server = self.ts.open_x11_channel(('localhost', 6093))
x11_client = self.tc.accept()
self.assertEqual('localhost', requested[0][0])
self.assertEqual(6093, requested[0][1])
-
+
x11_server.send('hello')
self.assertEqual(b'hello', x11_client.recv(5))
-
+
x11_server.close()
x11_client.close()
chan.close()
@@ -477,13 +501,13 @@ class TransportTest(ParamikoTest):
chan = self.tc.open_session()
chan.exec_command('yes')
schan = self.ts.accept(1.0)
-
+
requested = []
def handler(c, origin_addr_port, server_addr_port):
requested.append(origin_addr_port)
requested.append(server_addr_port)
self.tc._queue_incoming_channel(c)
-
+
port = self.tc.request_port_forward('127.0.0.1', 0, handler)
self.assertEqual(port, self.server._listen.getsockname()[1])
@@ -492,14 +516,14 @@ class TransportTest(ParamikoTest):
ss, _ = self.server._listen.accept()
sch = self.ts.open_forwarded_tcpip_channel(ss.getsockname(), ss.getpeername())
cch = self.tc.accept()
-
+
sch.send('hello')
self.assertEqual(b'hello', cch.recv(5))
sch.close()
cch.close()
ss.close()
cs.close()
-
+
# now cancel it.
self.tc.cancel_port_forward('127.0.0.1', port)
self.assertTrue(self.server._listen is None)
@@ -513,7 +537,7 @@ class TransportTest(ParamikoTest):
chan = self.tc.open_session()
chan.exec_command('yes')
schan = self.ts.accept(1.0)
-
+
# open a port on the "server" that the client will ask to forward to.
greeting_server = socket.socket()
greeting_server.bind(('127.0.0.1', 0))
@@ -524,13 +548,13 @@ class TransportTest(ParamikoTest):
sch = self.ts.accept(1.0)
cch = socket.socket()
cch.connect(self.server._tcpip_dest)
-
+
ss, _ = greeting_server.accept()
ss.send(b'Hello!\n')
ss.close()
sch.send(cch.recv(8192))
sch.close()
-
+
self.assertEqual(b'Hello!\n', cs.recv(7))
cs.close()
@@ -544,14 +568,14 @@ class TransportTest(ParamikoTest):
chan.invoke_shell()
schan = self.ts.accept(1.0)
- # nothing should be ready
+ # nothing should be ready
r, w, e = select.select([chan], [], [], 0.1)
self.assertEqual([], r)
self.assertEqual([], w)
self.assertEqual([], e)
-
+
schan.send_stderr('hello\n')
-
+
# something should be ready now (give it 1 second to appear)
for i in range(10):
r, w, e = select.select([chan], [], [], 0.1)
@@ -563,7 +587,7 @@ class TransportTest(ParamikoTest):
self.assertEqual([], e)
self.assertEqual(b'hello\n', chan.recv_stderr(6))
-
+
# and, should be dead again now
r, w, e = select.select([chan], [], [], 0.1)
self.assertEqual([], r)
@@ -585,12 +609,13 @@ class TransportTest(ParamikoTest):
self.assertEqual(chan.send_ready(), True)
total = 0
K = '*' * 1024
- while total < 1024 * 1024:
+ limit = 1+(64 * 2 ** 15)
+ while total < limit:
chan.send(K)
total += len(K)
if not chan.send_ready():
break
- self.assertTrue(total < 1024 * 1024)
+ self.assertTrue(total < limit)
schan.close()
chan.close()
@@ -599,10 +624,10 @@ class TransportTest(ParamikoTest):
def test_I_rekey_deadlock(self):
"""
Regression test for deadlock when in-transit messages are received after MSG_KEXINIT is sent
-
+
Note: When this test fails, it may leak threads.
"""
-
+
# Test for an obscure deadlocking bug that can occur if we receive
# certain messages while initiating a key exchange.
#
@@ -619,7 +644,7 @@ class TransportTest(ParamikoTest):
# NeedRekeyException.
# 4. In response to NeedRekeyException, the transport thread sends
# MSG_KEXINIT to the remote host.
- #
+ #
# On the remote host (using any SSH implementation):
# 5. The MSG_CHANNEL_DATA is received, and MSG_CHANNEL_WINDOW_ADJUST is sent.
# 6. The MSG_KEXINIT is received, and a corresponding MSG_KEXINIT is sent.
@@ -654,11 +679,11 @@ class TransportTest(ParamikoTest):
self.done_event = done_event
self.watchdog_event = threading.Event()
self.last = None
-
+
def run(self):
try:
for i in range(1, 1+self.iterations):
- if self.done_event.isSet():
+ if self.done_event.is_set():
break
self.watchdog_event.set()
#print i, "SEND"
@@ -666,7 +691,7 @@ class TransportTest(ParamikoTest):
finally:
self.done_event.set()
self.watchdog_event.set()
-
+
class ReceiveThread(threading.Thread):
def __init__(self, chan, done_event):
threading.Thread.__init__(self, None, None, self.__class__.__name__)
@@ -674,10 +699,10 @@ class TransportTest(ParamikoTest):
self.chan = chan
self.done_event = done_event
self.watchdog_event = threading.Event()
-
+
def run(self):
try:
- while not self.done_event.isSet():
+ while not self.done_event.is_set():
if self.chan.recv_ready():
chan.recv(65536)
self.watchdog_event.set()
@@ -687,10 +712,10 @@ class TransportTest(ParamikoTest):
finally:
self.done_event.set()
self.watchdog_event.set()
-
+
self.setup_test_server()
self.ts.packetizer.REKEY_BYTES = 2048
-
+
chan = self.tc.open_session()
chan.exec_command('yes')
schan = self.ts.accept(1.0)
@@ -712,7 +737,7 @@ class TransportTest(ParamikoTest):
self._send_message(m2)
return _negotiate_keys(self, m)
self.tc._handler_table[MSG_KEXINIT] = _negotiate_keys_wrapper
-
+
# Parameters for the test
iterations = 500 # The deadlock does not happen every time, but it
# should after many iterations.
@@ -724,23 +749,23 @@ class TransportTest(ParamikoTest):
# Start the sending thread
st = SendThread(schan, iterations, done_event)
st.start()
-
+
# Start the receiving thread
rt = ReceiveThread(chan, done_event)
rt.start()
- # Act as a watchdog timer, checking
+ # Act as a watchdog timer, checking
deadlocked = False
- while not deadlocked and not done_event.isSet():
+ while not deadlocked and not done_event.is_set():
for event in (st.watchdog_event, rt.watchdog_event):
event.wait(timeout)
- if done_event.isSet():
+ if done_event.is_set():
break
- if not event.isSet():
+ if not event.is_set():
deadlocked = True
break
event.clear()
-
+
# Tell the threads to stop (if they haven't already stopped). Note
# that if one or more threads are deadlocked, they might hang around
# forever (until the process exits).
@@ -752,3 +777,54 @@ class TransportTest(ParamikoTest):
# Close the channels
schan.close()
chan.close()
+
+ def test_J_sanitze_packet_size(self):
+ """
+ verify that we conform to the rfc of packet and window sizes.
+ """
+ for val, correct in [(4095, MIN_PACKET_SIZE),
+ (None, DEFAULT_MAX_PACKET_SIZE),
+ (2**32, MAX_WINDOW_SIZE)]:
+ self.assertEqual(self.tc._sanitize_packet_size(val), correct)
+
+ def test_K_sanitze_window_size(self):
+ """
+ verify that we conform to the rfc of packet and window sizes.
+ """
+ for val, correct in [(32767, MIN_WINDOW_SIZE),
+ (None, DEFAULT_WINDOW_SIZE),
+ (2**32, MAX_WINDOW_SIZE)]:
+ self.assertEqual(self.tc._sanitize_window_size(val), correct)
+
+ def test_L_handshake_timeout(self):
+ """
+ verify that we can get a hanshake timeout.
+ """
+ # Tweak client Transport instance's Packetizer instance so
+ # its read_message() sleeps a bit. This helps prevent race conditions
+ # where the client Transport's timeout timer thread doesn't even have
+ # time to get scheduled before the main client thread finishes
+ # handshaking with the server.
+ # (Doing this on the server's transport *sounds* more 'correct' but
+ # actually doesn't work nearly as well for whatever reason.)
+ class SlowPacketizer(Packetizer):
+ def read_message(self):
+ time.sleep(1)
+ return super(SlowPacketizer, self).read_message()
+ # NOTE: prettttty sure since the replaced .packetizer Packetizer is now
+ # no longer doing anything with its copy of the socket...everything'll
+ # be fine. Even tho it's a bit squicky.
+ self.tc.packetizer = SlowPacketizer(self.tc.sock)
+ # Continue with regular test red tape.
+ host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ public_host_key = RSAKey(data=host_key.asbytes())
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.assertTrue(not event.is_set())
+ self.tc.handshake_timeout = 0.000000000001
+ self.ts.start_server(event, server)
+ self.assertRaises(EOFError, self.tc.connect,
+ hostkey=public_host_key,
+ username='slowdive',
+ password='pygmalion')
diff --git a/tests/test_util.py b/tests/test_util.py
index 0e7d0b2b..a6a2c30b 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -23,13 +23,14 @@ Some unit tests for utility functions.
from binascii import hexlify
import errno
import os
-from Crypto.Hash import SHA
+from hashlib import sha1
+import unittest
+
import paramiko.util
from paramiko.util import lookup_ssh_host_config as host_config, safe_string
from paramiko.py3compat import StringIO, byte_ord, b
-from tests.util import ParamikoTest
-
+# Note some lines in this configuration have trailing spaces on purpose
test_config_file = """\
Host *
User robey
@@ -40,7 +41,12 @@ Host *.example.com
\tUser bjork
Port=3333
Host *
- \t \t Crazy something dumb
+"""
+
+dont_strip_whitespace_please = "\t \t Crazy something dumb "
+
+test_config_file += dont_strip_whitespace_please
+test_config_file += """
Host spoo.example.com
Crazy something else
"""
@@ -59,7 +65,7 @@ BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\
from paramiko import *
-class UtilTest(ParamikoTest):
+class UtilTest(unittest.TestCase):
def test_1_import(self):
"""
verify that all the classes can be imported from paramiko.
@@ -105,7 +111,7 @@ class UtilTest(ParamikoTest):
self.assertEqual(config._config,
[{'host': ['*'], 'config': {}}, {'host': ['*'], 'config': {'identityfile': ['~/.ssh/id_rsa'], 'user': 'robey'}},
{'host': ['*.example.com'], 'config': {'user': 'bjork', 'port': '3333'}},
- {'host': ['*'], 'config': {'crazy': 'something dumb '}},
+ {'host': ['*'], 'config': {'crazy': 'something dumb'}},
{'host': ['spoo.example.com'], 'config': {'crazy': 'something else'}}])
def test_3_host_config(self):
@@ -114,14 +120,14 @@ class UtilTest(ParamikoTest):
config = paramiko.util.parse_ssh_config(f)
for host, values in {
- 'irc.danger.com': {'crazy': 'something dumb ',
+ 'irc.danger.com': {'crazy': 'something dumb',
'hostname': 'irc.danger.com',
'user': 'robey'},
- 'irc.example.com': {'crazy': 'something dumb ',
+ 'irc.example.com': {'crazy': 'something dumb',
'hostname': 'irc.example.com',
'user': 'robey',
'port': '3333'},
- 'spoo.example.com': {'crazy': 'something dumb ',
+ 'spoo.example.com': {'crazy': 'something dumb',
'hostname': 'spoo.example.com',
'user': 'robey',
'port': '3333'}
@@ -136,7 +142,7 @@ class UtilTest(ParamikoTest):
)
def test_4_generate_key_bytes(self):
- x = paramiko.util.generate_key_bytes(SHA, b'ABCDEFGH', 'This is my secret passphrase.', 64)
+ x = paramiko.util.generate_key_bytes(sha1, b'ABCDEFGH', 'This is my secret passphrase.', 64)
hex = ''.join(['%02x' % byte_ord(c) for c in x])
self.assertEqual(hex, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b')
@@ -153,12 +159,6 @@ class UtilTest(ParamikoTest):
finally:
os.unlink('hostfile.temp')
- def test_6_random(self):
- from paramiko.common import rng
- # just verify that we can pull out 32 bytes and not get an exception.
- x = rng.read(32)
- self.assertEqual(len(x), 32)
-
def test_7_host_config_expose_issue_33(self):
test_config_file = """
Host www13.*
@@ -339,12 +339,122 @@ IdentityFile something_%l_using_fqdn
config = paramiko.util.parse_ssh_config(StringIO(test_config))
assert config.lookup('meh') # will die during lookup() if bug regresses
+ def test_clamp_value(self):
+ self.assertEqual(32768, paramiko.util.clamp_value(32767, 32768, 32769))
+ self.assertEqual(32767, paramiko.util.clamp_value(32767, 32765, 32769))
+ self.assertEqual(32769, paramiko.util.clamp_value(32767, 32770, 32769))
+
def test_13_config_dos_crlf_succeeds(self):
config_file = StringIO("host abcqwerty\r\nHostName 127.0.0.1\r\n")
config = paramiko.SSHConfig()
config.parse(config_file)
self.assertEqual(config.lookup("abcqwerty")["hostname"], "127.0.0.1")
+ def test_14_get_hostnames(self):
+ f = StringIO(test_config_file)
+ config = paramiko.util.parse_ssh_config(f)
+ self.assertEqual(config.get_hostnames(), set(['*', '*.example.com', 'spoo.example.com']))
+
+ def test_quoted_host_names(self):
+ test_config_file = """\
+Host "param pam" param "pam"
+ Port 1111
+
+Host "param2"
+ Port 2222
+
+Host param3 parara
+ Port 3333
+
+Host param4 "p a r" "p" "par" para
+ Port 4444
+"""
+ res = {
+ 'param pam': {'hostname': 'param pam', 'port': '1111'},
+ 'param': {'hostname': 'param', 'port': '1111'},
+ 'pam': {'hostname': 'pam', 'port': '1111'},
+
+ 'param2': {'hostname': 'param2', 'port': '2222'},
+
+ 'param3': {'hostname': 'param3', 'port': '3333'},
+ 'parara': {'hostname': 'parara', 'port': '3333'},
+
+ 'param4': {'hostname': 'param4', 'port': '4444'},
+ 'p a r': {'hostname': 'p a r', 'port': '4444'},
+ 'p': {'hostname': 'p', 'port': '4444'},
+ 'par': {'hostname': 'par', 'port': '4444'},
+ 'para': {'hostname': 'para', 'port': '4444'},
+ }
+ f = StringIO(test_config_file)
+ config = paramiko.util.parse_ssh_config(f)
+ for host, values in res.items():
+ self.assertEquals(
+ paramiko.util.lookup_ssh_host_config(host, config),
+ values
+ )
+
+ def test_quoted_params_in_config(self):
+ test_config_file = """\
+Host "param pam" param "pam"
+ IdentityFile id_rsa
+
+Host "param2"
+ IdentityFile "test rsa key"
+
+Host param3 parara
+ IdentityFile id_rsa
+ IdentityFile "test rsa key"
+"""
+ res = {
+ 'param pam': {'hostname': 'param pam', 'identityfile': ['id_rsa']},
+ 'param': {'hostname': 'param', 'identityfile': ['id_rsa']},
+ 'pam': {'hostname': 'pam', 'identityfile': ['id_rsa']},
+
+ 'param2': {'hostname': 'param2', 'identityfile': ['test rsa key']},
+
+ 'param3': {'hostname': 'param3', 'identityfile': ['id_rsa', 'test rsa key']},
+ 'parara': {'hostname': 'parara', 'identityfile': ['id_rsa', 'test rsa key']},
+ }
+ f = StringIO(test_config_file)
+ config = paramiko.util.parse_ssh_config(f)
+ for host, values in res.items():
+ self.assertEquals(
+ paramiko.util.lookup_ssh_host_config(host, config),
+ values
+ )
+
+ def test_quoted_host_in_config(self):
+ conf = SSHConfig()
+ correct_data = {
+ 'param': ['param'],
+ '"param"': ['param'],
+
+ 'param pam': ['param', 'pam'],
+ '"param" "pam"': ['param', 'pam'],
+ '"param" pam': ['param', 'pam'],
+ 'param "pam"': ['param', 'pam'],
+
+ 'param "pam" p': ['param', 'pam', 'p'],
+ '"param" pam "p"': ['param', 'pam', 'p'],
+
+ '"pa ram"': ['pa ram'],
+ '"pa ram" pam': ['pa ram', 'pam'],
+ 'param "p a m"': ['param', 'p a m'],
+ }
+ incorrect_data = [
+ 'param"',
+ '"param',
+ 'param "pam',
+ 'param "pam" "p a',
+ ]
+ for host, values in correct_data.items():
+ self.assertEquals(
+ conf._get_hosts(host),
+ values
+ )
+ for host in incorrect_data:
+ self.assertRaises(Exception, conf._get_hosts, host)
+
def test_safe_string(self):
vanilla = b("vanilla")
has_bytes = b("has \7\3 bytes")
@@ -355,3 +465,23 @@ IdentityFile something_%l_using_fqdn
assert safe_vanilla == vanilla, err.format(safe_vanilla, vanilla)
assert safe_has_bytes == expected_bytes, \
err.format(safe_has_bytes, expected_bytes)
+
+ def test_proxycommand_none_issue_418(self):
+ test_config_file = """
+Host proxycommand-standard-none
+ ProxyCommand None
+
+Host proxycommand-with-equals-none
+ ProxyCommand=None
+ """
+ for host, values in {
+ 'proxycommand-standard-none': {'hostname': 'proxycommand-standard-none'},
+ 'proxycommand-with-equals-none': {'hostname': 'proxycommand-with-equals-none'}
+ }.items():
+
+ f = StringIO(test_config_file)
+ config = paramiko.util.parse_ssh_config(f)
+ self.assertEqual(
+ paramiko.util.lookup_ssh_host_config(host, config),
+ values
+ )
diff --git a/tests/util.py b/tests/util.py
index 66d2696c..b546a7e1 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -1,17 +1,7 @@
import os
-import unittest
root_path = os.path.dirname(os.path.realpath(__file__))
-
-class ParamikoTest(unittest.TestCase):
- # for Python 2.3 and below
- if not hasattr(unittest.TestCase, 'assertTrue'):
- assertTrue = unittest.TestCase.failUnless
- if not hasattr(unittest.TestCase, 'assertFalse'):
- assertFalse = unittest.TestCase.failIf
-
-
def test_path(filename):
return os.path.join(root_path, filename)