diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/__init__.py | 37 | ||||
-rw-r--r-- | tests/conftest.py | 103 | ||||
-rw-r--r-- | tests/loop.py | 6 | ||||
-rw-r--r-- | tests/stub_sftp.py | 1 | ||||
-rw-r--r-- | tests/test_auth.py | 13 | ||||
-rw-r--r-- | tests/test_buffered_pipe.py | 1 | ||||
-rw-r--r-- | tests/test_client.py | 189 | ||||
-rw-r--r--[-rwxr-xr-x] | tests/test_file.py | 8 | ||||
-rw-r--r-- | tests/test_gssapi.py | 33 | ||||
-rw-r--r-- | tests/test_hostkeys.py | 1 | ||||
-rw-r--r-- | tests/test_kex.py | 5 | ||||
-rw-r--r-- | tests/test_kex_gss.py | 3 | ||||
-rw-r--r-- | tests/test_message.py | 1 | ||||
-rw-r--r-- | tests/test_packetizer.py | 5 | ||||
-rw-r--r-- | tests/test_pkey.py | 67 | ||||
-rw-r--r--[-rwxr-xr-x] | tests/test_sftp.py | 656 | ||||
-rw-r--r-- | tests/test_sftp_big.py | 163 | ||||
-rw-r--r-- | tests/test_ssh_gss.py | 32 | ||||
-rw-r--r-- | tests/test_transport.py | 22 | ||||
-rw-r--r-- | tests/test_util.py | 11 | ||||
-rw-r--r-- | tests/util.py | 29 |
21 files changed, 709 insertions, 677 deletions
diff --git a/tests/__init__.py b/tests/__init__.py index 8878f14d..be1d2daa 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,36 +1 @@ -# Copyright (C) 2017 Martin Packman <gzlist@googlemail.com> -# -# This file is part of paramiko. -# -# Paramiko is free software; you can redistribute it and/or modify it under the -# terms of the GNU Lesser General Public License as published by the Free -# Software Foundation; either version 2.1 of the License, or (at your option) -# any later version. -# -# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY -# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR -# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. - -"""Base classes and helpers for testing paramiko.""" - -import unittest - -from paramiko.py3compat import ( - builtins, - ) - - -def skipUnlessBuiltin(name): - """Skip decorated test if builtin name does not exist.""" - if getattr(builtins, name, None) is None: - skip = getattr(unittest, "skip", None) - if skip is None: - # Python 2.6 pseudo-skip - return lambda func: None - return skip("No builtin " + repr(name)) - return lambda func: func +# This file's just here so test modules can use explicit-relative imports. diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 00000000..d1967a73 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,103 @@ +import logging +import os +import shutil +import threading + +import pytest +from paramiko import RSAKey, SFTPServer, SFTP, Transport + +from .loop import LoopSocket +from .stub_sftp import StubServer, StubSFTPServer +from .util import _support + + +# TODO: not a huge fan of conftest.py files, see if we can move these somewhere +# 'nicer'. + + +# Perform logging by default; pytest will capture and thus hide it normally, +# presenting it on error/failure. (But also allow turning it off when doing +# very pinpoint debugging - e.g. using breakpoints, so you don't want output +# hiding enabled, but also don't want all the logging to gum up the terminal.) +if not os.environ.get('DISABLE_LOGGING', False): + logging.basicConfig( + level=logging.DEBUG, + # Also make sure to set up timestamping for more sanity when debugging. + format="[%(relativeCreated)s]\t%(levelname)s:%(name)s:%(message)s", + datefmt="%H:%M:%S", + ) + + +def make_sftp_folder(): + """ + Ensure expected target temp folder exists on the remote end. + + Will clean it out if it already exists. + """ + # TODO: go back to using the sftp functionality itself for folder setup so + # we can test against live SFTP servers again someday. (Not clear if anyone + # is/was using the old capability for such, though...) + # TODO: something that would play nicer with concurrent testing (but + # probably e.g. using thread ID or UUIDs or something; not the "count up + # until you find one not used!" crap from before...) + # TODO: if we want to lock ourselves even harder into localhost-only + # testing (probably not?) could use tempdir modules for this for improved + # safety. Then again...why would someone have such a folder??? + path = os.environ.get('TEST_FOLDER', 'paramiko-test-target') + # Forcibly nuke this directory locally, since at the moment, the below + # fixtures only ever run with a locally scoped stub test server. + shutil.rmtree(path, ignore_errors=True) + # Then create it anew, again locally, for the same reason. + os.mkdir(path) + return path + + +@pytest.fixture#(scope='session') +def sftp_server(): + """ + Set up an in-memory SFTP server thread. Yields the client Transport/socket. + + The resulting client Transport (along with all the server components) will + be the same object throughout the test session; the `sftp` fixture then + creates new higher level client objects wrapped around the client + Transport, as necessary. + """ + # Sockets & transports + socks = LoopSocket() + sockc = LoopSocket() + sockc.link(socks) + tc = Transport(sockc) + ts = Transport(socks) + # Auth + host_key = RSAKey.from_private_key_file(_support('test_rsa.key')) + ts.add_server_key(host_key) + # Server setup + event = threading.Event() + server = StubServer() + ts.set_subsystem_handler('sftp', SFTPServer, StubSFTPServer) + ts.start_server(event, server) + # Wait (so client has time to connect? Not sure. Old.) + event.wait(1.0) + # Make & yield connection. + tc.connect(username='slowdive', password='pygmalion') + yield tc + # TODO: any need for shutdown? Why didn't old suite do so? Or was that the + # point of the "join all threads from threading module" crap in test.py? + + +@pytest.fixture +def sftp(sftp_server): + """ + Yield an SFTP client connected to the global in-session SFTP server thread. + """ + # Client setup + client = SFTP.from_transport(sftp_server) + # Work in 'remote' folder setup (as it wants to use the client) + # TODO: how cleanest to make this available to tests? Doing it this way is + # marginally less bad than the previous 'global'-using setup, but not by + # much? + client.FOLDER = make_sftp_folder() + # Yield client to caller + yield client + # Clean up - as in make_sftp_folder, we assume local-only exec for now. + shutil.rmtree(client.FOLDER, ignore_errors=True) diff --git a/tests/loop.py b/tests/loop.py index e805ad96..6c432867 100644 --- a/tests/loop.py +++ b/tests/loop.py @@ -16,11 +16,9 @@ # along with Paramiko; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. -""" -... -""" +import socket +import threading -import threading, socket from paramiko.common import asbytes diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py index 0d673091..19545865 100644 --- a/tests/stub_sftp.py +++ b/tests/stub_sftp.py @@ -22,6 +22,7 @@ A stub SFTP server for loopback SFTP testing. import os import sys + from paramiko import ( ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes, SFTPHandle, SFTP_OK, SFTP_FAILURE, AUTH_SUCCESSFUL, OPEN_SUCCEEDED, diff --git a/tests/test_auth.py b/tests/test_auth.py index e78397c6..4eade610 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -31,8 +31,10 @@ from paramiko import ( ) from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL from paramiko.py3compat import u -from tests.loop import LoopSocket -from tests.util import test_path + +from .loop import LoopSocket +from .util import _support, slow + _pwd = u('\u2022') @@ -40,7 +42,7 @@ _pwd = u('\u2022') class NullServer (ServerInterface): paranoid_did_password = False paranoid_did_public_key = False - paranoid_key = DSSKey.from_private_key_file(test_path('test_dss.key')) + paranoid_key = DSSKey.from_private_key_file(_support('test_dss.key')) def get_allowed_auths(self, username): if username == 'slowdive': @@ -118,7 +120,7 @@ class AuthTest (unittest.TestCase): self.sockc.close() def start_server(self): - host_key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = RSAKey.from_private_key_file(_support('test_rsa.key')) self.public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) self.event = threading.Event() @@ -170,7 +172,7 @@ class AuthTest (unittest.TestCase): self.tc.connect(hostkey=self.public_host_key) remain = self.tc.auth_password(username='paranoid', password='paranoid') self.assertEqual(['publickey'], remain) - key = DSSKey.from_private_key_file(test_path('test_dss.key')) + key = DSSKey.from_private_key_file(_support('test_dss.key')) remain = self.tc.auth_publickey(username='paranoid', key=key) self.assertEqual([], remain) self.verify_finished() @@ -238,6 +240,7 @@ class AuthTest (unittest.TestCase): etype, evalue, etb = sys.exc_info() self.assertTrue(issubclass(etype, AuthenticationException)) + @slow def test_9_auth_non_responsive(self): """ verify that authentication times out if server takes to long to diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py index eeb4d0ad..03616c55 100644 --- a/tests/test_buffered_pipe.py +++ b/tests/test_buffered_pipe.py @@ -23,6 +23,7 @@ Some unit tests for BufferedPipe. import threading import time import unittest + from paramiko.buffered_pipe import BufferedPipe, PipeTimeout from paramiko import pipe from paramiko.py3compat import b diff --git a/tests/test_client.py b/tests/test_client.py index 7058f394..7163fdcf 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -20,25 +20,32 @@ Some unit tests for SSHClient. """ -from __future__ import with_statement +from __future__ import with_statement, print_function import gc +import os import platform import socket -from tempfile import mkstemp import threading +import time import unittest -import weakref import warnings -import os -import time -from tests.util import test_path +import weakref +from tempfile import mkstemp + +from pytest_relaxed import raises import paramiko from paramiko.pkey import PublicBlob from paramiko.common import PY2 from paramiko.ssh_exception import SSHException, AuthenticationException +from .util import _support, slow + + +requires_gss_auth = unittest.skipUnless( + paramiko.GSS_AUTH_AVAILABLE, "GSS auth not available" +) FINGERPRINTS = { 'ssh-dss': b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c', @@ -106,8 +113,7 @@ class NullServer(paramiko.ServerInterface): return True -class SSHClientTest (unittest.TestCase): - +class ClientTest(unittest.TestCase): def setUp(self): self.sockl = socket.socket() self.sockl.bind(('localhost', 0)) @@ -120,21 +126,47 @@ class SSHClientTest (unittest.TestCase): look_for_keys=False, ) self.event = threading.Event() + self.kill_event = threading.Event() def tearDown(self): - for attr in "tc ts socks sockl".split(): - if hasattr(self, attr): - getattr(self, attr).close() - - def _run(self, allowed_keys=None, delay=0, public_blob=None): + # Shut down client Transport + if hasattr(self, 'tc'): + self.tc.close() + # Shut down shared socket + if hasattr(self, 'sockl'): + # Signal to server thread that it should shut down early; it checks + # this immediately after accept(). (In scenarios where connection + # actually succeeded during the test, this becomes a no-op.) + self.kill_event.set() + # Forcibly connect to server sock in case the server thread is + # hanging out in its accept() (e.g. if the client side of the test + # fails before it even gets to connecting); there's no other good + # way to force an accept() to exit. + put_a_sock_in_it = socket.socket() + put_a_sock_in_it.connect((self.addr, self.port)) + put_a_sock_in_it.close() + # Then close "our" end of the socket (which _should_ cause the + # accept() to bail out, but does not, for some reason. I blame + # threading.) + self.sockl.close() + + def _run( + self, allowed_keys=None, delay=0, public_blob=None, kill_event=None, + ): if allowed_keys is None: allowed_keys = FINGERPRINTS.keys() self.socks, addr = self.sockl.accept() + # If the kill event was set at this point, it indicates an early + # shutdown, so bail out now and don't even try setting up a Transport + # (which will just verbosely die.) + if kill_event and kill_event.is_set(): + self.socks.close() + return self.ts = paramiko.Transport(self.socks) - keypath = test_path('test_rsa.key') + keypath = _support('test_rsa.key') host_key = paramiko.RSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) - keypath = test_path('test_ecdsa_256.key') + keypath = _support('test_ecdsa_256.key') host_key = paramiko.ECDSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) server = NullServer(allowed_keys=allowed_keys, public_blob=public_blob) @@ -149,12 +181,12 @@ class SSHClientTest (unittest.TestCase): The exception is ``allowed_keys`` which is stripped and handed to the ``NullServer`` used for testing. """ - run_kwargs = {} + run_kwargs = {'kill_event': self.kill_event} for key in ('allowed_keys', 'public_blob'): run_kwargs[key] = kwargs.pop(key, None) # Server setup threading.Thread(target=self._run, kwargs=run_kwargs).start() - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key')) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) # Client setup @@ -190,6 +222,8 @@ class SSHClientTest (unittest.TestCase): stdout.close() stderr.close() + +class SSHClientTest(ClientTest): def test_1_client(self): """ verify that the SSHClient stuff works too. @@ -200,22 +234,22 @@ class SSHClientTest (unittest.TestCase): """ verify that SSHClient works with a DSA key. """ - self._test_connection(key_filename=test_path('test_dss.key')) + self._test_connection(key_filename=_support('test_dss.key')) def test_client_rsa(self): """ verify that SSHClient works with an RSA key. """ - self._test_connection(key_filename=test_path('test_rsa.key')) + self._test_connection(key_filename=_support('test_rsa.key')) def test_2_5_client_ecdsa(self): """ verify that SSHClient works with an ECDSA key. """ - self._test_connection(key_filename=test_path('test_ecdsa_256.key')) + self._test_connection(key_filename=_support('test_ecdsa_256.key')) def test_client_ed25519(self): - self._test_connection(key_filename=test_path('test_ed25519.key')) + self._test_connection(key_filename=_support('test_ed25519.key')) def test_3_multiple_key_files(self): """ @@ -238,7 +272,7 @@ class SSHClientTest (unittest.TestCase): try: self._test_connection( key_filename=[ - test_path('test_{0}.key'.format(x)) for x in attempt + _support('test_{}.key'.format(x)) for x in attempt ], allowed_keys=[types_[x] for x in accept], ) @@ -256,7 +290,7 @@ class SSHClientTest (unittest.TestCase): # various platforms trigger different errors here >_< self.assertRaises(SSHException, self._test_connection, - key_filename=[test_path('test_rsa.key')], + key_filename=[_support('test_rsa.key')], allowed_keys=['ecdsa-sha2-nistp256'], ) @@ -266,8 +300,8 @@ class SSHClientTest (unittest.TestCase): # server-side behavior is 100% identical.) # NOTE: only bothered whipping up one cert per overall class/family. for type_ in ('rsa', 'dss', 'ecdsa_256', 'ed25519'): - cert_name = 'test_{0}.key-cert.pub'.format(type_) - cert_path = test_path(os.path.join('cert_support', cert_name)) + cert_name = 'test_{}.key-cert.pub'.format(type_) + cert_path = _support(os.path.join('cert_support', cert_name)) self._test_connection( key_filename=cert_path, public_blob=PublicBlob.from_file(cert_path), @@ -281,12 +315,12 @@ class SSHClientTest (unittest.TestCase): # that a specific cert was found, along with regular authorization # succeeding proving that the overall flow works. for type_ in ('rsa', 'dss', 'ecdsa_256', 'ed25519'): - key_name = 'test_{0}.key'.format(type_) - key_path = test_path(os.path.join('cert_support', key_name)) + key_name = 'test_{}.key'.format(type_) + key_path = _support(os.path.join('cert_support', key_name)) self._test_connection( key_filename=key_path, public_blob=PublicBlob.from_file( - '{0}-cert.pub'.format(key_path) + '{}-cert.pub'.format(key_path) ), ) @@ -302,7 +336,7 @@ class SSHClientTest (unittest.TestCase): """ threading.Thread(target=self._run).start() hostname = '[%s]:%d' % (self.addr, self.port) - key_file = test_path('test_ecdsa_256.key') + key_file = _support('test_ecdsa_256.key') public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file) self.tc = paramiko.SSHClient() @@ -325,7 +359,7 @@ class SSHClientTest (unittest.TestCase): """ warnings.filterwarnings('ignore', 'tempnam.*') - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key')) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) fd, localname = mkstemp() os.close(fd) @@ -405,7 +439,7 @@ class SSHClientTest (unittest.TestCase): """ # Start the thread with a 1 second wait. threading.Thread(target=self._run, kwargs={'delay': 1}).start() - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key')) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = paramiko.SSHClient() @@ -432,12 +466,13 @@ class SSHClientTest (unittest.TestCase): # 'television' as per tests/test_pkey.py). NOTE: must use # key_filename, loading the actual key here with PKey will except # immediately; we're testing the try/except crap within Client. - key_filename=[test_path('test_rsa_password.key')], + key_filename=[_support('test_rsa_password.key')], # Actual password for default 'slowdive' user password='pygmalion', ) self._test_connection(**kwargs) + @slow def test_9_auth_timeout(self): """ verify that the SSHClient has a configurable auth timeout @@ -450,27 +485,25 @@ class SSHClientTest (unittest.TestCase): auth_timeout=0.5, ) + @requires_gss_auth def test_10_auth_trickledown_gsskex(self): """ Failed gssapi-keyex auth doesn't prevent subsequent key auth from succeeding """ - if not paramiko.GSS_AUTH_AVAILABLE: - return # for python 2.6 lacks skipTest kwargs = dict( gss_kex=True, - key_filename=[test_path('test_rsa.key')], + key_filename=[_support('test_rsa.key')], ) self._test_connection(**kwargs) + @requires_gss_auth def test_11_auth_trickledown_gssauth(self): """ Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding """ - if not paramiko.GSS_AUTH_AVAILABLE: - return # for python 2.6 lacks skipTest kwargs = dict( gss_auth=True, - key_filename=[test_path('test_rsa.key')], + key_filename=[_support('test_rsa.key')], ) self._test_connection(**kwargs) @@ -489,14 +522,13 @@ class SSHClientTest (unittest.TestCase): password='pygmalion', **self.connect_kwargs ) + @requires_gss_auth def test_13_reject_policy_gsskex(self): """ verify that SSHClient's RejectPolicy works, even if gssapi-keyex was enabled but not used. """ # Test for a bug present in paramiko versions released before 2017-08-01 - if not paramiko.GSS_AUTH_AVAILABLE: - return # for python 2.6 lacks skipTest threading.Thread(target=self._run).start() self.tc = paramiko.SSHClient() @@ -532,7 +564,7 @@ class SSHClientTest (unittest.TestCase): self.tc = paramiko.SSHClient() self.tc.set_missing_host_key_policy(paramiko.RejectPolicy()) - host_key = ktype.from_private_key_file(test_path(kfile)) + host_key = ktype.from_private_key_file(_support(kfile)) known_hosts = self.tc.get_host_keys() known_hosts.add(hostname, host_key.get_name(), host_key) @@ -556,10 +588,7 @@ class SSHClientTest (unittest.TestCase): def test_host_key_negotiation_4(self): self._client_host_key_good(paramiko.RSAKey, 'test_rsa.key') - def test_update_environment(self): - """ - Verify that environment variables can be set by the client. - """ + def _setup_for_env(self): threading.Thread(target=self._run).start() self.tc = paramiko.SSHClient() @@ -571,6 +600,11 @@ class SSHClientTest (unittest.TestCase): self.assertTrue(self.event.isSet()) self.assertTrue(self.ts.is_active()) + def test_update_environment(self): + """ + Verify that environment variables can be set by the client. + """ + self._setup_for_env() target_env = {b'A': b'B', b'C': b'd'} self.tc.exec_command('yes', environment=target_env) @@ -578,19 +612,20 @@ class SSHClientTest (unittest.TestCase): self.assertEqual(target_env, getattr(schan, 'env', {})) schan.close() - # Cannot use assertRaises in context manager mode as it is not supported - # in Python 2.6. - try: + @unittest.skip("Clients normally fail silently, thus so do we, for now") + def test_env_update_failures(self): + self._setup_for_env() + with self.assertRaises(SSHException) as manager: # Verify that a rejection by the server can be detected self.tc.exec_command('yes', environment={b'INVALID_ENV': b''}) - except SSHException as e: - self.assertTrue('INVALID_ENV' in str(e), - 'Expected variable name in error message') - self.assertTrue(isinstance(e.args[1], SSHException), - 'Expected original SSHException in exception') - else: - self.assertFalse(False, 'SSHException was not thrown.') - + self.assertTrue( + 'INVALID_ENV' in str(manager.exception), + 'Expected variable name in error message' + ) + self.assertTrue( + isinstance(manager.exception.args[1], SSHException), + 'Expected original SSHException in exception' + ) def test_missing_key_policy_accepts_classes_or_instances(self): """ @@ -607,3 +642,45 @@ class SSHClientTest (unittest.TestCase): # Hand in just the class (new behavior) client.set_missing_host_key_policy(paramiko.AutoAddPolicy) assert isinstance(client._policy, paramiko.AutoAddPolicy) + + +class PasswordPassphraseTests(ClientTest): + # TODO: most of these could reasonably be set up to use mocks/assertions + # (e.g. "gave passphrase -> expect PKey was given it as the passphrase") + # instead of suffering a real connection cycle. + # TODO: in that case, move the below to be part of an integration suite? + + def test_password_kwarg_works_for_password_auth(self): + # Straightforward / duplicate of earlier basic password test. + self._test_connection(password='pygmalion') + + # TODO: more granular exception pending #387; should be signaling "no auth + # methods available" because no key and no password + @raises(SSHException) + def test_passphrase_kwarg_not_used_for_password_auth(self): + # Using the "right" password in the "wrong" field shouldn't work. + self._test_connection(passphrase='pygmalion') + + def test_passphrase_kwarg_used_for_key_passphrase(self): + # Straightforward again, with new passphrase kwarg. + self._test_connection( + key_filename=_support('test_rsa_password.key'), + passphrase='television', + ) + + def test_password_kwarg_used_for_passphrase_when_no_passphrase_kwarg_given(self): # noqa + # Backwards compatibility: passphrase in the password field. + self._test_connection( + key_filename=_support('test_rsa_password.key'), + password='television', + ) + + @raises(AuthenticationException) # TODO: more granular + def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given(self): # noqa + # Sanity: if we're given both fields, the password field is NOT used as + # a passphrase. + self._test_connection( + key_filename=_support('test_rsa_password.key'), + password='television', + passphrase='wat? lol no', + ) diff --git a/tests/test_file.py b/tests/test_file.py index b33ecd51..3d2c94e6 100755..100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -27,7 +27,7 @@ from paramiko.common import linefeed_byte, crlf, cr_byte from paramiko.file import BufferedFile from paramiko.py3compat import BytesIO -from tests import skipUnlessBuiltin +from .util import needs_builtin class LoopbackFile (BufferedFile): @@ -198,13 +198,13 @@ class BufferedFileTest (unittest.TestCase): f.write(text) self.assertEqual(f.read(), text.encode("utf-8")) - @skipUnlessBuiltin('memoryview') + @needs_builtin('memoryview') def test_write_bytearray(self): with LoopbackFile('rb+') as f: f.write(bytearray(12)) self.assertEqual(f.read(), 12 * b"\0") - @skipUnlessBuiltin('buffer') + @needs_builtin('buffer') def test_write_buffer(self): data = 3 * b"pretend giant block of data\n" offsets = range(0, len(data), 8) @@ -213,7 +213,7 @@ class BufferedFileTest (unittest.TestCase): f.write(buffer(data, offset, 8)) self.assertEqual(f.read(), data) - @skipUnlessBuiltin('memoryview') + @needs_builtin('memoryview') def test_write_memoryview(self): data = 3 * b"pretend giant block of data\n" offsets = range(0, len(data), 8) diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py index bc220108..d4b632be 100644 --- a/tests/test_gssapi.py +++ b/tests/test_gssapi.py @@ -25,14 +25,17 @@ Test the used APIs for GSS-API / SSPI authentication import unittest import socket +from .util import needs_gssapi + +@needs_gssapi class GSSAPITest(unittest.TestCase): - @staticmethod - def init(hostname=None, srv_mode=False): - global krb5_mech, targ_name, server_mode - krb5_mech = "1.2.840.113554.1.2.2" - targ_name = hostname - server_mode = srv_mode + def setup(): + # TODO: these vars should all come from os.environ or whatever the + # approved pytest method is for runtime-configuring test data. + self.krb5_mech = "1.2.840.113554.1.2.2" + self.targ_name = "hostname" + self.server_mode = False def test_1_pyasn1(self): """ @@ -40,9 +43,9 @@ class GSSAPITest(unittest.TestCase): """ from pyasn1.type.univ import ObjectIdentifier from pyasn1.codec.der import encoder, decoder - oid = encoder.encode(ObjectIdentifier(krb5_mech)) + oid = encoder.encode(ObjectIdentifier(self.krb5_mech)) mech, __ = decoder.decode(oid) - self.assertEquals(krb5_mech, mech.__str__()) + self.assertEquals(self.krb5_mech, mech.__str__()) def test_2_gssapi_sspi(self): """ @@ -61,7 +64,7 @@ class GSSAPITest(unittest.TestCase): mic_msg = b"G'day Mate!" if _API == "MIT": - if server_mode: + if self.server_mode: gss_flags = (gssapi.C_PROT_READY_FLAG, gssapi.C_INTEG_FLAG, gssapi.C_MUTUAL_FLAG, @@ -73,13 +76,13 @@ class GSSAPITest(unittest.TestCase): # Initialize a GSS-API context. ctx = gssapi.Context() ctx.flags = gss_flags - krb5_oid = gssapi.OID.mech_from_string(krb5_mech) - target_name = gssapi.Name("host@" + targ_name, + krb5_oid = gssapi.OID.mech_from_string(self.krb5_mech) + target_name = gssapi.Name("host@" + self.targ_name, gssapi.C_NT_HOSTBASED_SERVICE) gss_ctxt = gssapi.InitContext(peer_name=target_name, mech_type=krb5_oid, req_flags=ctx.flags) - if server_mode: + if self.server_mode: c_token = gss_ctxt.step(c_token) gss_ctxt_status = gss_ctxt.established self.assertEquals(False, gss_ctxt_status) @@ -99,7 +102,7 @@ class GSSAPITest(unittest.TestCase): # Build MIC mic_token = gss_ctxt.get_mic(mic_msg) - if server_mode: + if self.server_mode: # Check MIC status = gss_srv_ctxt.verify_mic(mic_msg, mic_token) self.assertEquals(0, status) @@ -110,11 +113,11 @@ class GSSAPITest(unittest.TestCase): sspicon.ISC_REQ_DELEGATE ) # Initialize a GSS-API context. - target_name = "host/" + socket.getfqdn(targ_name) + target_name = "host/" + socket.getfqdn(self.targ_name) gss_ctxt = sspi.ClientAuth("Kerberos", scflags=gss_flags, targetspn=target_name) - if server_mode: + if self.server_mode: error, token = gss_ctxt.authorize(c_token) c_token = token[0].Buffer self.assertEquals(0, error) diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py index 2c7ceeb9..cd75f8ab 100644 --- a/tests/test_hostkeys.py +++ b/tests/test_hostkeys.py @@ -23,6 +23,7 @@ Some unit tests for HostKeys. from binascii import hexlify import os import unittest + import paramiko from paramiko.py3compat import decodebytes diff --git a/tests/test_kex.py b/tests/test_kex.py index b7f588f7..b5808e7e 100644 --- a/tests/test_kex.py +++ b/tests/test_kex.py @@ -24,14 +24,15 @@ from binascii import hexlify, unhexlify import os import unittest +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric import ec + import paramiko.util from paramiko.kex_group1 import KexGroup1 from paramiko.kex_gex import KexGex, KexGexSHA256 from paramiko import Message from paramiko.common import byte_chr from paramiko.kex_ecdh_nist import KexNistp256 -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives.asymmetric import ec def dummy_urandom(n): diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py index af342a7c..025d1faa 100644 --- a/tests/test_kex_gss.py +++ b/tests/test_kex_gss.py @@ -31,6 +31,8 @@ import unittest import paramiko +from .util import needs_gssapi + class NullServer (paramiko.ServerInterface): @@ -57,6 +59,7 @@ class NullServer (paramiko.ServerInterface): return True +@needs_gssapi class GSSKexTest(unittest.TestCase): @staticmethod def init(username, hostname): diff --git a/tests/test_message.py b/tests/test_message.py index f18cae90..645b0509 100644 --- a/tests/test_message.py +++ b/tests/test_message.py @@ -21,6 +21,7 @@ Some unit tests for ssh protocol message blocks. """ import unittest + from paramiko.message import Message from paramiko.common import byte_chr, zero_byte diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py index 02173292..414b7e38 100644 --- a/tests/test_packetizer.py +++ b/tests/test_packetizer.py @@ -27,11 +27,12 @@ from hashlib import sha1 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes -from tests.loop import LoopSocket - from paramiko import Message, Packetizer, util from paramiko.common import byte_chr, zero_byte +from .loop import LoopSocket + + x55 = byte_chr(0x55) x1f = byte_chr(0x1f) diff --git a/tests/test_pkey.py b/tests/test_pkey.py index 42d8e6bb..1827d2a9 100644 --- a/tests/test_pkey.py +++ b/tests/test_pkey.py @@ -30,7 +30,8 @@ import base64 from paramiko import RSAKey, DSSKey, ECDSAKey, Ed25519Key, Message, util from paramiko.py3compat import StringIO, byte_chr, b, bytes, PY2 -from tests.util import test_path +from .util import _support + # from openssh's ssh-keygen PUB_RSA = 'ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4c=' @@ -138,7 +139,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(exp, key) def test_2_load_rsa(self): - key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + key = RSAKey.from_private_key_file(_support('test_rsa.key')) self.assertEqual('ssh-rsa', key.get_name()) exp_rsa = b(FINGER_RSA.split()[1].replace(':', '')) my_rsa = hexlify(key.get_fingerprint()) @@ -154,7 +155,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key, key2) def test_3_load_rsa_password(self): - key = RSAKey.from_private_key_file(test_path('test_rsa_password.key'), 'television') + key = RSAKey.from_private_key_file(_support('test_rsa_password.key'), 'television') self.assertEqual('ssh-rsa', key.get_name()) exp_rsa = b(FINGER_RSA.split()[1].replace(':', '')) my_rsa = hexlify(key.get_fingerprint()) @@ -163,7 +164,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(1024, key.get_bits()) def test_4_load_dss(self): - key = DSSKey.from_private_key_file(test_path('test_dss.key')) + key = DSSKey.from_private_key_file(_support('test_dss.key')) self.assertEqual('ssh-dss', key.get_name()) exp_dss = b(FINGER_DSS.split()[1].replace(':', '')) my_dss = hexlify(key.get_fingerprint()) @@ -179,7 +180,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key, key2) def test_5_load_dss_password(self): - key = DSSKey.from_private_key_file(test_path('test_dss_password.key'), 'television') + key = DSSKey.from_private_key_file(_support('test_dss_password.key'), 'television') self.assertEqual('ssh-dss', key.get_name()) exp_dss = b(FINGER_DSS.split()[1].replace(':', '')) my_dss = hexlify(key.get_fingerprint()) @@ -189,7 +190,7 @@ class KeyTest(unittest.TestCase): def test_6_compare_rsa(self): # verify that the private & public keys compare equal - key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + key = RSAKey.from_private_key_file(_support('test_rsa.key')) self.assertEqual(key, key) pub = RSAKey(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -198,7 +199,7 @@ class KeyTest(unittest.TestCase): def test_7_compare_dss(self): # verify that the private & public keys compare equal - key = DSSKey.from_private_key_file(test_path('test_dss.key')) + key = DSSKey.from_private_key_file(_support('test_dss.key')) self.assertEqual(key, key) pub = DSSKey(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -207,7 +208,7 @@ class KeyTest(unittest.TestCase): def test_8_sign_rsa(self): # verify that the rsa private key can sign and verify - key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + key = RSAKey.from_private_key_file(_support('test_rsa.key')) msg = key.sign_ssh_data(b'ice weasels') self.assertTrue(type(msg) is Message) msg.rewind() @@ -220,7 +221,7 @@ class KeyTest(unittest.TestCase): def test_9_sign_dss(self): # verify that the dss private key can sign and verify - key = DSSKey.from_private_key_file(test_path('test_dss.key')) + key = DSSKey.from_private_key_file(_support('test_dss.key')) msg = key.sign_ssh_data(b'ice weasels') self.assertTrue(type(msg) is Message) msg.rewind() @@ -275,7 +276,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key.get_name(), 'ecdsa-sha2-nistp521') def test_10_load_ecdsa_256(self): - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_256.key')) + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_256.key')) self.assertEqual('ecdsa-sha2-nistp256', key.get_name()) exp_ecdsa = b(FINGER_ECDSA_256.split()[1].replace(':', '')) my_ecdsa = hexlify(key.get_fingerprint()) @@ -291,7 +292,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key, key2) def test_11_load_ecdsa_password_256(self): - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password_256.key'), b'television') + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_password_256.key'), b'television') self.assertEqual('ecdsa-sha2-nistp256', key.get_name()) exp_ecdsa = b(FINGER_ECDSA_256.split()[1].replace(':', '')) my_ecdsa = hexlify(key.get_fingerprint()) @@ -301,7 +302,7 @@ class KeyTest(unittest.TestCase): def test_12_compare_ecdsa_256(self): # verify that the private & public keys compare equal - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_256.key')) + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_256.key')) self.assertEqual(key, key) pub = ECDSAKey(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -310,7 +311,7 @@ class KeyTest(unittest.TestCase): def test_13_sign_ecdsa_256(self): # verify that the rsa private key can sign and verify - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_256.key')) + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_256.key')) msg = key.sign_ssh_data(b'ice weasels') self.assertTrue(type(msg) is Message) msg.rewind() @@ -325,7 +326,7 @@ class KeyTest(unittest.TestCase): self.assertTrue(pub.verify_ssh_sig(b'ice weasels', msg)) def test_14_load_ecdsa_384(self): - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_384.key')) + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_384.key')) self.assertEqual('ecdsa-sha2-nistp384', key.get_name()) exp_ecdsa = b(FINGER_ECDSA_384.split()[1].replace(':', '')) my_ecdsa = hexlify(key.get_fingerprint()) @@ -341,7 +342,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key, key2) def test_15_load_ecdsa_password_384(self): - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password_384.key'), b'television') + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_password_384.key'), b'television') self.assertEqual('ecdsa-sha2-nistp384', key.get_name()) exp_ecdsa = b(FINGER_ECDSA_384.split()[1].replace(':', '')) my_ecdsa = hexlify(key.get_fingerprint()) @@ -351,7 +352,7 @@ class KeyTest(unittest.TestCase): def test_16_compare_ecdsa_384(self): # verify that the private & public keys compare equal - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_384.key')) + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_384.key')) self.assertEqual(key, key) pub = ECDSAKey(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -360,7 +361,7 @@ class KeyTest(unittest.TestCase): def test_17_sign_ecdsa_384(self): # verify that the rsa private key can sign and verify - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_384.key')) + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_384.key')) msg = key.sign_ssh_data(b'ice weasels') self.assertTrue(type(msg) is Message) msg.rewind() @@ -375,7 +376,7 @@ class KeyTest(unittest.TestCase): self.assertTrue(pub.verify_ssh_sig(b'ice weasels', msg)) def test_18_load_ecdsa_521(self): - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_521.key')) + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_521.key')) self.assertEqual('ecdsa-sha2-nistp521', key.get_name()) exp_ecdsa = b(FINGER_ECDSA_521.split()[1].replace(':', '')) my_ecdsa = hexlify(key.get_fingerprint()) @@ -394,7 +395,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key, key2) def test_19_load_ecdsa_password_521(self): - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password_521.key'), b'television') + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_password_521.key'), b'television') self.assertEqual('ecdsa-sha2-nistp521', key.get_name()) exp_ecdsa = b(FINGER_ECDSA_521.split()[1].replace(':', '')) my_ecdsa = hexlify(key.get_fingerprint()) @@ -404,7 +405,7 @@ class KeyTest(unittest.TestCase): def test_20_compare_ecdsa_521(self): # verify that the private & public keys compare equal - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_521.key')) + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_521.key')) self.assertEqual(key, key) pub = ECDSAKey(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -413,7 +414,7 @@ class KeyTest(unittest.TestCase): def test_21_sign_ecdsa_521(self): # verify that the rsa private key can sign and verify - key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_521.key')) + key = ECDSAKey.from_private_key_file(_support('test_ecdsa_521.key')) msg = key.sign_ssh_data(b'ice weasels') self.assertTrue(type(msg) is Message) msg.rewind() @@ -429,7 +430,7 @@ class KeyTest(unittest.TestCase): def test_salt_size(self): # Read an existing encrypted private key - file_ = test_path('test_rsa_password.key') + file_ = _support('test_rsa_password.key') password = 'television' newfile = file_ + '.new' newpassword = 'radio' @@ -446,20 +447,20 @@ class KeyTest(unittest.TestCase): os.remove(newfile) def test_stringification(self): - key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + key = RSAKey.from_private_key_file(_support('test_rsa.key')) comparable = TEST_KEY_BYTESTR_2 if PY2 else TEST_KEY_BYTESTR_3 self.assertEqual(str(key), comparable) def test_ed25519(self): - key1 = Ed25519Key.from_private_key_file(test_path('test_ed25519.key')) + key1 = Ed25519Key.from_private_key_file(_support('test_ed25519.key')) key2 = Ed25519Key.from_private_key_file( - test_path('test_ed25519_password.key'), b'abc123' + _support('test_ed25519_password.key'), b'abc123' ) self.assertNotEqual(key1.asbytes(), key2.asbytes()) def test_ed25519_compare(self): # verify that the private & public keys compare equal - key = Ed25519Key.from_private_key_file(test_path('test_ed25519.key')) + key = Ed25519Key.from_private_key_file(_support('test_ed25519.key')) self.assertEqual(key, key) pub = Ed25519Key(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -469,7 +470,7 @@ class KeyTest(unittest.TestCase): def test_ed25519_nonbytes_password(self): # https://github.com/paramiko/paramiko/issues/1039 key = Ed25519Key.from_private_key_file( - test_path('test_ed25519_password.key'), + _support('test_ed25519_password.key'), # NOTE: not a bytes. Amusingly, the test above for same key DOES # explicitly cast to bytes...code smell! 'abc123', @@ -477,14 +478,14 @@ class KeyTest(unittest.TestCase): # No exception -> it's good. Meh. def test_ed25519_load_from_file_obj(self): - with open(test_path('test_ed25519.key')) as pkey_fileobj: + with open(_support('test_ed25519.key')) as pkey_fileobj: key = Ed25519Key.from_private_key(pkey_fileobj) self.assertEqual(key, key) self.assertTrue(key.can_sign()) def test_keyfile_is_actually_encrypted(self): # Read an existing encrypted private key - file_ = test_path('test_rsa_password.key') + file_ = _support('test_rsa_password.key') password = 'television' newfile = file_ + '.new' newpassword = 'radio' @@ -502,10 +503,10 @@ class KeyTest(unittest.TestCase): # test_client.py; this and nearby cert tests are more about the gritty # details. # PKey.load_certificate - key_path = test_path(os.path.join('cert_support', 'test_rsa.key')) + key_path = _support(os.path.join('cert_support', 'test_rsa.key')) key = RSAKey.from_private_key_file(key_path) self.assertTrue(key.public_blob is None) - cert_path = test_path( + cert_path = _support( os.path.join('cert_support', 'test_rsa.key-cert.pub') ) key.load_certificate(cert_path) @@ -524,10 +525,10 @@ class KeyTest(unittest.TestCase): self.assertEqual(msg.get_int64(), 1234) # Prevented from loading certificate that doesn't match - key_path = test_path(os.path.join('cert_support', 'test_ed25519.key')) + key_path = _support(os.path.join('cert_support', 'test_ed25519.key')) key1 = Ed25519Key.from_private_key_file(key_path) self.assertRaises( ValueError, key1.load_certificate, - test_path('test_rsa.key-cert.pub'), + _support('test_rsa.key-cert.pub'), ) diff --git a/tests/test_sftp.py b/tests/test_sftp.py index b3c7bf98..09a50453 100755..100644 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -32,16 +32,19 @@ import warnings from binascii import hexlify from tempfile import mkstemp +import pytest + import paramiko +import paramiko.util from paramiko.py3compat import PY2, b, u, StringIO from paramiko.common import o777, o600, o666, o644 -from tests import skipUnlessBuiltin -from tests.stub_sftp import StubServer, StubSFTPServer -from tests.loop import LoopSocket -from tests.util import test_path -import paramiko.util from paramiko.sftp_attr import SFTPAttributes +from .util import needs_builtin +from .stub_sftp import StubServer, StubSFTPServer +from .util import _support, slow + + ARTICLE = ''' Insulin sensitivity and liver insulin receptor structure in ducks from two genera @@ -81,303 +84,201 @@ decreased compared with chicken. # Thus, the following 2-bytes sequence is not valid utf8: "invalid continuation byte" NON_UTF8_DATA = b'\xC3\xC3' -FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000') - -sftp = None -tc = None -g_big_file_test = True -# we need to use eval(compile()) here because Py3.2 doesn't support the 'u' marker for unicode -# this test is the only line in the entire program that has to be treated specially to support Py3.2 -unicode_folder = eval(compile(r"u'\u00fcnic\u00f8de'" if PY2 else r"'\u00fcnic\u00f8de'", 'test_sftp.py', 'eval')) +unicode_folder = u'\u00fcnic\u00f8de' if PY2 else '\u00fcnic\u00f8de' utf8_folder = b'/\xc3\xbcnic\xc3\xb8\x64\x65' -def get_sftp(): - global sftp - return sftp - - -class SFTPTest (unittest.TestCase): - @staticmethod - def init(hostname, username, keyfile, passwd): - global sftp, tc - - t = paramiko.Transport(hostname) - tc = t - try: - key = paramiko.RSAKey.from_private_key_file(keyfile, passwd) - except paramiko.PasswordRequiredException: - sys.stderr.write('\n\nparamiko.RSAKey.from_private_key_file REQUIRES PASSWORD.\n') - sys.stderr.write('You have two options:\n') - sys.stderr.write('* Use the "-K" option to point to a different (non-password-protected)\n') - sys.stderr.write(' private key file.\n') - sys.stderr.write('* Use the "-P" option to provide the password needed to unlock this private\n') - sys.stderr.write(' key.\n') - sys.stderr.write('\n') - sys.exit(1) - try: - t.connect(username=username, pkey=key) - except paramiko.SSHException: - t.close() - sys.stderr.write('\n\nparamiko.Transport.connect FAILED.\n') - sys.stderr.write('There are several possible reasons why it might fail so quickly:\n\n') - sys.stderr.write('* The host to connect to (%s) is not a valid SSH server.\n' % hostname) - sys.stderr.write(' (Use the "-H" option to change the host.)\n') - sys.stderr.write('* The username to auth as (%s) is invalid.\n' % username) - sys.stderr.write(' (Use the "-U" option to change the username.)\n') - sys.stderr.write('* The private key given (%s) is not accepted by the server.\n' % keyfile) - sys.stderr.write(' (Use the "-K" option to provide a different key file.)\n') - sys.stderr.write('\n') - sys.exit(1) - sftp = paramiko.SFTP.from_transport(t) - - @staticmethod - def init_loopback(): - global sftp, tc - - socks = LoopSocket() - sockc = LoopSocket() - sockc.link(socks) - tc = paramiko.Transport(sockc) - ts = paramiko.Transport(socks) - - host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) - ts.add_server_key(host_key) - event = threading.Event() - server = StubServer() - ts.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer) - ts.start_server(event, server) - tc.connect(username='slowdive', password='pygmalion') - event.wait(1.0) - - sftp = paramiko.SFTP.from_transport(tc) - - @staticmethod - def set_big_file_test(onoff): - global g_big_file_test - g_big_file_test = onoff - - def setUp(self): - global FOLDER - for i in range(1000): - FOLDER = FOLDER[:-3] + '%03d' % i - try: - sftp.mkdir(FOLDER) - break - except (IOError, OSError): - pass - - def tearDown(self): - #sftp.chdir() - sftp.rmdir(FOLDER) - - def test_1_file(self): +@slow +class TestSFTP(object): + def test_1_file(self, sftp): """ verify that we can create a file. """ - f = sftp.open(FOLDER + '/test', 'w') + f = sftp.open(sftp.FOLDER + '/test', 'w') try: - self.assertEqual(f.stat().st_size, 0) + assert f.stat().st_size == 0 finally: f.close() - sftp.remove(FOLDER + '/test') + sftp.remove(sftp.FOLDER + '/test') - def test_2_close(self): + def test_2_close(self, sftp): """ - verify that closing the sftp session doesn't do anything bad, and that - a new one can be opened. + Verify that SFTP session close() causes a socket error on next action. """ - global sftp sftp.close() - try: - sftp.open(FOLDER + '/test2', 'w') - self.fail('expected exception') - except: - pass - sftp = paramiko.SFTP.from_transport(tc) + with pytest.raises(socket.error, match='Socket is closed'): + sftp.open(sftp.FOLDER + '/test2', 'w') - def test_2_sftp_can_be_used_as_context_manager(self): + def test_2_sftp_can_be_used_as_context_manager(self, sftp): """ verify that the sftp session is closed when exiting the context manager """ - global sftp with sftp: pass - try: - sftp.open(FOLDER + '/test2', 'w') - self.fail('expected exception') - except (EOFError, socket.error): - pass - finally: - sftp = paramiko.SFTP.from_transport(tc) + with pytest.raises(socket.error, match='Socket is closed'): + sftp.open(sftp.FOLDER + '/test2', 'w') - def test_3_write(self): + def test_3_write(self, sftp): """ verify that a file can be created and written, and the size is correct. """ try: - with sftp.open(FOLDER + '/duck.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f: f.write(ARTICLE) - self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483) + assert sftp.stat(sftp.FOLDER + '/duck.txt').st_size == 1483 finally: - sftp.remove(FOLDER + '/duck.txt') + sftp.remove(sftp.FOLDER + '/duck.txt') - def test_3_sftp_file_can_be_used_as_context_manager(self): + def test_3_sftp_file_can_be_used_as_context_manager(self, sftp): """ verify that an opened file can be used as a context manager """ try: - with sftp.open(FOLDER + '/duck.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f: f.write(ARTICLE) - self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483) + assert sftp.stat(sftp.FOLDER + '/duck.txt').st_size == 1483 finally: - sftp.remove(FOLDER + '/duck.txt') + sftp.remove(sftp.FOLDER + '/duck.txt') - def test_4_append(self): + def test_4_append(self, sftp): """ verify that a file can be opened for append, and tell() still works. """ try: - with sftp.open(FOLDER + '/append.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/append.txt', 'w') as f: f.write('first line\nsecond line\n') - self.assertEqual(f.tell(), 23) + assert f.tell() == 23 - with sftp.open(FOLDER + '/append.txt', 'a+') as f: + with sftp.open(sftp.FOLDER + '/append.txt', 'a+') as f: f.write('third line!!!\n') - self.assertEqual(f.tell(), 37) - self.assertEqual(f.stat().st_size, 37) + assert f.tell() == 37 + assert f.stat().st_size == 37 f.seek(-26, f.SEEK_CUR) - self.assertEqual(f.readline(), 'second line\n') + assert f.readline() == 'second line\n' finally: - sftp.remove(FOLDER + '/append.txt') + sftp.remove(sftp.FOLDER + '/append.txt') - def test_5_rename(self): + def test_5_rename(self, sftp): """ verify that renaming a file works. """ try: - with sftp.open(FOLDER + '/first.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/first.txt', 'w') as f: f.write('content!\n') - sftp.rename(FOLDER + '/first.txt', FOLDER + '/second.txt') - try: - sftp.open(FOLDER + '/first.txt', 'r') - self.assertTrue(False, 'no exception on reading nonexistent file') - except IOError: - pass - with sftp.open(FOLDER + '/second.txt', 'r') as f: + sftp.rename(sftp.FOLDER + '/first.txt', sftp.FOLDER + '/second.txt') + with pytest.raises(IOError, match='No such file'): + sftp.open(sftp.FOLDER + '/first.txt', 'r') + with sftp.open(sftp.FOLDER + '/second.txt', 'r') as f: f.seek(-6, f.SEEK_END) - self.assertEqual(u(f.read(4)), 'tent') + assert u(f.read(4)) == 'tent' finally: + # TODO: this is gross, make some sort of 'remove if possible' / 'rm + # -f' a-like, jeez try: - sftp.remove(FOLDER + '/first.txt') + sftp.remove(sftp.FOLDER + '/first.txt') except: pass try: - sftp.remove(FOLDER + '/second.txt') + sftp.remove(sftp.FOLDER + '/second.txt') except: pass - def test_5a_posix_rename(self): + def test_5a_posix_rename(self, sftp): """Test posix-rename@openssh.com protocol extension.""" try: # first check that the normal rename works as specified - with sftp.open(FOLDER + '/a', 'w') as f: + with sftp.open(sftp.FOLDER + '/a', 'w') as f: f.write('one') - sftp.rename(FOLDER + '/a', FOLDER + '/b') - with sftp.open(FOLDER + '/a', 'w') as f: + sftp.rename(sftp.FOLDER + '/a', sftp.FOLDER + '/b') + with sftp.open(sftp.FOLDER + '/a', 'w') as f: f.write('two') - try: - sftp.rename(FOLDER + '/a', FOLDER + '/b') - self.assertTrue(False, 'no exception when rename-ing onto existing file') - except (OSError, IOError): - pass + with pytest.raises(IOError): # actual message seems generic + sftp.rename(sftp.FOLDER + '/a', sftp.FOLDER + '/b') # now check with the posix_rename - sftp.posix_rename(FOLDER + '/a', FOLDER + '/b') - with sftp.open(FOLDER + '/b', 'r') as f: + sftp.posix_rename(sftp.FOLDER + '/a', sftp.FOLDER + '/b') + with sftp.open(sftp.FOLDER + '/b', 'r') as f: data = u(f.read()) - self.assertEqual('two', data, "Contents of renamed file not the same as original file") + err = "Contents of renamed file not the same as original file" + assert 'two' == data, err finally: try: - sftp.remove(FOLDER + '/a') + sftp.remove(sftp.FOLDER + '/a') except: pass try: - sftp.remove(FOLDER + '/b') + sftp.remove(sftp.FOLDER + '/b') except: pass - def test_6_folder(self): + def test_6_folder(self, sftp): """ create a temporary folder, verify that we can create a file in it, then remove the folder and verify that we can't create a file in it anymore. """ - sftp.mkdir(FOLDER + '/subfolder') - sftp.open(FOLDER + '/subfolder/test', 'w').close() - sftp.remove(FOLDER + '/subfolder/test') - sftp.rmdir(FOLDER + '/subfolder') - try: - sftp.open(FOLDER + '/subfolder/test') - # shouldn't be able to create that file - self.assertTrue(False, 'no exception at dummy file creation') - except IOError: - pass + sftp.mkdir(sftp.FOLDER + '/subfolder') + sftp.open(sftp.FOLDER + '/subfolder/test', 'w').close() + sftp.remove(sftp.FOLDER + '/subfolder/test') + sftp.rmdir(sftp.FOLDER + '/subfolder') + # shouldn't be able to create that file if dir removed + with pytest.raises(IOError, match="No such file"): + sftp.open(sftp.FOLDER + '/subfolder/test') - def test_7_listdir(self): + def test_7_listdir(self, sftp): """ verify that a folder can be created, a bunch of files can be placed in it, and those files show up in sftp.listdir. """ try: - sftp.open(FOLDER + '/duck.txt', 'w').close() - sftp.open(FOLDER + '/fish.txt', 'w').close() - sftp.open(FOLDER + '/tertiary.py', 'w').close() - - x = sftp.listdir(FOLDER) - self.assertEqual(len(x), 3) - self.assertTrue('duck.txt' in x) - self.assertTrue('fish.txt' in x) - self.assertTrue('tertiary.py' in x) - self.assertTrue('random' not in x) + sftp.open(sftp.FOLDER + '/duck.txt', 'w').close() + sftp.open(sftp.FOLDER + '/fish.txt', 'w').close() + sftp.open(sftp.FOLDER + '/tertiary.py', 'w').close() + + x = sftp.listdir(sftp.FOLDER) + assert len(x) == 3 + assert 'duck.txt' in x + assert 'fish.txt' in x + assert 'tertiary.py' in x + assert 'random' not in x finally: - sftp.remove(FOLDER + '/duck.txt') - sftp.remove(FOLDER + '/fish.txt') - sftp.remove(FOLDER + '/tertiary.py') + sftp.remove(sftp.FOLDER + '/duck.txt') + sftp.remove(sftp.FOLDER + '/fish.txt') + sftp.remove(sftp.FOLDER + '/tertiary.py') - def test_7_5_listdir_iter(self): + def test_7_5_listdir_iter(self, sftp): """ listdir_iter version of above test """ try: - sftp.open(FOLDER + '/duck.txt', 'w').close() - sftp.open(FOLDER + '/fish.txt', 'w').close() - sftp.open(FOLDER + '/tertiary.py', 'w').close() - - x = [x.filename for x in sftp.listdir_iter(FOLDER)] - self.assertEqual(len(x), 3) - self.assertTrue('duck.txt' in x) - self.assertTrue('fish.txt' in x) - self.assertTrue('tertiary.py' in x) - self.assertTrue('random' not in x) + sftp.open(sftp.FOLDER + '/duck.txt', 'w').close() + sftp.open(sftp.FOLDER + '/fish.txt', 'w').close() + sftp.open(sftp.FOLDER + '/tertiary.py', 'w').close() + + x = [x.filename for x in sftp.listdir_iter(sftp.FOLDER)] + assert len(x) == 3 + assert 'duck.txt' in x + assert 'fish.txt' in x + assert 'tertiary.py' in x + assert 'random' not in x finally: - sftp.remove(FOLDER + '/duck.txt') - sftp.remove(FOLDER + '/fish.txt') - sftp.remove(FOLDER + '/tertiary.py') + sftp.remove(sftp.FOLDER + '/duck.txt') + sftp.remove(sftp.FOLDER + '/fish.txt') + sftp.remove(sftp.FOLDER + '/tertiary.py') - def test_8_setstat(self): + def test_8_setstat(self, sftp): """ verify that the setstat functions (chown, chmod, utime, truncate) work. """ try: - with sftp.open(FOLDER + '/special', 'w') as f: + with sftp.open(sftp.FOLDER + '/special', 'w') as f: f.write('x' * 1024) - stat = sftp.stat(FOLDER + '/special') - sftp.chmod(FOLDER + '/special', (stat.st_mode & ~o777) | o600) - stat = sftp.stat(FOLDER + '/special') + stat = sftp.stat(sftp.FOLDER + '/special') + sftp.chmod(sftp.FOLDER + '/special', (stat.st_mode & ~o777) | o600) + stat = sftp.stat(sftp.FOLDER + '/special') expected_mode = o600 if sys.platform == 'win32': # chmod not really functional on windows @@ -385,35 +286,35 @@ class SFTPTest (unittest.TestCase): if sys.platform == 'cygwin': # even worse. expected_mode = o644 - self.assertEqual(stat.st_mode & o777, expected_mode) - self.assertEqual(stat.st_size, 1024) + assert stat.st_mode & o777 == expected_mode + assert stat.st_size == 1024 mtime = stat.st_mtime - 3600 atime = stat.st_atime - 1800 - sftp.utime(FOLDER + '/special', (atime, mtime)) - stat = sftp.stat(FOLDER + '/special') - self.assertEqual(stat.st_mtime, mtime) + sftp.utime(sftp.FOLDER + '/special', (atime, mtime)) + stat = sftp.stat(sftp.FOLDER + '/special') + assert stat.st_mtime == mtime if sys.platform not in ('win32', 'cygwin'): - self.assertEqual(stat.st_atime, atime) + assert stat.st_atime == atime # can't really test chown, since we'd have to know a valid uid. - sftp.truncate(FOLDER + '/special', 512) - stat = sftp.stat(FOLDER + '/special') - self.assertEqual(stat.st_size, 512) + sftp.truncate(sftp.FOLDER + '/special', 512) + stat = sftp.stat(sftp.FOLDER + '/special') + assert stat.st_size == 512 finally: - sftp.remove(FOLDER + '/special') + sftp.remove(sftp.FOLDER + '/special') - def test_9_fsetstat(self): + def test_9_fsetstat(self, sftp): """ verify that the fsetstat functions (chown, chmod, utime, truncate) work on open files. """ try: - with sftp.open(FOLDER + '/special', 'w') as f: + with sftp.open(sftp.FOLDER + '/special', 'w') as f: f.write('x' * 1024) - with sftp.open(FOLDER + '/special', 'r+') as f: + with sftp.open(sftp.FOLDER + '/special', 'r+') as f: stat = f.stat() f.chmod((stat.st_mode & ~o777) | o600) stat = f.stat() @@ -425,26 +326,26 @@ class SFTPTest (unittest.TestCase): if sys.platform == 'cygwin': # even worse. expected_mode = o644 - self.assertEqual(stat.st_mode & o777, expected_mode) - self.assertEqual(stat.st_size, 1024) + assert stat.st_mode & o777 == expected_mode + assert stat.st_size == 1024 mtime = stat.st_mtime - 3600 atime = stat.st_atime - 1800 f.utime((atime, mtime)) stat = f.stat() - self.assertEqual(stat.st_mtime, mtime) + assert stat.st_mtime == mtime if sys.platform not in ('win32', 'cygwin'): - self.assertEqual(stat.st_atime, atime) + assert stat.st_atime == atime # can't really test chown, since we'd have to know a valid uid. f.truncate(512) stat = f.stat() - self.assertEqual(stat.st_size, 512) + assert stat.st_size == 512 finally: - sftp.remove(FOLDER + '/special') + sftp.remove(sftp.FOLDER + '/special') - def test_A_readline_seek(self): + def test_A_readline_seek(self, sftp): """ create a text file and write a bunch of text into it. then count the lines in the file, and seek around to retrieve particular lines. this should @@ -452,10 +353,10 @@ class SFTPTest (unittest.TestCase): buffering is reset on 'seek'. """ try: - with sftp.open(FOLDER + '/duck.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f: f.write(ARTICLE) - with sftp.open(FOLDER + '/duck.txt', 'r+') as f: + with sftp.open(sftp.FOLDER + '/duck.txt', 'r+') as f: line_number = 0 loc = 0 pos_list = [] @@ -463,35 +364,35 @@ class SFTPTest (unittest.TestCase): line_number += 1 pos_list.append(loc) loc = f.tell() - self.assertTrue(f.seekable()) + assert f.seekable() f.seek(pos_list[6], f.SEEK_SET) - self.assertEqual(f.readline(), 'Nouzilly, France.\n') + assert f.readline(), 'Nouzilly == France.\n' f.seek(pos_list[17], f.SEEK_SET) - self.assertEqual(f.readline()[:4], 'duck') + assert f.readline()[:4] == 'duck' f.seek(pos_list[10], f.SEEK_SET) - self.assertEqual(f.readline(), 'duck types were equally resistant to exogenous insulin compared with chicken.\n') + assert f.readline() == 'duck types were equally resistant to exogenous insulin compared with chicken.\n' finally: - sftp.remove(FOLDER + '/duck.txt') + sftp.remove(sftp.FOLDER + '/duck.txt') - def test_B_write_seek(self): + def test_B_write_seek(self, sftp): """ create a text file, seek back and change part of it, and verify that the changes worked. """ try: - with sftp.open(FOLDER + '/testing.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/testing.txt', 'w') as f: f.write('hello kitty.\n') f.seek(-5, f.SEEK_CUR) f.write('dd') - self.assertEqual(sftp.stat(FOLDER + '/testing.txt').st_size, 13) - with sftp.open(FOLDER + '/testing.txt', 'r') as f: + assert sftp.stat(sftp.FOLDER + '/testing.txt').st_size == 13 + with sftp.open(sftp.FOLDER + '/testing.txt', 'r') as f: data = f.read(20) - self.assertEqual(data, b'hello kiddy.\n') + assert data == b'hello kiddy.\n' finally: - sftp.remove(FOLDER + '/testing.txt') + sftp.remove(sftp.FOLDER + '/testing.txt') - def test_C_symlink(self): + def test_C_symlink(self, sftp): """ create a symlink and then check that lstat doesn't follow it. """ @@ -500,97 +401,85 @@ class SFTPTest (unittest.TestCase): return try: - with sftp.open(FOLDER + '/original.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/original.txt', 'w') as f: f.write('original\n') - sftp.symlink('original.txt', FOLDER + '/link.txt') - self.assertEqual(sftp.readlink(FOLDER + '/link.txt'), 'original.txt') + sftp.symlink('original.txt', sftp.FOLDER + '/link.txt') + assert sftp.readlink(sftp.FOLDER + '/link.txt') == 'original.txt' - with sftp.open(FOLDER + '/link.txt', 'r') as f: - self.assertEqual(f.readlines(), ['original\n']) + with sftp.open(sftp.FOLDER + '/link.txt', 'r') as f: + assert f.readlines() == ['original\n'] cwd = sftp.normalize('.') if cwd[-1] == '/': cwd = cwd[:-1] - abs_path = cwd + '/' + FOLDER + '/original.txt' - sftp.symlink(abs_path, FOLDER + '/link2.txt') - self.assertEqual(abs_path, sftp.readlink(FOLDER + '/link2.txt')) + abs_path = cwd + '/' + sftp.FOLDER + '/original.txt' + sftp.symlink(abs_path, sftp.FOLDER + '/link2.txt') + assert abs_path == sftp.readlink(sftp.FOLDER + '/link2.txt') - self.assertEqual(sftp.lstat(FOLDER + '/link.txt').st_size, 12) - self.assertEqual(sftp.stat(FOLDER + '/link.txt').st_size, 9) + assert sftp.lstat(sftp.FOLDER + '/link.txt').st_size == 12 + assert sftp.stat(sftp.FOLDER + '/link.txt').st_size == 9 # the sftp server may be hiding extra path members from us, so the # length may be longer than we expect: - self.assertTrue(sftp.lstat(FOLDER + '/link2.txt').st_size >= len(abs_path)) - self.assertEqual(sftp.stat(FOLDER + '/link2.txt').st_size, 9) - self.assertEqual(sftp.stat(FOLDER + '/original.txt').st_size, 9) + assert sftp.lstat(sftp.FOLDER + '/link2.txt').st_size >= len(abs_path) + assert sftp.stat(sftp.FOLDER + '/link2.txt').st_size == 9 + assert sftp.stat(sftp.FOLDER + '/original.txt').st_size == 9 finally: try: - sftp.remove(FOLDER + '/link.txt') + sftp.remove(sftp.FOLDER + '/link.txt') except: pass try: - sftp.remove(FOLDER + '/link2.txt') + sftp.remove(sftp.FOLDER + '/link2.txt') except: pass try: - sftp.remove(FOLDER + '/original.txt') + sftp.remove(sftp.FOLDER + '/original.txt') except: pass - def test_D_flush_seek(self): + def test_D_flush_seek(self, sftp): """ verify that buffered writes are automatically flushed on seek. """ try: - with sftp.open(FOLDER + '/happy.txt', 'w', 1) as f: + with sftp.open(sftp.FOLDER + '/happy.txt', 'w', 1) as f: f.write('full line.\n') f.write('partial') f.seek(9, f.SEEK_SET) f.write('?\n') - with sftp.open(FOLDER + '/happy.txt', 'r') as f: - self.assertEqual(f.readline(), u('full line?\n')) - self.assertEqual(f.read(7), b'partial') + with sftp.open(sftp.FOLDER + '/happy.txt', 'r') as f: + assert f.readline() == u('full line?\n') + assert f.read(7) == b'partial' finally: try: - sftp.remove(FOLDER + '/happy.txt') + sftp.remove(sftp.FOLDER + '/happy.txt') except: pass - def test_E_realpath(self): + def test_E_realpath(self, sftp): """ test that realpath is returning something non-empty and not an error. """ pwd = sftp.normalize('.') - self.assertTrue(len(pwd) > 0) - f = sftp.normalize('./' + FOLDER) - self.assertTrue(len(f) > 0) - self.assertEqual(os.path.join(pwd, FOLDER), f) + assert len(pwd) > 0 + f = sftp.normalize('./' + sftp.FOLDER) + assert len(f) > 0 + assert os.path.join(pwd, sftp.FOLDER) == f - def test_F_mkdir(self): + def test_F_mkdir(self, sftp): """ verify that mkdir/rmdir work. """ - try: - sftp.mkdir(FOLDER + '/subfolder') - except: - self.assertTrue(False, 'exception creating subfolder') - try: - sftp.mkdir(FOLDER + '/subfolder') - self.assertTrue(False, 'no exception overwriting subfolder') - except IOError: - pass - try: - sftp.rmdir(FOLDER + '/subfolder') - except: - self.assertTrue(False, 'exception removing subfolder') - try: - sftp.rmdir(FOLDER + '/subfolder') - self.assertTrue(False, 'no exception removing nonexistent subfolder') - except IOError: - pass + sftp.mkdir(sftp.FOLDER + '/subfolder') + with pytest.raises(IOError): # generic msg only + sftp.mkdir(sftp.FOLDER + '/subfolder') + sftp.rmdir(sftp.FOLDER + '/subfolder') + with pytest.raises(IOError, match="No such file"): + sftp.rmdir(sftp.FOLDER + '/subfolder') - def test_G_chdir(self): + def test_G_chdir(self, sftp): """ verify that chdir/getcwd work. """ @@ -598,35 +487,35 @@ class SFTPTest (unittest.TestCase): if root[-1] != '/': root += '/' try: - sftp.mkdir(FOLDER + '/alpha') - sftp.chdir(FOLDER + '/alpha') + sftp.mkdir(sftp.FOLDER + '/alpha') + sftp.chdir(sftp.FOLDER + '/alpha') sftp.mkdir('beta') - self.assertEqual(root + FOLDER + '/alpha', sftp.getcwd()) - self.assertEqual(['beta'], sftp.listdir('.')) + assert root + sftp.FOLDER + '/alpha' == sftp.getcwd() + assert ['beta'] == sftp.listdir('.') sftp.chdir('beta') with sftp.open('fish', 'w') as f: f.write('hello\n') sftp.chdir('..') - self.assertEqual(['fish'], sftp.listdir('beta')) + assert ['fish'] == sftp.listdir('beta') sftp.chdir('..') - self.assertEqual(['fish'], sftp.listdir('alpha/beta')) + assert ['fish'] == sftp.listdir('alpha/beta') finally: sftp.chdir(root) try: - sftp.unlink(FOLDER + '/alpha/beta/fish') + sftp.unlink(sftp.FOLDER + '/alpha/beta/fish') except: pass try: - sftp.rmdir(FOLDER + '/alpha/beta') + sftp.rmdir(sftp.FOLDER + '/alpha/beta') except: pass try: - sftp.rmdir(FOLDER + '/alpha') + sftp.rmdir(sftp.FOLDER + '/alpha') except: pass - def test_H_get_put(self): + def test_H_get_put(self, sftp): """ verify that get/put work. """ @@ -641,103 +530,102 @@ class SFTPTest (unittest.TestCase): def progress_callback(x, y): saved_progress.append((x, y)) - sftp.put(localname, FOLDER + '/bunny.txt', progress_callback) + sftp.put(localname, sftp.FOLDER + '/bunny.txt', progress_callback) - with sftp.open(FOLDER + '/bunny.txt', 'rb') as f: - self.assertEqual(text, f.read(128)) - self.assertEqual([(41, 41)], saved_progress) + with sftp.open(sftp.FOLDER + '/bunny.txt', 'rb') as f: + assert text == f.read(128) + assert [(41, 41)] == saved_progress os.unlink(localname) fd, localname = mkstemp() os.close(fd) saved_progress = [] - sftp.get(FOLDER + '/bunny.txt', localname, progress_callback) + sftp.get(sftp.FOLDER + '/bunny.txt', localname, progress_callback) with open(localname, 'rb') as f: - self.assertEqual(text, f.read(128)) - self.assertEqual([(41, 41)], saved_progress) + assert text == f.read(128) + assert [(41, 41)] == saved_progress os.unlink(localname) - sftp.unlink(FOLDER + '/bunny.txt') + sftp.unlink(sftp.FOLDER + '/bunny.txt') - def test_I_check(self): + def test_I_check(self, sftp): """ verify that file.check() works against our own server. (it's an sftp extension that we support, and may be the only ones who support it.) """ - with sftp.open(FOLDER + '/kitty.txt', 'w') as f: + with sftp.open(sftp.FOLDER + '/kitty.txt', 'w') as f: f.write('here kitty kitty' * 64) try: - with sftp.open(FOLDER + '/kitty.txt', 'r') as f: + with sftp.open(sftp.FOLDER + '/kitty.txt', 'r') as f: sum = f.check('sha1') - self.assertEqual('91059CFC6615941378D413CB5ADAF4C5EB293402', u(hexlify(sum)).upper()) + assert '91059CFC6615941378D413CB5ADAF4C5EB293402' == u(hexlify(sum)).upper() sum = f.check('md5', 0, 512) - self.assertEqual('93DE4788FCA28D471516963A1FE3856A', u(hexlify(sum)).upper()) + assert '93DE4788FCA28D471516963A1FE3856A' == u(hexlify(sum)).upper() sum = f.check('md5', 0, 0, 510) - self.assertEqual('EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6', - u(hexlify(sum)).upper()) + assert u(hexlify(sum)).upper() == 'EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6' # noqa finally: - sftp.unlink(FOLDER + '/kitty.txt') + sftp.unlink(sftp.FOLDER + '/kitty.txt') - def test_J_x_flag(self): + def test_J_x_flag(self, sftp): """ verify that the 'x' flag works when opening a file. """ - sftp.open(FOLDER + '/unusual.txt', 'wx').close() + sftp.open(sftp.FOLDER + '/unusual.txt', 'wx').close() try: try: - sftp.open(FOLDER + '/unusual.txt', 'wx') + sftp.open(sftp.FOLDER + '/unusual.txt', 'wx') self.fail('expected exception') except IOError: pass finally: - sftp.unlink(FOLDER + '/unusual.txt') + sftp.unlink(sftp.FOLDER + '/unusual.txt') - def test_K_utf8(self): + def test_K_utf8(self, sftp): """ verify that unicode strings are encoded into utf8 correctly. """ - with sftp.open(FOLDER + '/something', 'w') as f: + with sftp.open(sftp.FOLDER + '/something', 'w') as f: f.write('okay') try: - sftp.rename(FOLDER + '/something', FOLDER + '/' + unicode_folder) - sftp.open(b(FOLDER) + utf8_folder, 'r') + sftp.rename(sftp.FOLDER + '/something', sftp.FOLDER + '/' + unicode_folder) + sftp.open(b(sftp.FOLDER) + utf8_folder, 'r') except Exception as e: self.fail('exception ' + str(e)) - sftp.unlink(b(FOLDER) + utf8_folder) + sftp.unlink(b(sftp.FOLDER) + utf8_folder) - def test_L_utf8_chdir(self): - sftp.mkdir(FOLDER + '/' + unicode_folder) + def test_L_utf8_chdir(self, sftp): + sftp.mkdir(sftp.FOLDER + '/' + unicode_folder) try: - sftp.chdir(FOLDER + '/' + unicode_folder) + sftp.chdir(sftp.FOLDER + '/' + unicode_folder) with sftp.open('something', 'w') as f: f.write('okay') sftp.unlink('something') finally: sftp.chdir() - sftp.rmdir(FOLDER + '/' + unicode_folder) + sftp.rmdir(sftp.FOLDER + '/' + unicode_folder) - def test_M_bad_readv(self): + def test_M_bad_readv(self, sftp): """ verify that readv at the end of the file doesn't essplode. """ - sftp.open(FOLDER + '/zero', 'w').close() + sftp.open(sftp.FOLDER + '/zero', 'w').close() try: - with sftp.open(FOLDER + '/zero', 'r') as f: + with sftp.open(sftp.FOLDER + '/zero', 'r') as f: f.readv([(0, 12)]) - with sftp.open(FOLDER + '/zero', 'r') as f: + with sftp.open(sftp.FOLDER + '/zero', 'r') as f: file_size = f.stat().st_size f.prefetch(file_size) f.read(100) finally: - sftp.unlink(FOLDER + '/zero') + sftp.unlink(sftp.FOLDER + '/zero') - def test_N_put_without_confirm(self): + def test_N_put_without_confirm(self, sftp): """ verify that get/put work without confirmation. """ @@ -752,138 +640,132 @@ class SFTPTest (unittest.TestCase): def progress_callback(x, y): saved_progress.append((x, y)) - res = sftp.put(localname, FOLDER + '/bunny.txt', progress_callback, False) + res = sftp.put(localname, sftp.FOLDER + '/bunny.txt', progress_callback, False) - self.assertEqual(SFTPAttributes().attr, res.attr) + assert SFTPAttributes().attr == res.attr - with sftp.open(FOLDER + '/bunny.txt', 'r') as f: - self.assertEqual(text, f.read(128)) - self.assertEqual((41, 41), saved_progress[-1]) + with sftp.open(sftp.FOLDER + '/bunny.txt', 'r') as f: + assert text == f.read(128) + assert (41, 41) == saved_progress[-1] os.unlink(localname) - sftp.unlink(FOLDER + '/bunny.txt') + sftp.unlink(sftp.FOLDER + '/bunny.txt') - def test_O_getcwd(self): + def test_O_getcwd(self, sftp): """ verify that chdir/getcwd work. """ - self.assertEqual(None, sftp.getcwd()) + assert sftp.getcwd() == None root = sftp.normalize('.') if root[-1] != '/': root += '/' try: - sftp.mkdir(FOLDER + '/alpha') - sftp.chdir(FOLDER + '/alpha') - self.assertEqual('/' + FOLDER + '/alpha', sftp.getcwd()) + sftp.mkdir(sftp.FOLDER + '/alpha') + sftp.chdir(sftp.FOLDER + '/alpha') + assert sftp.getcwd() == '/' + sftp.FOLDER + '/alpha' finally: sftp.chdir(root) try: - sftp.rmdir(FOLDER + '/alpha') + sftp.rmdir(sftp.FOLDER + '/alpha') except: pass - def XXX_test_M_seek_append(self): + def XXX_test_M_seek_append(self, sftp): """ verify that seek does't affect writes during append. does not work except through paramiko. :( openssh fails. """ try: - with sftp.open(FOLDER + '/append.txt', 'a') as f: + with sftp.open(sftp.FOLDER + '/append.txt', 'a') as f: f.write('first line\nsecond line\n') f.seek(11, f.SEEK_SET) f.write('third line\n') - with sftp.open(FOLDER + '/append.txt', 'r') as f: - self.assertEqual(f.stat().st_size, 34) - self.assertEqual(f.readline(), 'first line\n') - self.assertEqual(f.readline(), 'second line\n') - self.assertEqual(f.readline(), 'third line\n') + with sftp.open(sftp.FOLDER + '/append.txt', 'r') as f: + assert f.stat().st_size == 34 + assert f.readline() == 'first line\n' + assert f.readline() == 'second line\n' + assert f.readline() == 'third line\n' finally: - sftp.remove(FOLDER + '/append.txt') + sftp.remove(sftp.FOLDER + '/append.txt') - def test_putfo_empty_file(self): + def test_putfo_empty_file(self, sftp): """ Send an empty file and confirm it is sent. """ - target = FOLDER + '/empty file.txt' + target = sftp.FOLDER + '/empty file.txt' stream = StringIO() try: attrs = sftp.putfo(stream, target) # the returned attributes should not be null - self.assertNotEqual(attrs, None) + assert attrs is not None finally: sftp.remove(target) - - def test_N_file_with_percent(self): + # TODO: this test doesn't actually fail if the regression (removing '%' + # expansion to '%%' within sftp.py's def _log()) is removed - stacktraces + # appear but they're clearly emitted from subthreads that have no error + # handling. No point running it until that is fixed somehow. + @pytest.mark.skip("Doesn't prove anything right now") + def test_N_file_with_percent(self, sftp): """ verify that we can create a file with a '%' in the filename. ( it needs to be properly escaped by _log() ) """ - self.assertTrue( paramiko.util.get_logger("paramiko").handlers, "This unit test requires logging to be enabled" ) - f = sftp.open(FOLDER + '/test%file', 'w') + f = sftp.open(sftp.FOLDER + '/test%file', 'w') try: - self.assertEqual(f.stat().st_size, 0) + assert f.stat().st_size == 0 finally: f.close() - sftp.remove(FOLDER + '/test%file') - + sftp.remove(sftp.FOLDER + '/test%file') - def test_O_non_utf8_data(self): + def test_O_non_utf8_data(self, sftp): """Test write() and read() of non utf8 data""" try: - with sftp.open('%s/nonutf8data' % FOLDER, 'w') as f: + with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'w') as f: f.write(NON_UTF8_DATA) - with sftp.open('%s/nonutf8data' % FOLDER, 'r') as f: + with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'r') as f: data = f.read() - self.assertEqual(data, NON_UTF8_DATA) - with sftp.open('%s/nonutf8data' % FOLDER, 'wb') as f: + assert data == NON_UTF8_DATA + with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'wb') as f: f.write(NON_UTF8_DATA) - with sftp.open('%s/nonutf8data' % FOLDER, 'rb') as f: + with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'rb') as f: data = f.read() - self.assertEqual(data, NON_UTF8_DATA) + assert data == NON_UTF8_DATA finally: - sftp.remove('%s/nonutf8data' % FOLDER) + sftp.remove('%s/nonutf8data' % sftp.FOLDER) - def test_sftp_attributes_empty_str(self): + def test_sftp_attributes_empty_str(self, sftp): sftp_attributes = SFTPAttributes() - self.assertEqual(str(sftp_attributes), "?--------- 1 0 0 0 (unknown date) ?") + assert str(sftp_attributes) == "?--------- 1 0 0 0 (unknown date) ?" - @skipUnlessBuiltin('buffer') - def test_write_buffer(self): + @needs_builtin('buffer') + def test_write_buffer(self, sftp): """Test write() using a buffer instance.""" data = 3 * b'A potentially large block of data to chunk up.\n' try: - with sftp.open('%s/write_buffer' % FOLDER, 'wb') as f: + with sftp.open('%s/write_buffer' % sftp.FOLDER, 'wb') as f: for offset in range(0, len(data), 8): f.write(buffer(data, offset, 8)) - with sftp.open('%s/write_buffer' % FOLDER, 'rb') as f: - self.assertEqual(f.read(), data) + with sftp.open('%s/write_buffer' % sftp.FOLDER, 'rb') as f: + assert f.read() == data finally: - sftp.remove('%s/write_buffer' % FOLDER) + sftp.remove('%s/write_buffer' % sftp.FOLDER) - @skipUnlessBuiltin('memoryview') - def test_write_memoryview(self): + @needs_builtin('memoryview') + def test_write_memoryview(self, sftp): """Test write() using a memoryview instance.""" data = 3 * b'A potentially large block of data to chunk up.\n' try: - with sftp.open('%s/write_memoryview' % FOLDER, 'wb') as f: + with sftp.open('%s/write_memoryview' % sftp.FOLDER, 'wb') as f: view = memoryview(data) for offset in range(0, len(data), 8): f.write(view[offset:offset+8]) - with sftp.open('%s/write_memoryview' % FOLDER, 'rb') as f: - self.assertEqual(f.read(), data) + with sftp.open('%s/write_memoryview' % sftp.FOLDER, 'rb') as f: + assert f.read() == data finally: - sftp.remove('%s/write_memoryview' % FOLDER) - - -if __name__ == '__main__': - SFTPTest.init_loopback() - # logging is required by test_N_file_with_percent - paramiko.util.log_to_file('test_sftp.log') - from unittest import main - main() + sftp.remove('%s/write_memoryview' % sftp.FOLDER) diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py index cfad5682..a659098d 100644 --- a/tests/test_sftp_big.py +++ b/tests/test_sftp_big.py @@ -31,94 +31,75 @@ import time import unittest from paramiko.common import o660 -from tests.test_sftp import get_sftp -FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000') +from .util import slow -class BigSFTPTest (unittest.TestCase): - - def setUp(self): - global FOLDER - sftp = get_sftp() - for i in range(1000): - FOLDER = FOLDER[:-3] + '%03d' % i - try: - sftp.mkdir(FOLDER) - break - except (IOError, OSError): - pass - - def tearDown(self): - sftp = get_sftp() - sftp.rmdir(FOLDER) - - def test_1_lots_of_files(self): +@slow +class TestBigSFTP(object): + def test_1_lots_of_files(self, sftp): """ create a bunch of files over the same session. """ - sftp = get_sftp() numfiles = 100 try: for i in range(numfiles): - with sftp.open('%s/file%d.txt' % (FOLDER, i), 'w', 1) as f: + with sftp.open('%s/file%d.txt' % (sftp.FOLDER, i), 'w', 1) as f: f.write('this is file #%d.\n' % i) - sftp.chmod('%s/file%d.txt' % (FOLDER, i), o660) + sftp.chmod('%s/file%d.txt' % (sftp.FOLDER, i), o660) # now make sure every file is there, by creating a list of filenmes # and reading them in random order. numlist = list(range(numfiles)) while len(numlist) > 0: r = numlist[random.randint(0, len(numlist) - 1)] - with sftp.open('%s/file%d.txt' % (FOLDER, r)) as f: - self.assertEqual(f.readline(), 'this is file #%d.\n' % r) + with sftp.open('%s/file%d.txt' % (sftp.FOLDER, r)) as f: + assert f.readline() == 'this is file #%d.\n' % r numlist.remove(r) finally: for i in range(numfiles): try: - sftp.remove('%s/file%d.txt' % (FOLDER, i)) + sftp.remove('%s/file%d.txt' % (sftp.FOLDER, i)) except: pass - def test_2_big_file(self): + def test_2_big_file(self, sftp): """ write a 1MB file with no buffering. """ - sftp = get_sftp() kblob = (1024 * b'x') start = time.time() try: - with sftp.open('%s/hongry.txt' % FOLDER, 'w') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'w') as f: for n in range(1024): f.write(kblob) if n % 128 == 0: sys.stderr.write('.') sys.stderr.write(' ') - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024 end = time.time() sys.stderr.write('%ds ' % round(end - start)) start = time.time() - with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'r') as f: for n in range(1024): data = f.read(1024) - self.assertEqual(data, kblob) + assert data == kblob end = time.time() sys.stderr.write('%ds ' % round(end - start)) finally: - sftp.remove('%s/hongry.txt' % FOLDER) + sftp.remove('%s/hongry.txt' % sftp.FOLDER) - def test_3_big_file_pipelined(self): + def test_3_big_file_pipelined(self, sftp): """ write a 1MB file, with no linefeeds, using pipelining. """ - sftp = get_sftp() kblob = bytes().join([struct.pack('>H', n) for n in range(512)]) start = time.time() try: - with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f: f.set_pipelined(True) for n in range(1024): f.write(kblob) @@ -126,12 +107,12 @@ class BigSFTPTest (unittest.TestCase): sys.stderr.write('.') sys.stderr.write(' ') - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024 end = time.time() sys.stderr.write('%ds ' % round(end - start)) start = time.time() - with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f: file_size = f.stat().st_size f.prefetch(file_size) @@ -145,19 +126,18 @@ class BigSFTPTest (unittest.TestCase): chunk = size - n data = f.read(chunk) offset = n % 1024 - self.assertEqual(data, k2blob[offset:offset + chunk]) + assert data == k2blob[offset:offset + chunk] n += chunk end = time.time() sys.stderr.write('%ds ' % round(end - start)) finally: - sftp.remove('%s/hongry.txt' % FOLDER) + sftp.remove('%s/hongry.txt' % sftp.FOLDER) - def test_4_prefetch_seek(self): - sftp = get_sftp() + def test_4_prefetch_seek(self, sftp): kblob = bytes().join([struct.pack('>H', n) for n in range(512)]) try: - with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f: f.set_pipelined(True) for n in range(1024): f.write(kblob) @@ -165,13 +145,13 @@ class BigSFTPTest (unittest.TestCase): sys.stderr.write('.') sys.stderr.write(' ') - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024 start = time.time() k2blob = kblob + kblob chunk = 793 for i in range(10): - with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f: file_size = f.stat().st_size f.prefetch(file_size) base_offset = (512 * 1024) + 17 * random.randint(1000, 2000) @@ -183,18 +163,17 @@ class BigSFTPTest (unittest.TestCase): f.seek(offset) data = f.read(chunk) n_offset = offset % 1024 - self.assertEqual(data, k2blob[n_offset:n_offset + chunk]) + assert data == k2blob[n_offset:n_offset + chunk] offset += chunk end = time.time() sys.stderr.write('%ds ' % round(end - start)) finally: - sftp.remove('%s/hongry.txt' % FOLDER) + sftp.remove('%s/hongry.txt' % sftp.FOLDER) - def test_5_readv_seek(self): - sftp = get_sftp() + def test_5_readv_seek(self, sftp): kblob = bytes().join([struct.pack('>H', n) for n in range(512)]) try: - with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f: f.set_pipelined(True) for n in range(1024): f.write(kblob) @@ -202,13 +181,13 @@ class BigSFTPTest (unittest.TestCase): sys.stderr.write('.') sys.stderr.write(' ') - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024 start = time.time() k2blob = kblob + kblob chunk = 793 for i in range(10): - with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f: base_offset = (512 * 1024) + 17 * random.randint(1000, 2000) # make a bunch of offsets and put them in random order offsets = [base_offset + j * chunk for j in range(100)] @@ -221,21 +200,20 @@ class BigSFTPTest (unittest.TestCase): for i in range(len(readv_list)): offset = readv_list[i][0] n_offset = offset % 1024 - self.assertEqual(next(ret), k2blob[n_offset:n_offset + chunk]) + assert next(ret) == k2blob[n_offset:n_offset + chunk] end = time.time() sys.stderr.write('%ds ' % round(end - start)) finally: - sftp.remove('%s/hongry.txt' % FOLDER) + sftp.remove('%s/hongry.txt' % sftp.FOLDER) - def test_6_lots_of_prefetching(self): + def test_6_lots_of_prefetching(self, sftp): """ prefetch a 1MB file a bunch of times, discarding the file object without using it, to verify that paramiko doesn't get confused. """ - sftp = get_sftp() kblob = (1024 * b'x') try: - with sftp.open('%s/hongry.txt' % FOLDER, 'w') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'w') as f: f.set_pipelined(True) for n in range(1024): f.write(kblob) @@ -243,32 +221,31 @@ class BigSFTPTest (unittest.TestCase): sys.stderr.write('.') sys.stderr.write(' ') - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024 for i in range(10): - with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'r') as f: file_size = f.stat().st_size f.prefetch(file_size) - with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'r') as f: file_size = f.stat().st_size f.prefetch(file_size) for n in range(1024): data = f.read(1024) - self.assertEqual(data, kblob) + assert data == kblob if n % 128 == 0: sys.stderr.write('.') sys.stderr.write(' ') finally: - sftp.remove('%s/hongry.txt' % FOLDER) + sftp.remove('%s/hongry.txt' % sftp.FOLDER) - def test_7_prefetch_readv(self): + def test_7_prefetch_readv(self, sftp): """ verify that prefetch and readv don't conflict with each other. """ - sftp = get_sftp() kblob = bytes().join([struct.pack('>H', n) for n in range(512)]) try: - with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f: f.set_pipelined(True) for n in range(1024): f.write(kblob) @@ -276,13 +253,13 @@ class BigSFTPTest (unittest.TestCase): sys.stderr.write('.') sys.stderr.write(' ') - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024 - with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f: file_size = f.stat().st_size f.prefetch(file_size) data = f.read(1024) - self.assertEqual(data, kblob) + assert data == kblob chunk_size = 793 base_offset = 512 * 1024 @@ -290,23 +267,22 @@ class BigSFTPTest (unittest.TestCase): chunks = [(base_offset + (chunk_size * i), chunk_size) for i in range(20)] for data in f.readv(chunks): offset = base_offset % 1024 - self.assertEqual(chunk_size, len(data)) - self.assertEqual(k2blob[offset:offset + chunk_size], data) + assert chunk_size == len(data) + assert k2blob[offset:offset + chunk_size] == data base_offset += chunk_size sys.stderr.write(' ') finally: - sftp.remove('%s/hongry.txt' % FOLDER) + sftp.remove('%s/hongry.txt' % sftp.FOLDER) - def test_8_large_readv(self): + def test_8_large_readv(self, sftp): """ verify that a very large readv is broken up correctly and still returned as a single blob. """ - sftp = get_sftp() kblob = bytes().join([struct.pack('>H', n) for n in range(512)]) try: - with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f: f.set_pipelined(True) for n in range(1024): f.write(kblob) @@ -314,62 +290,53 @@ class BigSFTPTest (unittest.TestCase): sys.stderr.write('.') sys.stderr.write(' ') - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024 - with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f: data = list(f.readv([(23 * 1024, 128 * 1024)])) - self.assertEqual(1, len(data)) + assert len(data) == 1 data = data[0] - self.assertEqual(128 * 1024, len(data)) + assert len(data) == 128 * 1024 sys.stderr.write(' ') finally: - sftp.remove('%s/hongry.txt' % FOLDER) + sftp.remove('%s/hongry.txt' % sftp.FOLDER) - def test_9_big_file_big_buffer(self): + def test_9_big_file_big_buffer(self, sftp): """ write a 1MB file, with no linefeeds, and a big buffer. """ - sftp = get_sftp() mblob = (1024 * 1024 * 'x') try: - with sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'w', 128 * 1024) as f: f.write(mblob) - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024 finally: - sftp.remove('%s/hongry.txt' % FOLDER) + sftp.remove('%s/hongry.txt' % sftp.FOLDER) - def test_A_big_file_renegotiate(self): + def test_A_big_file_renegotiate(self, sftp): """ write a 1MB file, forcing key renegotiation in the middle. """ - sftp = get_sftp() t = sftp.sock.get_transport() t.packetizer.REKEY_BYTES = 512 * 1024 k32blob = (32 * 1024 * 'x') try: - with sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'w', 128 * 1024) as f: for i in range(32): f.write(k32blob) - self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) - self.assertNotEqual(t.H, t.session_id) + assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024 + assert t.H != t.session_id # try to read it too. - with sftp.open('%s/hongry.txt' % FOLDER, 'r', 128 * 1024) as f: + with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'r', 128 * 1024) as f: file_size = f.stat().st_size f.prefetch(file_size) total = 0 while total < 1024 * 1024: total += len(f.read(32 * 1024)) finally: - sftp.remove('%s/hongry.txt' % FOLDER) + sftp.remove('%s/hongry.txt' % sftp.FOLDER) t.packetizer.REKEY_BYTES = pow(2, 30) - - -if __name__ == '__main__': - from tests.test_sftp import SFTPTest - SFTPTest.init_loopback() - from unittest import main - main() diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py index d8d05d2b..f0645e0e 100644 --- a/tests/test_ssh_gss.py +++ b/tests/test_ssh_gss.py @@ -29,17 +29,20 @@ import unittest import paramiko -from tests.util import test_path -from tests.test_client import FINGERPRINTS +from .util import _support, needs_gssapi +from .test_client import FINGERPRINTS -class NullServer (paramiko.ServerInterface): +class NullServer (paramiko.ServerInterface): def get_allowed_auths(self, username): return 'gssapi-with-mic,publickey' - def check_auth_gssapi_with_mic(self, username, - gss_authenticated=paramiko.AUTH_FAILED, - cc_file=None): + def check_auth_gssapi_with_mic( + self, + username, + gss_authenticated=paramiko.AUTH_FAILED, + cc_file=None, + ): if gss_authenticated == paramiko.AUTH_SUCCESSFUL: return paramiko.AUTH_SUCCESSFUL return paramiko.AUTH_FAILED @@ -66,18 +69,15 @@ class NullServer (paramiko.ServerInterface): return True +@needs_gssapi class GSSAuthTest(unittest.TestCase): - @staticmethod - def init(username, hostname): - global krb5_principal, targ_name - krb5_principal = username - targ_name = hostname - def setUp(self): - self.username = krb5_principal - self.hostname = socket.getfqdn(targ_name) + # TODO: username and targ_name should come from os.environ or whatever + # the approved pytest method is for runtime-configuring test data. + self.username = "krb5_principal" + self.hostname = socket.getfqdn("targ_name") self.sockl = socket.socket() - self.sockl.bind((targ_name, 0)) + self.sockl.bind(("targ_name", 0)) self.sockl.listen(1) self.addr, self.port = self.sockl.getsockname() self.event = threading.Event() @@ -148,6 +148,6 @@ class GSSAuthTest(unittest.TestCase): Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding """ self.hostname = "this_host_does_not_exists_and_causes_a_GSSAPI-exception" - self._test_connection(key_filename=[test_path('test_rsa.key')], + self._test_connection(key_filename=[_support('test_rsa.key')], allow_agent=False, look_for_keys=False) diff --git a/tests/test_transport.py b/tests/test_transport.py index 99cbc3e0..9474acfc 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -43,9 +43,9 @@ from paramiko.common import ( ) from paramiko.py3compat import bytes from paramiko.message import Message -from tests import skipUnlessBuiltin -from tests.loop import LoopSocket -from tests.util import test_path + +from .util import needs_builtin, _support, slow +from .loop import LoopSocket LONG_BANNER = """\ @@ -64,7 +64,7 @@ Maybe. class NullServer (ServerInterface): paranoid_did_password = False paranoid_did_public_key = False - paranoid_key = DSSKey.from_private_key_file(test_path('test_dss.key')) + paranoid_key = DSSKey.from_private_key_file(_support('test_dss.key')) def get_allowed_auths(self, username): if username == 'slowdive': @@ -136,7 +136,7 @@ class TransportTest(unittest.TestCase): def setup_test_server( self, client_options=None, server_options=None, connect_kwargs=None, ): - host_key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = RSAKey.from_private_key_file(_support('test_rsa.key')) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) @@ -200,7 +200,7 @@ class TransportTest(unittest.TestCase): loopback sockets. this is hardly "simple" but it's simpler than the later tests. :) """ - host_key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = RSAKey.from_private_key_file(_support('test_rsa.key')) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) event = threading.Event() @@ -225,7 +225,7 @@ class TransportTest(unittest.TestCase): """ verify that a long banner doesn't mess up the handshake. """ - host_key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = RSAKey.from_private_key_file(_support('test_rsa.key')) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) event = threading.Event() @@ -257,6 +257,7 @@ class TransportTest(unittest.TestCase): self.tc.renegotiate_keys() self.ts.send_ignore(1024) + @slow def test_5_keepalive(self): """ verify that the keepalive will be sent. @@ -820,6 +821,7 @@ class TransportTest(unittest.TestCase): (2**32, MAX_WINDOW_SIZE)]: self.assertEqual(self.tc._sanitize_window_size(val), correct) + @slow def test_L_handshake_timeout(self): """ verify that we can get a hanshake timeout. @@ -840,7 +842,7 @@ class TransportTest(unittest.TestCase): # be fine. Even tho it's a bit squicky. self.tc.packetizer = SlowPacketizer(self.tc.sock) # Continue with regular test red tape. - host_key = RSAKey.from_private_key_file(test_path('test_rsa.key')) + host_key = RSAKey.from_private_key_file(_support('test_rsa.key')) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) event = threading.Event() @@ -892,7 +894,7 @@ class TransportTest(unittest.TestCase): expected = text.encode("utf-8") self.assertEqual(sfile.read(len(expected)), expected) - @skipUnlessBuiltin('buffer') + @needs_builtin('buffer') def test_channel_send_buffer(self): """ verify sending buffer instances to a channel @@ -915,7 +917,7 @@ class TransportTest(unittest.TestCase): chan.sendall(buffer(data)) self.assertEqual(sfile.read(len(data)), data) - @skipUnlessBuiltin('memoryview') + @needs_builtin('memoryview') def test_channel_send_memoryview(self): """ verify sending memoryview instances to a channel diff --git a/tests/test_util.py b/tests/test_util.py index 7880e156..90473f43 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -30,6 +30,7 @@ import paramiko.util from paramiko.util import lookup_ssh_host_config as host_config, safe_string from paramiko.py3compat import StringIO, byte_ord, b + # Note some lines in this configuration have trailing spaces on purpose test_config_file = """\ Host * @@ -366,7 +367,7 @@ IdentityFile something_%l_using_fqdn def test_get_hostnames(self): f = StringIO(test_config_file) config = paramiko.util.parse_ssh_config(f) - self.assertEqual(config.get_hostnames(), set(['*', '*.example.com', 'spoo.example.com'])) + self.assertEqual(config.get_hostnames(), {'*', '*.example.com', 'spoo.example.com'}) def test_quoted_host_names(self): test_config_file = """\ @@ -469,12 +470,12 @@ Host param3 parara self.assertRaises(Exception, conf._get_hosts, host) def test_safe_string(self): - vanilla = b("vanilla") - has_bytes = b("has \7\3 bytes") + vanilla = b"vanilla" + has_bytes = b"has \7\3 bytes" safe_vanilla = safe_string(vanilla) safe_has_bytes = safe_string(has_bytes) - expected_bytes = b("has %07%03 bytes") - err = "{0!r} != {1!r}" + expected_bytes = b"has %07%03 bytes" + err = "{!r} != {!r}" msg = err.format(safe_vanilla, vanilla) assert safe_vanilla == vanilla, msg msg = err.format(safe_has_bytes, expected_bytes) diff --git a/tests/util.py b/tests/util.py index c1b43da8..4ca02374 100644 --- a/tests/util.py +++ b/tests/util.py @@ -1,6 +1,27 @@ -import os +from os.path import dirname, realpath, join -root_path = os.path.dirname(os.path.realpath(__file__)) +import pytest -def test_path(filename): - return os.path.join(root_path, filename) +from paramiko.py3compat import builtins + + +def _support(filename): + return join(dirname(realpath(__file__)), filename) + + +# TODO: consider using pytest.importorskip('gssapi') instead? We presumably +# still need CLI configurability for the Kerberos parameters, though, so can't +# JUST key off presence of GSSAPI optional dependency... +# TODO: anyway, s/True/os.environ.get('RUN_GSSAPI', False)/ or something. +needs_gssapi = pytest.mark.skipif(True, reason="No GSSAPI to test") + + +def needs_builtin(name): + """ + Skip decorated test if builtin name does not exist. + """ + reason = "Test requires a builtin '{}'".format(name) + return pytest.mark.skipif(not hasattr(builtins, name), reason=reason) + + +slow = pytest.mark.slow |