summaryrefslogtreecommitdiffhomepage
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/__init__.py37
-rw-r--r--tests/conftest.py103
-rw-r--r--tests/loop.py6
-rw-r--r--tests/stub_sftp.py1
-rw-r--r--tests/test_auth.py13
-rw-r--r--tests/test_buffered_pipe.py1
-rw-r--r--tests/test_client.py189
-rw-r--r--[-rwxr-xr-x]tests/test_file.py8
-rw-r--r--tests/test_gssapi.py33
-rw-r--r--tests/test_hostkeys.py1
-rw-r--r--tests/test_kex.py5
-rw-r--r--tests/test_kex_gss.py3
-rw-r--r--tests/test_message.py1
-rw-r--r--tests/test_packetizer.py5
-rw-r--r--tests/test_pkey.py67
-rw-r--r--[-rwxr-xr-x]tests/test_sftp.py656
-rw-r--r--tests/test_sftp_big.py163
-rw-r--r--tests/test_ssh_gss.py32
-rw-r--r--tests/test_transport.py22
-rw-r--r--tests/test_util.py11
-rw-r--r--tests/util.py29
21 files changed, 709 insertions, 677 deletions
diff --git a/tests/__init__.py b/tests/__init__.py
index 8878f14d..be1d2daa 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -1,36 +1 @@
-# Copyright (C) 2017 Martin Packman <gzlist@googlemail.com>
-#
-# This file is part of paramiko.
-#
-# Paramiko is free software; you can redistribute it and/or modify it under the
-# terms of the GNU Lesser General Public License as published by the Free
-# Software Foundation; either version 2.1 of the License, or (at your option)
-# any later version.
-#
-# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY
-# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
-# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
-# details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
-# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
-
-"""Base classes and helpers for testing paramiko."""
-
-import unittest
-
-from paramiko.py3compat import (
- builtins,
- )
-
-
-def skipUnlessBuiltin(name):
- """Skip decorated test if builtin name does not exist."""
- if getattr(builtins, name, None) is None:
- skip = getattr(unittest, "skip", None)
- if skip is None:
- # Python 2.6 pseudo-skip
- return lambda func: None
- return skip("No builtin " + repr(name))
- return lambda func: func
+# This file's just here so test modules can use explicit-relative imports.
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 00000000..d1967a73
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,103 @@
+import logging
+import os
+import shutil
+import threading
+
+import pytest
+from paramiko import RSAKey, SFTPServer, SFTP, Transport
+
+from .loop import LoopSocket
+from .stub_sftp import StubServer, StubSFTPServer
+from .util import _support
+
+
+# TODO: not a huge fan of conftest.py files, see if we can move these somewhere
+# 'nicer'.
+
+
+# Perform logging by default; pytest will capture and thus hide it normally,
+# presenting it on error/failure. (But also allow turning it off when doing
+# very pinpoint debugging - e.g. using breakpoints, so you don't want output
+# hiding enabled, but also don't want all the logging to gum up the terminal.)
+if not os.environ.get('DISABLE_LOGGING', False):
+ logging.basicConfig(
+ level=logging.DEBUG,
+ # Also make sure to set up timestamping for more sanity when debugging.
+ format="[%(relativeCreated)s]\t%(levelname)s:%(name)s:%(message)s",
+ datefmt="%H:%M:%S",
+ )
+
+
+def make_sftp_folder():
+ """
+ Ensure expected target temp folder exists on the remote end.
+
+ Will clean it out if it already exists.
+ """
+ # TODO: go back to using the sftp functionality itself for folder setup so
+ # we can test against live SFTP servers again someday. (Not clear if anyone
+ # is/was using the old capability for such, though...)
+ # TODO: something that would play nicer with concurrent testing (but
+ # probably e.g. using thread ID or UUIDs or something; not the "count up
+ # until you find one not used!" crap from before...)
+ # TODO: if we want to lock ourselves even harder into localhost-only
+ # testing (probably not?) could use tempdir modules for this for improved
+ # safety. Then again...why would someone have such a folder???
+ path = os.environ.get('TEST_FOLDER', 'paramiko-test-target')
+ # Forcibly nuke this directory locally, since at the moment, the below
+ # fixtures only ever run with a locally scoped stub test server.
+ shutil.rmtree(path, ignore_errors=True)
+ # Then create it anew, again locally, for the same reason.
+ os.mkdir(path)
+ return path
+
+
+@pytest.fixture#(scope='session')
+def sftp_server():
+ """
+ Set up an in-memory SFTP server thread. Yields the client Transport/socket.
+
+ The resulting client Transport (along with all the server components) will
+ be the same object throughout the test session; the `sftp` fixture then
+ creates new higher level client objects wrapped around the client
+ Transport, as necessary.
+ """
+ # Sockets & transports
+ socks = LoopSocket()
+ sockc = LoopSocket()
+ sockc.link(socks)
+ tc = Transport(sockc)
+ ts = Transport(socks)
+ # Auth
+ host_key = RSAKey.from_private_key_file(_support('test_rsa.key'))
+ ts.add_server_key(host_key)
+ # Server setup
+ event = threading.Event()
+ server = StubServer()
+ ts.set_subsystem_handler('sftp', SFTPServer, StubSFTPServer)
+ ts.start_server(event, server)
+ # Wait (so client has time to connect? Not sure. Old.)
+ event.wait(1.0)
+ # Make & yield connection.
+ tc.connect(username='slowdive', password='pygmalion')
+ yield tc
+ # TODO: any need for shutdown? Why didn't old suite do so? Or was that the
+ # point of the "join all threads from threading module" crap in test.py?
+
+
+@pytest.fixture
+def sftp(sftp_server):
+ """
+ Yield an SFTP client connected to the global in-session SFTP server thread.
+ """
+ # Client setup
+ client = SFTP.from_transport(sftp_server)
+ # Work in 'remote' folder setup (as it wants to use the client)
+ # TODO: how cleanest to make this available to tests? Doing it this way is
+ # marginally less bad than the previous 'global'-using setup, but not by
+ # much?
+ client.FOLDER = make_sftp_folder()
+ # Yield client to caller
+ yield client
+ # Clean up - as in make_sftp_folder, we assume local-only exec for now.
+ shutil.rmtree(client.FOLDER, ignore_errors=True)
diff --git a/tests/loop.py b/tests/loop.py
index e805ad96..6c432867 100644
--- a/tests/loop.py
+++ b/tests/loop.py
@@ -16,11 +16,9 @@
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
-"""
-...
-"""
+import socket
+import threading
-import threading, socket
from paramiko.common import asbytes
diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py
index 0d673091..19545865 100644
--- a/tests/stub_sftp.py
+++ b/tests/stub_sftp.py
@@ -22,6 +22,7 @@ A stub SFTP server for loopback SFTP testing.
import os
import sys
+
from paramiko import (
ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes,
SFTPHandle, SFTP_OK, SFTP_FAILURE, AUTH_SUCCESSFUL, OPEN_SUCCEEDED,
diff --git a/tests/test_auth.py b/tests/test_auth.py
index e78397c6..4eade610 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -31,8 +31,10 @@ from paramiko import (
)
from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL
from paramiko.py3compat import u
-from tests.loop import LoopSocket
-from tests.util import test_path
+
+from .loop import LoopSocket
+from .util import _support, slow
+
_pwd = u('\u2022')
@@ -40,7 +42,7 @@ _pwd = u('\u2022')
class NullServer (ServerInterface):
paranoid_did_password = False
paranoid_did_public_key = False
- paranoid_key = DSSKey.from_private_key_file(test_path('test_dss.key'))
+ paranoid_key = DSSKey.from_private_key_file(_support('test_dss.key'))
def get_allowed_auths(self, username):
if username == 'slowdive':
@@ -118,7 +120,7 @@ class AuthTest (unittest.TestCase):
self.sockc.close()
def start_server(self):
- host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = RSAKey.from_private_key_file(_support('test_rsa.key'))
self.public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
self.event = threading.Event()
@@ -170,7 +172,7 @@ class AuthTest (unittest.TestCase):
self.tc.connect(hostkey=self.public_host_key)
remain = self.tc.auth_password(username='paranoid', password='paranoid')
self.assertEqual(['publickey'], remain)
- key = DSSKey.from_private_key_file(test_path('test_dss.key'))
+ key = DSSKey.from_private_key_file(_support('test_dss.key'))
remain = self.tc.auth_publickey(username='paranoid', key=key)
self.assertEqual([], remain)
self.verify_finished()
@@ -238,6 +240,7 @@ class AuthTest (unittest.TestCase):
etype, evalue, etb = sys.exc_info()
self.assertTrue(issubclass(etype, AuthenticationException))
+ @slow
def test_9_auth_non_responsive(self):
"""
verify that authentication times out if server takes to long to
diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py
index eeb4d0ad..03616c55 100644
--- a/tests/test_buffered_pipe.py
+++ b/tests/test_buffered_pipe.py
@@ -23,6 +23,7 @@ Some unit tests for BufferedPipe.
import threading
import time
import unittest
+
from paramiko.buffered_pipe import BufferedPipe, PipeTimeout
from paramiko import pipe
from paramiko.py3compat import b
diff --git a/tests/test_client.py b/tests/test_client.py
index 7058f394..7163fdcf 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -20,25 +20,32 @@
Some unit tests for SSHClient.
"""
-from __future__ import with_statement
+from __future__ import with_statement, print_function
import gc
+import os
import platform
import socket
-from tempfile import mkstemp
import threading
+import time
import unittest
-import weakref
import warnings
-import os
-import time
-from tests.util import test_path
+import weakref
+from tempfile import mkstemp
+
+from pytest_relaxed import raises
import paramiko
from paramiko.pkey import PublicBlob
from paramiko.common import PY2
from paramiko.ssh_exception import SSHException, AuthenticationException
+from .util import _support, slow
+
+
+requires_gss_auth = unittest.skipUnless(
+ paramiko.GSS_AUTH_AVAILABLE, "GSS auth not available"
+)
FINGERPRINTS = {
'ssh-dss': b'\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c',
@@ -106,8 +113,7 @@ class NullServer(paramiko.ServerInterface):
return True
-class SSHClientTest (unittest.TestCase):
-
+class ClientTest(unittest.TestCase):
def setUp(self):
self.sockl = socket.socket()
self.sockl.bind(('localhost', 0))
@@ -120,21 +126,47 @@ class SSHClientTest (unittest.TestCase):
look_for_keys=False,
)
self.event = threading.Event()
+ self.kill_event = threading.Event()
def tearDown(self):
- for attr in "tc ts socks sockl".split():
- if hasattr(self, attr):
- getattr(self, attr).close()
-
- def _run(self, allowed_keys=None, delay=0, public_blob=None):
+ # Shut down client Transport
+ if hasattr(self, 'tc'):
+ self.tc.close()
+ # Shut down shared socket
+ if hasattr(self, 'sockl'):
+ # Signal to server thread that it should shut down early; it checks
+ # this immediately after accept(). (In scenarios where connection
+ # actually succeeded during the test, this becomes a no-op.)
+ self.kill_event.set()
+ # Forcibly connect to server sock in case the server thread is
+ # hanging out in its accept() (e.g. if the client side of the test
+ # fails before it even gets to connecting); there's no other good
+ # way to force an accept() to exit.
+ put_a_sock_in_it = socket.socket()
+ put_a_sock_in_it.connect((self.addr, self.port))
+ put_a_sock_in_it.close()
+ # Then close "our" end of the socket (which _should_ cause the
+ # accept() to bail out, but does not, for some reason. I blame
+ # threading.)
+ self.sockl.close()
+
+ def _run(
+ self, allowed_keys=None, delay=0, public_blob=None, kill_event=None,
+ ):
if allowed_keys is None:
allowed_keys = FINGERPRINTS.keys()
self.socks, addr = self.sockl.accept()
+ # If the kill event was set at this point, it indicates an early
+ # shutdown, so bail out now and don't even try setting up a Transport
+ # (which will just verbosely die.)
+ if kill_event and kill_event.is_set():
+ self.socks.close()
+ return
self.ts = paramiko.Transport(self.socks)
- keypath = test_path('test_rsa.key')
+ keypath = _support('test_rsa.key')
host_key = paramiko.RSAKey.from_private_key_file(keypath)
self.ts.add_server_key(host_key)
- keypath = test_path('test_ecdsa_256.key')
+ keypath = _support('test_ecdsa_256.key')
host_key = paramiko.ECDSAKey.from_private_key_file(keypath)
self.ts.add_server_key(host_key)
server = NullServer(allowed_keys=allowed_keys, public_blob=public_blob)
@@ -149,12 +181,12 @@ class SSHClientTest (unittest.TestCase):
The exception is ``allowed_keys`` which is stripped and handed to the
``NullServer`` used for testing.
"""
- run_kwargs = {}
+ run_kwargs = {'kill_event': self.kill_event}
for key in ('allowed_keys', 'public_blob'):
run_kwargs[key] = kwargs.pop(key, None)
# Server setup
threading.Thread(target=self._run, kwargs=run_kwargs).start()
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
# Client setup
@@ -190,6 +222,8 @@ class SSHClientTest (unittest.TestCase):
stdout.close()
stderr.close()
+
+class SSHClientTest(ClientTest):
def test_1_client(self):
"""
verify that the SSHClient stuff works too.
@@ -200,22 +234,22 @@ class SSHClientTest (unittest.TestCase):
"""
verify that SSHClient works with a DSA key.
"""
- self._test_connection(key_filename=test_path('test_dss.key'))
+ self._test_connection(key_filename=_support('test_dss.key'))
def test_client_rsa(self):
"""
verify that SSHClient works with an RSA key.
"""
- self._test_connection(key_filename=test_path('test_rsa.key'))
+ self._test_connection(key_filename=_support('test_rsa.key'))
def test_2_5_client_ecdsa(self):
"""
verify that SSHClient works with an ECDSA key.
"""
- self._test_connection(key_filename=test_path('test_ecdsa_256.key'))
+ self._test_connection(key_filename=_support('test_ecdsa_256.key'))
def test_client_ed25519(self):
- self._test_connection(key_filename=test_path('test_ed25519.key'))
+ self._test_connection(key_filename=_support('test_ed25519.key'))
def test_3_multiple_key_files(self):
"""
@@ -238,7 +272,7 @@ class SSHClientTest (unittest.TestCase):
try:
self._test_connection(
key_filename=[
- test_path('test_{0}.key'.format(x)) for x in attempt
+ _support('test_{}.key'.format(x)) for x in attempt
],
allowed_keys=[types_[x] for x in accept],
)
@@ -256,7 +290,7 @@ class SSHClientTest (unittest.TestCase):
# various platforms trigger different errors here >_<
self.assertRaises(SSHException,
self._test_connection,
- key_filename=[test_path('test_rsa.key')],
+ key_filename=[_support('test_rsa.key')],
allowed_keys=['ecdsa-sha2-nistp256'],
)
@@ -266,8 +300,8 @@ class SSHClientTest (unittest.TestCase):
# server-side behavior is 100% identical.)
# NOTE: only bothered whipping up one cert per overall class/family.
for type_ in ('rsa', 'dss', 'ecdsa_256', 'ed25519'):
- cert_name = 'test_{0}.key-cert.pub'.format(type_)
- cert_path = test_path(os.path.join('cert_support', cert_name))
+ cert_name = 'test_{}.key-cert.pub'.format(type_)
+ cert_path = _support(os.path.join('cert_support', cert_name))
self._test_connection(
key_filename=cert_path,
public_blob=PublicBlob.from_file(cert_path),
@@ -281,12 +315,12 @@ class SSHClientTest (unittest.TestCase):
# that a specific cert was found, along with regular authorization
# succeeding proving that the overall flow works.
for type_ in ('rsa', 'dss', 'ecdsa_256', 'ed25519'):
- key_name = 'test_{0}.key'.format(type_)
- key_path = test_path(os.path.join('cert_support', key_name))
+ key_name = 'test_{}.key'.format(type_)
+ key_path = _support(os.path.join('cert_support', key_name))
self._test_connection(
key_filename=key_path,
public_blob=PublicBlob.from_file(
- '{0}-cert.pub'.format(key_path)
+ '{}-cert.pub'.format(key_path)
),
)
@@ -302,7 +336,7 @@ class SSHClientTest (unittest.TestCase):
"""
threading.Thread(target=self._run).start()
hostname = '[%s]:%d' % (self.addr, self.port)
- key_file = test_path('test_ecdsa_256.key')
+ key_file = _support('test_ecdsa_256.key')
public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file)
self.tc = paramiko.SSHClient()
@@ -325,7 +359,7 @@ class SSHClientTest (unittest.TestCase):
"""
warnings.filterwarnings('ignore', 'tempnam.*')
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
fd, localname = mkstemp()
os.close(fd)
@@ -405,7 +439,7 @@ class SSHClientTest (unittest.TestCase):
"""
# Start the thread with a 1 second wait.
threading.Thread(target=self._run, kwargs={'delay': 1}).start()
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = paramiko.RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
self.tc = paramiko.SSHClient()
@@ -432,12 +466,13 @@ class SSHClientTest (unittest.TestCase):
# 'television' as per tests/test_pkey.py). NOTE: must use
# key_filename, loading the actual key here with PKey will except
# immediately; we're testing the try/except crap within Client.
- key_filename=[test_path('test_rsa_password.key')],
+ key_filename=[_support('test_rsa_password.key')],
# Actual password for default 'slowdive' user
password='pygmalion',
)
self._test_connection(**kwargs)
+ @slow
def test_9_auth_timeout(self):
"""
verify that the SSHClient has a configurable auth timeout
@@ -450,27 +485,25 @@ class SSHClientTest (unittest.TestCase):
auth_timeout=0.5,
)
+ @requires_gss_auth
def test_10_auth_trickledown_gsskex(self):
"""
Failed gssapi-keyex auth doesn't prevent subsequent key auth from succeeding
"""
- if not paramiko.GSS_AUTH_AVAILABLE:
- return # for python 2.6 lacks skipTest
kwargs = dict(
gss_kex=True,
- key_filename=[test_path('test_rsa.key')],
+ key_filename=[_support('test_rsa.key')],
)
self._test_connection(**kwargs)
+ @requires_gss_auth
def test_11_auth_trickledown_gssauth(self):
"""
Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding
"""
- if not paramiko.GSS_AUTH_AVAILABLE:
- return # for python 2.6 lacks skipTest
kwargs = dict(
gss_auth=True,
- key_filename=[test_path('test_rsa.key')],
+ key_filename=[_support('test_rsa.key')],
)
self._test_connection(**kwargs)
@@ -489,14 +522,13 @@ class SSHClientTest (unittest.TestCase):
password='pygmalion', **self.connect_kwargs
)
+ @requires_gss_auth
def test_13_reject_policy_gsskex(self):
"""
verify that SSHClient's RejectPolicy works,
even if gssapi-keyex was enabled but not used.
"""
# Test for a bug present in paramiko versions released before 2017-08-01
- if not paramiko.GSS_AUTH_AVAILABLE:
- return # for python 2.6 lacks skipTest
threading.Thread(target=self._run).start()
self.tc = paramiko.SSHClient()
@@ -532,7 +564,7 @@ class SSHClientTest (unittest.TestCase):
self.tc = paramiko.SSHClient()
self.tc.set_missing_host_key_policy(paramiko.RejectPolicy())
- host_key = ktype.from_private_key_file(test_path(kfile))
+ host_key = ktype.from_private_key_file(_support(kfile))
known_hosts = self.tc.get_host_keys()
known_hosts.add(hostname, host_key.get_name(), host_key)
@@ -556,10 +588,7 @@ class SSHClientTest (unittest.TestCase):
def test_host_key_negotiation_4(self):
self._client_host_key_good(paramiko.RSAKey, 'test_rsa.key')
- def test_update_environment(self):
- """
- Verify that environment variables can be set by the client.
- """
+ def _setup_for_env(self):
threading.Thread(target=self._run).start()
self.tc = paramiko.SSHClient()
@@ -571,6 +600,11 @@ class SSHClientTest (unittest.TestCase):
self.assertTrue(self.event.isSet())
self.assertTrue(self.ts.is_active())
+ def test_update_environment(self):
+ """
+ Verify that environment variables can be set by the client.
+ """
+ self._setup_for_env()
target_env = {b'A': b'B', b'C': b'd'}
self.tc.exec_command('yes', environment=target_env)
@@ -578,19 +612,20 @@ class SSHClientTest (unittest.TestCase):
self.assertEqual(target_env, getattr(schan, 'env', {}))
schan.close()
- # Cannot use assertRaises in context manager mode as it is not supported
- # in Python 2.6.
- try:
+ @unittest.skip("Clients normally fail silently, thus so do we, for now")
+ def test_env_update_failures(self):
+ self._setup_for_env()
+ with self.assertRaises(SSHException) as manager:
# Verify that a rejection by the server can be detected
self.tc.exec_command('yes', environment={b'INVALID_ENV': b''})
- except SSHException as e:
- self.assertTrue('INVALID_ENV' in str(e),
- 'Expected variable name in error message')
- self.assertTrue(isinstance(e.args[1], SSHException),
- 'Expected original SSHException in exception')
- else:
- self.assertFalse(False, 'SSHException was not thrown.')
-
+ self.assertTrue(
+ 'INVALID_ENV' in str(manager.exception),
+ 'Expected variable name in error message'
+ )
+ self.assertTrue(
+ isinstance(manager.exception.args[1], SSHException),
+ 'Expected original SSHException in exception'
+ )
def test_missing_key_policy_accepts_classes_or_instances(self):
"""
@@ -607,3 +642,45 @@ class SSHClientTest (unittest.TestCase):
# Hand in just the class (new behavior)
client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
assert isinstance(client._policy, paramiko.AutoAddPolicy)
+
+
+class PasswordPassphraseTests(ClientTest):
+ # TODO: most of these could reasonably be set up to use mocks/assertions
+ # (e.g. "gave passphrase -> expect PKey was given it as the passphrase")
+ # instead of suffering a real connection cycle.
+ # TODO: in that case, move the below to be part of an integration suite?
+
+ def test_password_kwarg_works_for_password_auth(self):
+ # Straightforward / duplicate of earlier basic password test.
+ self._test_connection(password='pygmalion')
+
+ # TODO: more granular exception pending #387; should be signaling "no auth
+ # methods available" because no key and no password
+ @raises(SSHException)
+ def test_passphrase_kwarg_not_used_for_password_auth(self):
+ # Using the "right" password in the "wrong" field shouldn't work.
+ self._test_connection(passphrase='pygmalion')
+
+ def test_passphrase_kwarg_used_for_key_passphrase(self):
+ # Straightforward again, with new passphrase kwarg.
+ self._test_connection(
+ key_filename=_support('test_rsa_password.key'),
+ passphrase='television',
+ )
+
+ def test_password_kwarg_used_for_passphrase_when_no_passphrase_kwarg_given(self): # noqa
+ # Backwards compatibility: passphrase in the password field.
+ self._test_connection(
+ key_filename=_support('test_rsa_password.key'),
+ password='television',
+ )
+
+ @raises(AuthenticationException) # TODO: more granular
+ def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given(self): # noqa
+ # Sanity: if we're given both fields, the password field is NOT used as
+ # a passphrase.
+ self._test_connection(
+ key_filename=_support('test_rsa_password.key'),
+ password='television',
+ passphrase='wat? lol no',
+ )
diff --git a/tests/test_file.py b/tests/test_file.py
index b33ecd51..3d2c94e6 100755..100644
--- a/tests/test_file.py
+++ b/tests/test_file.py
@@ -27,7 +27,7 @@ from paramiko.common import linefeed_byte, crlf, cr_byte
from paramiko.file import BufferedFile
from paramiko.py3compat import BytesIO
-from tests import skipUnlessBuiltin
+from .util import needs_builtin
class LoopbackFile (BufferedFile):
@@ -198,13 +198,13 @@ class BufferedFileTest (unittest.TestCase):
f.write(text)
self.assertEqual(f.read(), text.encode("utf-8"))
- @skipUnlessBuiltin('memoryview')
+ @needs_builtin('memoryview')
def test_write_bytearray(self):
with LoopbackFile('rb+') as f:
f.write(bytearray(12))
self.assertEqual(f.read(), 12 * b"\0")
- @skipUnlessBuiltin('buffer')
+ @needs_builtin('buffer')
def test_write_buffer(self):
data = 3 * b"pretend giant block of data\n"
offsets = range(0, len(data), 8)
@@ -213,7 +213,7 @@ class BufferedFileTest (unittest.TestCase):
f.write(buffer(data, offset, 8))
self.assertEqual(f.read(), data)
- @skipUnlessBuiltin('memoryview')
+ @needs_builtin('memoryview')
def test_write_memoryview(self):
data = 3 * b"pretend giant block of data\n"
offsets = range(0, len(data), 8)
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
index bc220108..d4b632be 100644
--- a/tests/test_gssapi.py
+++ b/tests/test_gssapi.py
@@ -25,14 +25,17 @@ Test the used APIs for GSS-API / SSPI authentication
import unittest
import socket
+from .util import needs_gssapi
+
+@needs_gssapi
class GSSAPITest(unittest.TestCase):
- @staticmethod
- def init(hostname=None, srv_mode=False):
- global krb5_mech, targ_name, server_mode
- krb5_mech = "1.2.840.113554.1.2.2"
- targ_name = hostname
- server_mode = srv_mode
+ def setup():
+ # TODO: these vars should all come from os.environ or whatever the
+ # approved pytest method is for runtime-configuring test data.
+ self.krb5_mech = "1.2.840.113554.1.2.2"
+ self.targ_name = "hostname"
+ self.server_mode = False
def test_1_pyasn1(self):
"""
@@ -40,9 +43,9 @@ class GSSAPITest(unittest.TestCase):
"""
from pyasn1.type.univ import ObjectIdentifier
from pyasn1.codec.der import encoder, decoder
- oid = encoder.encode(ObjectIdentifier(krb5_mech))
+ oid = encoder.encode(ObjectIdentifier(self.krb5_mech))
mech, __ = decoder.decode(oid)
- self.assertEquals(krb5_mech, mech.__str__())
+ self.assertEquals(self.krb5_mech, mech.__str__())
def test_2_gssapi_sspi(self):
"""
@@ -61,7 +64,7 @@ class GSSAPITest(unittest.TestCase):
mic_msg = b"G'day Mate!"
if _API == "MIT":
- if server_mode:
+ if self.server_mode:
gss_flags = (gssapi.C_PROT_READY_FLAG,
gssapi.C_INTEG_FLAG,
gssapi.C_MUTUAL_FLAG,
@@ -73,13 +76,13 @@ class GSSAPITest(unittest.TestCase):
# Initialize a GSS-API context.
ctx = gssapi.Context()
ctx.flags = gss_flags
- krb5_oid = gssapi.OID.mech_from_string(krb5_mech)
- target_name = gssapi.Name("host@" + targ_name,
+ krb5_oid = gssapi.OID.mech_from_string(self.krb5_mech)
+ target_name = gssapi.Name("host@" + self.targ_name,
gssapi.C_NT_HOSTBASED_SERVICE)
gss_ctxt = gssapi.InitContext(peer_name=target_name,
mech_type=krb5_oid,
req_flags=ctx.flags)
- if server_mode:
+ if self.server_mode:
c_token = gss_ctxt.step(c_token)
gss_ctxt_status = gss_ctxt.established
self.assertEquals(False, gss_ctxt_status)
@@ -99,7 +102,7 @@ class GSSAPITest(unittest.TestCase):
# Build MIC
mic_token = gss_ctxt.get_mic(mic_msg)
- if server_mode:
+ if self.server_mode:
# Check MIC
status = gss_srv_ctxt.verify_mic(mic_msg, mic_token)
self.assertEquals(0, status)
@@ -110,11 +113,11 @@ class GSSAPITest(unittest.TestCase):
sspicon.ISC_REQ_DELEGATE
)
# Initialize a GSS-API context.
- target_name = "host/" + socket.getfqdn(targ_name)
+ target_name = "host/" + socket.getfqdn(self.targ_name)
gss_ctxt = sspi.ClientAuth("Kerberos",
scflags=gss_flags,
targetspn=target_name)
- if server_mode:
+ if self.server_mode:
error, token = gss_ctxt.authorize(c_token)
c_token = token[0].Buffer
self.assertEquals(0, error)
diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py
index 2c7ceeb9..cd75f8ab 100644
--- a/tests/test_hostkeys.py
+++ b/tests/test_hostkeys.py
@@ -23,6 +23,7 @@ Some unit tests for HostKeys.
from binascii import hexlify
import os
import unittest
+
import paramiko
from paramiko.py3compat import decodebytes
diff --git a/tests/test_kex.py b/tests/test_kex.py
index b7f588f7..b5808e7e 100644
--- a/tests/test_kex.py
+++ b/tests/test_kex.py
@@ -24,14 +24,15 @@ from binascii import hexlify, unhexlify
import os
import unittest
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives.asymmetric import ec
+
import paramiko.util
from paramiko.kex_group1 import KexGroup1
from paramiko.kex_gex import KexGex, KexGexSHA256
from paramiko import Message
from paramiko.common import byte_chr
from paramiko.kex_ecdh_nist import KexNistp256
-from cryptography.hazmat.backends import default_backend
-from cryptography.hazmat.primitives.asymmetric import ec
def dummy_urandom(n):
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
index af342a7c..025d1faa 100644
--- a/tests/test_kex_gss.py
+++ b/tests/test_kex_gss.py
@@ -31,6 +31,8 @@ import unittest
import paramiko
+from .util import needs_gssapi
+
class NullServer (paramiko.ServerInterface):
@@ -57,6 +59,7 @@ class NullServer (paramiko.ServerInterface):
return True
+@needs_gssapi
class GSSKexTest(unittest.TestCase):
@staticmethod
def init(username, hostname):
diff --git a/tests/test_message.py b/tests/test_message.py
index f18cae90..645b0509 100644
--- a/tests/test_message.py
+++ b/tests/test_message.py
@@ -21,6 +21,7 @@ Some unit tests for ssh protocol message blocks.
"""
import unittest
+
from paramiko.message import Message
from paramiko.common import byte_chr, zero_byte
diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py
index 02173292..414b7e38 100644
--- a/tests/test_packetizer.py
+++ b/tests/test_packetizer.py
@@ -27,11 +27,12 @@ from hashlib import sha1
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes
-from tests.loop import LoopSocket
-
from paramiko import Message, Packetizer, util
from paramiko.common import byte_chr, zero_byte
+from .loop import LoopSocket
+
+
x55 = byte_chr(0x55)
x1f = byte_chr(0x1f)
diff --git a/tests/test_pkey.py b/tests/test_pkey.py
index 42d8e6bb..1827d2a9 100644
--- a/tests/test_pkey.py
+++ b/tests/test_pkey.py
@@ -30,7 +30,8 @@ import base64
from paramiko import RSAKey, DSSKey, ECDSAKey, Ed25519Key, Message, util
from paramiko.py3compat import StringIO, byte_chr, b, bytes, PY2
-from tests.util import test_path
+from .util import _support
+
# from openssh's ssh-keygen
PUB_RSA = 'ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4c='
@@ -138,7 +139,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(exp, key)
def test_2_load_rsa(self):
- key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ key = RSAKey.from_private_key_file(_support('test_rsa.key'))
self.assertEqual('ssh-rsa', key.get_name())
exp_rsa = b(FINGER_RSA.split()[1].replace(':', ''))
my_rsa = hexlify(key.get_fingerprint())
@@ -154,7 +155,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(key, key2)
def test_3_load_rsa_password(self):
- key = RSAKey.from_private_key_file(test_path('test_rsa_password.key'), 'television')
+ key = RSAKey.from_private_key_file(_support('test_rsa_password.key'), 'television')
self.assertEqual('ssh-rsa', key.get_name())
exp_rsa = b(FINGER_RSA.split()[1].replace(':', ''))
my_rsa = hexlify(key.get_fingerprint())
@@ -163,7 +164,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(1024, key.get_bits())
def test_4_load_dss(self):
- key = DSSKey.from_private_key_file(test_path('test_dss.key'))
+ key = DSSKey.from_private_key_file(_support('test_dss.key'))
self.assertEqual('ssh-dss', key.get_name())
exp_dss = b(FINGER_DSS.split()[1].replace(':', ''))
my_dss = hexlify(key.get_fingerprint())
@@ -179,7 +180,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(key, key2)
def test_5_load_dss_password(self):
- key = DSSKey.from_private_key_file(test_path('test_dss_password.key'), 'television')
+ key = DSSKey.from_private_key_file(_support('test_dss_password.key'), 'television')
self.assertEqual('ssh-dss', key.get_name())
exp_dss = b(FINGER_DSS.split()[1].replace(':', ''))
my_dss = hexlify(key.get_fingerprint())
@@ -189,7 +190,7 @@ class KeyTest(unittest.TestCase):
def test_6_compare_rsa(self):
# verify that the private & public keys compare equal
- key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ key = RSAKey.from_private_key_file(_support('test_rsa.key'))
self.assertEqual(key, key)
pub = RSAKey(data=key.asbytes())
self.assertTrue(key.can_sign())
@@ -198,7 +199,7 @@ class KeyTest(unittest.TestCase):
def test_7_compare_dss(self):
# verify that the private & public keys compare equal
- key = DSSKey.from_private_key_file(test_path('test_dss.key'))
+ key = DSSKey.from_private_key_file(_support('test_dss.key'))
self.assertEqual(key, key)
pub = DSSKey(data=key.asbytes())
self.assertTrue(key.can_sign())
@@ -207,7 +208,7 @@ class KeyTest(unittest.TestCase):
def test_8_sign_rsa(self):
# verify that the rsa private key can sign and verify
- key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ key = RSAKey.from_private_key_file(_support('test_rsa.key'))
msg = key.sign_ssh_data(b'ice weasels')
self.assertTrue(type(msg) is Message)
msg.rewind()
@@ -220,7 +221,7 @@ class KeyTest(unittest.TestCase):
def test_9_sign_dss(self):
# verify that the dss private key can sign and verify
- key = DSSKey.from_private_key_file(test_path('test_dss.key'))
+ key = DSSKey.from_private_key_file(_support('test_dss.key'))
msg = key.sign_ssh_data(b'ice weasels')
self.assertTrue(type(msg) is Message)
msg.rewind()
@@ -275,7 +276,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(key.get_name(), 'ecdsa-sha2-nistp521')
def test_10_load_ecdsa_256(self):
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_256.key'))
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_256.key'))
self.assertEqual('ecdsa-sha2-nistp256', key.get_name())
exp_ecdsa = b(FINGER_ECDSA_256.split()[1].replace(':', ''))
my_ecdsa = hexlify(key.get_fingerprint())
@@ -291,7 +292,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(key, key2)
def test_11_load_ecdsa_password_256(self):
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password_256.key'), b'television')
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_password_256.key'), b'television')
self.assertEqual('ecdsa-sha2-nistp256', key.get_name())
exp_ecdsa = b(FINGER_ECDSA_256.split()[1].replace(':', ''))
my_ecdsa = hexlify(key.get_fingerprint())
@@ -301,7 +302,7 @@ class KeyTest(unittest.TestCase):
def test_12_compare_ecdsa_256(self):
# verify that the private & public keys compare equal
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_256.key'))
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_256.key'))
self.assertEqual(key, key)
pub = ECDSAKey(data=key.asbytes())
self.assertTrue(key.can_sign())
@@ -310,7 +311,7 @@ class KeyTest(unittest.TestCase):
def test_13_sign_ecdsa_256(self):
# verify that the rsa private key can sign and verify
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_256.key'))
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_256.key'))
msg = key.sign_ssh_data(b'ice weasels')
self.assertTrue(type(msg) is Message)
msg.rewind()
@@ -325,7 +326,7 @@ class KeyTest(unittest.TestCase):
self.assertTrue(pub.verify_ssh_sig(b'ice weasels', msg))
def test_14_load_ecdsa_384(self):
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_384.key'))
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_384.key'))
self.assertEqual('ecdsa-sha2-nistp384', key.get_name())
exp_ecdsa = b(FINGER_ECDSA_384.split()[1].replace(':', ''))
my_ecdsa = hexlify(key.get_fingerprint())
@@ -341,7 +342,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(key, key2)
def test_15_load_ecdsa_password_384(self):
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password_384.key'), b'television')
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_password_384.key'), b'television')
self.assertEqual('ecdsa-sha2-nistp384', key.get_name())
exp_ecdsa = b(FINGER_ECDSA_384.split()[1].replace(':', ''))
my_ecdsa = hexlify(key.get_fingerprint())
@@ -351,7 +352,7 @@ class KeyTest(unittest.TestCase):
def test_16_compare_ecdsa_384(self):
# verify that the private & public keys compare equal
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_384.key'))
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_384.key'))
self.assertEqual(key, key)
pub = ECDSAKey(data=key.asbytes())
self.assertTrue(key.can_sign())
@@ -360,7 +361,7 @@ class KeyTest(unittest.TestCase):
def test_17_sign_ecdsa_384(self):
# verify that the rsa private key can sign and verify
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_384.key'))
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_384.key'))
msg = key.sign_ssh_data(b'ice weasels')
self.assertTrue(type(msg) is Message)
msg.rewind()
@@ -375,7 +376,7 @@ class KeyTest(unittest.TestCase):
self.assertTrue(pub.verify_ssh_sig(b'ice weasels', msg))
def test_18_load_ecdsa_521(self):
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_521.key'))
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_521.key'))
self.assertEqual('ecdsa-sha2-nistp521', key.get_name())
exp_ecdsa = b(FINGER_ECDSA_521.split()[1].replace(':', ''))
my_ecdsa = hexlify(key.get_fingerprint())
@@ -394,7 +395,7 @@ class KeyTest(unittest.TestCase):
self.assertEqual(key, key2)
def test_19_load_ecdsa_password_521(self):
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_password_521.key'), b'television')
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_password_521.key'), b'television')
self.assertEqual('ecdsa-sha2-nistp521', key.get_name())
exp_ecdsa = b(FINGER_ECDSA_521.split()[1].replace(':', ''))
my_ecdsa = hexlify(key.get_fingerprint())
@@ -404,7 +405,7 @@ class KeyTest(unittest.TestCase):
def test_20_compare_ecdsa_521(self):
# verify that the private & public keys compare equal
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_521.key'))
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_521.key'))
self.assertEqual(key, key)
pub = ECDSAKey(data=key.asbytes())
self.assertTrue(key.can_sign())
@@ -413,7 +414,7 @@ class KeyTest(unittest.TestCase):
def test_21_sign_ecdsa_521(self):
# verify that the rsa private key can sign and verify
- key = ECDSAKey.from_private_key_file(test_path('test_ecdsa_521.key'))
+ key = ECDSAKey.from_private_key_file(_support('test_ecdsa_521.key'))
msg = key.sign_ssh_data(b'ice weasels')
self.assertTrue(type(msg) is Message)
msg.rewind()
@@ -429,7 +430,7 @@ class KeyTest(unittest.TestCase):
def test_salt_size(self):
# Read an existing encrypted private key
- file_ = test_path('test_rsa_password.key')
+ file_ = _support('test_rsa_password.key')
password = 'television'
newfile = file_ + '.new'
newpassword = 'radio'
@@ -446,20 +447,20 @@ class KeyTest(unittest.TestCase):
os.remove(newfile)
def test_stringification(self):
- key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ key = RSAKey.from_private_key_file(_support('test_rsa.key'))
comparable = TEST_KEY_BYTESTR_2 if PY2 else TEST_KEY_BYTESTR_3
self.assertEqual(str(key), comparable)
def test_ed25519(self):
- key1 = Ed25519Key.from_private_key_file(test_path('test_ed25519.key'))
+ key1 = Ed25519Key.from_private_key_file(_support('test_ed25519.key'))
key2 = Ed25519Key.from_private_key_file(
- test_path('test_ed25519_password.key'), b'abc123'
+ _support('test_ed25519_password.key'), b'abc123'
)
self.assertNotEqual(key1.asbytes(), key2.asbytes())
def test_ed25519_compare(self):
# verify that the private & public keys compare equal
- key = Ed25519Key.from_private_key_file(test_path('test_ed25519.key'))
+ key = Ed25519Key.from_private_key_file(_support('test_ed25519.key'))
self.assertEqual(key, key)
pub = Ed25519Key(data=key.asbytes())
self.assertTrue(key.can_sign())
@@ -469,7 +470,7 @@ class KeyTest(unittest.TestCase):
def test_ed25519_nonbytes_password(self):
# https://github.com/paramiko/paramiko/issues/1039
key = Ed25519Key.from_private_key_file(
- test_path('test_ed25519_password.key'),
+ _support('test_ed25519_password.key'),
# NOTE: not a bytes. Amusingly, the test above for same key DOES
# explicitly cast to bytes...code smell!
'abc123',
@@ -477,14 +478,14 @@ class KeyTest(unittest.TestCase):
# No exception -> it's good. Meh.
def test_ed25519_load_from_file_obj(self):
- with open(test_path('test_ed25519.key')) as pkey_fileobj:
+ with open(_support('test_ed25519.key')) as pkey_fileobj:
key = Ed25519Key.from_private_key(pkey_fileobj)
self.assertEqual(key, key)
self.assertTrue(key.can_sign())
def test_keyfile_is_actually_encrypted(self):
# Read an existing encrypted private key
- file_ = test_path('test_rsa_password.key')
+ file_ = _support('test_rsa_password.key')
password = 'television'
newfile = file_ + '.new'
newpassword = 'radio'
@@ -502,10 +503,10 @@ class KeyTest(unittest.TestCase):
# test_client.py; this and nearby cert tests are more about the gritty
# details.
# PKey.load_certificate
- key_path = test_path(os.path.join('cert_support', 'test_rsa.key'))
+ key_path = _support(os.path.join('cert_support', 'test_rsa.key'))
key = RSAKey.from_private_key_file(key_path)
self.assertTrue(key.public_blob is None)
- cert_path = test_path(
+ cert_path = _support(
os.path.join('cert_support', 'test_rsa.key-cert.pub')
)
key.load_certificate(cert_path)
@@ -524,10 +525,10 @@ class KeyTest(unittest.TestCase):
self.assertEqual(msg.get_int64(), 1234)
# Prevented from loading certificate that doesn't match
- key_path = test_path(os.path.join('cert_support', 'test_ed25519.key'))
+ key_path = _support(os.path.join('cert_support', 'test_ed25519.key'))
key1 = Ed25519Key.from_private_key_file(key_path)
self.assertRaises(
ValueError,
key1.load_certificate,
- test_path('test_rsa.key-cert.pub'),
+ _support('test_rsa.key-cert.pub'),
)
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index b3c7bf98..09a50453 100755..100644
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -32,16 +32,19 @@ import warnings
from binascii import hexlify
from tempfile import mkstemp
+import pytest
+
import paramiko
+import paramiko.util
from paramiko.py3compat import PY2, b, u, StringIO
from paramiko.common import o777, o600, o666, o644
-from tests import skipUnlessBuiltin
-from tests.stub_sftp import StubServer, StubSFTPServer
-from tests.loop import LoopSocket
-from tests.util import test_path
-import paramiko.util
from paramiko.sftp_attr import SFTPAttributes
+from .util import needs_builtin
+from .stub_sftp import StubServer, StubSFTPServer
+from .util import _support, slow
+
+
ARTICLE = '''
Insulin sensitivity and liver insulin receptor structure in ducks from two
genera
@@ -81,303 +84,201 @@ decreased compared with chicken.
# Thus, the following 2-bytes sequence is not valid utf8: "invalid continuation byte"
NON_UTF8_DATA = b'\xC3\xC3'
-FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000')
-
-sftp = None
-tc = None
-g_big_file_test = True
-# we need to use eval(compile()) here because Py3.2 doesn't support the 'u' marker for unicode
-# this test is the only line in the entire program that has to be treated specially to support Py3.2
-unicode_folder = eval(compile(r"u'\u00fcnic\u00f8de'" if PY2 else r"'\u00fcnic\u00f8de'", 'test_sftp.py', 'eval'))
+unicode_folder = u'\u00fcnic\u00f8de' if PY2 else '\u00fcnic\u00f8de'
utf8_folder = b'/\xc3\xbcnic\xc3\xb8\x64\x65'
-def get_sftp():
- global sftp
- return sftp
-
-
-class SFTPTest (unittest.TestCase):
- @staticmethod
- def init(hostname, username, keyfile, passwd):
- global sftp, tc
-
- t = paramiko.Transport(hostname)
- tc = t
- try:
- key = paramiko.RSAKey.from_private_key_file(keyfile, passwd)
- except paramiko.PasswordRequiredException:
- sys.stderr.write('\n\nparamiko.RSAKey.from_private_key_file REQUIRES PASSWORD.\n')
- sys.stderr.write('You have two options:\n')
- sys.stderr.write('* Use the "-K" option to point to a different (non-password-protected)\n')
- sys.stderr.write(' private key file.\n')
- sys.stderr.write('* Use the "-P" option to provide the password needed to unlock this private\n')
- sys.stderr.write(' key.\n')
- sys.stderr.write('\n')
- sys.exit(1)
- try:
- t.connect(username=username, pkey=key)
- except paramiko.SSHException:
- t.close()
- sys.stderr.write('\n\nparamiko.Transport.connect FAILED.\n')
- sys.stderr.write('There are several possible reasons why it might fail so quickly:\n\n')
- sys.stderr.write('* The host to connect to (%s) is not a valid SSH server.\n' % hostname)
- sys.stderr.write(' (Use the "-H" option to change the host.)\n')
- sys.stderr.write('* The username to auth as (%s) is invalid.\n' % username)
- sys.stderr.write(' (Use the "-U" option to change the username.)\n')
- sys.stderr.write('* The private key given (%s) is not accepted by the server.\n' % keyfile)
- sys.stderr.write(' (Use the "-K" option to provide a different key file.)\n')
- sys.stderr.write('\n')
- sys.exit(1)
- sftp = paramiko.SFTP.from_transport(t)
-
- @staticmethod
- def init_loopback():
- global sftp, tc
-
- socks = LoopSocket()
- sockc = LoopSocket()
- sockc.link(socks)
- tc = paramiko.Transport(sockc)
- ts = paramiko.Transport(socks)
-
- host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
- ts.add_server_key(host_key)
- event = threading.Event()
- server = StubServer()
- ts.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer)
- ts.start_server(event, server)
- tc.connect(username='slowdive', password='pygmalion')
- event.wait(1.0)
-
- sftp = paramiko.SFTP.from_transport(tc)
-
- @staticmethod
- def set_big_file_test(onoff):
- global g_big_file_test
- g_big_file_test = onoff
-
- def setUp(self):
- global FOLDER
- for i in range(1000):
- FOLDER = FOLDER[:-3] + '%03d' % i
- try:
- sftp.mkdir(FOLDER)
- break
- except (IOError, OSError):
- pass
-
- def tearDown(self):
- #sftp.chdir()
- sftp.rmdir(FOLDER)
-
- def test_1_file(self):
+@slow
+class TestSFTP(object):
+ def test_1_file(self, sftp):
"""
verify that we can create a file.
"""
- f = sftp.open(FOLDER + '/test', 'w')
+ f = sftp.open(sftp.FOLDER + '/test', 'w')
try:
- self.assertEqual(f.stat().st_size, 0)
+ assert f.stat().st_size == 0
finally:
f.close()
- sftp.remove(FOLDER + '/test')
+ sftp.remove(sftp.FOLDER + '/test')
- def test_2_close(self):
+ def test_2_close(self, sftp):
"""
- verify that closing the sftp session doesn't do anything bad, and that
- a new one can be opened.
+ Verify that SFTP session close() causes a socket error on next action.
"""
- global sftp
sftp.close()
- try:
- sftp.open(FOLDER + '/test2', 'w')
- self.fail('expected exception')
- except:
- pass
- sftp = paramiko.SFTP.from_transport(tc)
+ with pytest.raises(socket.error, match='Socket is closed'):
+ sftp.open(sftp.FOLDER + '/test2', 'w')
- def test_2_sftp_can_be_used_as_context_manager(self):
+ def test_2_sftp_can_be_used_as_context_manager(self, sftp):
"""
verify that the sftp session is closed when exiting the context manager
"""
- global sftp
with sftp:
pass
- try:
- sftp.open(FOLDER + '/test2', 'w')
- self.fail('expected exception')
- except (EOFError, socket.error):
- pass
- finally:
- sftp = paramiko.SFTP.from_transport(tc)
+ with pytest.raises(socket.error, match='Socket is closed'):
+ sftp.open(sftp.FOLDER + '/test2', 'w')
- def test_3_write(self):
+ def test_3_write(self, sftp):
"""
verify that a file can be created and written, and the size is correct.
"""
try:
- with sftp.open(FOLDER + '/duck.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f:
f.write(ARTICLE)
- self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483)
+ assert sftp.stat(sftp.FOLDER + '/duck.txt').st_size == 1483
finally:
- sftp.remove(FOLDER + '/duck.txt')
+ sftp.remove(sftp.FOLDER + '/duck.txt')
- def test_3_sftp_file_can_be_used_as_context_manager(self):
+ def test_3_sftp_file_can_be_used_as_context_manager(self, sftp):
"""
verify that an opened file can be used as a context manager
"""
try:
- with sftp.open(FOLDER + '/duck.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f:
f.write(ARTICLE)
- self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483)
+ assert sftp.stat(sftp.FOLDER + '/duck.txt').st_size == 1483
finally:
- sftp.remove(FOLDER + '/duck.txt')
+ sftp.remove(sftp.FOLDER + '/duck.txt')
- def test_4_append(self):
+ def test_4_append(self, sftp):
"""
verify that a file can be opened for append, and tell() still works.
"""
try:
- with sftp.open(FOLDER + '/append.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/append.txt', 'w') as f:
f.write('first line\nsecond line\n')
- self.assertEqual(f.tell(), 23)
+ assert f.tell() == 23
- with sftp.open(FOLDER + '/append.txt', 'a+') as f:
+ with sftp.open(sftp.FOLDER + '/append.txt', 'a+') as f:
f.write('third line!!!\n')
- self.assertEqual(f.tell(), 37)
- self.assertEqual(f.stat().st_size, 37)
+ assert f.tell() == 37
+ assert f.stat().st_size == 37
f.seek(-26, f.SEEK_CUR)
- self.assertEqual(f.readline(), 'second line\n')
+ assert f.readline() == 'second line\n'
finally:
- sftp.remove(FOLDER + '/append.txt')
+ sftp.remove(sftp.FOLDER + '/append.txt')
- def test_5_rename(self):
+ def test_5_rename(self, sftp):
"""
verify that renaming a file works.
"""
try:
- with sftp.open(FOLDER + '/first.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/first.txt', 'w') as f:
f.write('content!\n')
- sftp.rename(FOLDER + '/first.txt', FOLDER + '/second.txt')
- try:
- sftp.open(FOLDER + '/first.txt', 'r')
- self.assertTrue(False, 'no exception on reading nonexistent file')
- except IOError:
- pass
- with sftp.open(FOLDER + '/second.txt', 'r') as f:
+ sftp.rename(sftp.FOLDER + '/first.txt', sftp.FOLDER + '/second.txt')
+ with pytest.raises(IOError, match='No such file'):
+ sftp.open(sftp.FOLDER + '/first.txt', 'r')
+ with sftp.open(sftp.FOLDER + '/second.txt', 'r') as f:
f.seek(-6, f.SEEK_END)
- self.assertEqual(u(f.read(4)), 'tent')
+ assert u(f.read(4)) == 'tent'
finally:
+ # TODO: this is gross, make some sort of 'remove if possible' / 'rm
+ # -f' a-like, jeez
try:
- sftp.remove(FOLDER + '/first.txt')
+ sftp.remove(sftp.FOLDER + '/first.txt')
except:
pass
try:
- sftp.remove(FOLDER + '/second.txt')
+ sftp.remove(sftp.FOLDER + '/second.txt')
except:
pass
- def test_5a_posix_rename(self):
+ def test_5a_posix_rename(self, sftp):
"""Test posix-rename@openssh.com protocol extension."""
try:
# first check that the normal rename works as specified
- with sftp.open(FOLDER + '/a', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/a', 'w') as f:
f.write('one')
- sftp.rename(FOLDER + '/a', FOLDER + '/b')
- with sftp.open(FOLDER + '/a', 'w') as f:
+ sftp.rename(sftp.FOLDER + '/a', sftp.FOLDER + '/b')
+ with sftp.open(sftp.FOLDER + '/a', 'w') as f:
f.write('two')
- try:
- sftp.rename(FOLDER + '/a', FOLDER + '/b')
- self.assertTrue(False, 'no exception when rename-ing onto existing file')
- except (OSError, IOError):
- pass
+ with pytest.raises(IOError): # actual message seems generic
+ sftp.rename(sftp.FOLDER + '/a', sftp.FOLDER + '/b')
# now check with the posix_rename
- sftp.posix_rename(FOLDER + '/a', FOLDER + '/b')
- with sftp.open(FOLDER + '/b', 'r') as f:
+ sftp.posix_rename(sftp.FOLDER + '/a', sftp.FOLDER + '/b')
+ with sftp.open(sftp.FOLDER + '/b', 'r') as f:
data = u(f.read())
- self.assertEqual('two', data, "Contents of renamed file not the same as original file")
+ err = "Contents of renamed file not the same as original file"
+ assert 'two' == data, err
finally:
try:
- sftp.remove(FOLDER + '/a')
+ sftp.remove(sftp.FOLDER + '/a')
except:
pass
try:
- sftp.remove(FOLDER + '/b')
+ sftp.remove(sftp.FOLDER + '/b')
except:
pass
- def test_6_folder(self):
+ def test_6_folder(self, sftp):
"""
create a temporary folder, verify that we can create a file in it, then
remove the folder and verify that we can't create a file in it anymore.
"""
- sftp.mkdir(FOLDER + '/subfolder')
- sftp.open(FOLDER + '/subfolder/test', 'w').close()
- sftp.remove(FOLDER + '/subfolder/test')
- sftp.rmdir(FOLDER + '/subfolder')
- try:
- sftp.open(FOLDER + '/subfolder/test')
- # shouldn't be able to create that file
- self.assertTrue(False, 'no exception at dummy file creation')
- except IOError:
- pass
+ sftp.mkdir(sftp.FOLDER + '/subfolder')
+ sftp.open(sftp.FOLDER + '/subfolder/test', 'w').close()
+ sftp.remove(sftp.FOLDER + '/subfolder/test')
+ sftp.rmdir(sftp.FOLDER + '/subfolder')
+ # shouldn't be able to create that file if dir removed
+ with pytest.raises(IOError, match="No such file"):
+ sftp.open(sftp.FOLDER + '/subfolder/test')
- def test_7_listdir(self):
+ def test_7_listdir(self, sftp):
"""
verify that a folder can be created, a bunch of files can be placed in
it, and those files show up in sftp.listdir.
"""
try:
- sftp.open(FOLDER + '/duck.txt', 'w').close()
- sftp.open(FOLDER + '/fish.txt', 'w').close()
- sftp.open(FOLDER + '/tertiary.py', 'w').close()
-
- x = sftp.listdir(FOLDER)
- self.assertEqual(len(x), 3)
- self.assertTrue('duck.txt' in x)
- self.assertTrue('fish.txt' in x)
- self.assertTrue('tertiary.py' in x)
- self.assertTrue('random' not in x)
+ sftp.open(sftp.FOLDER + '/duck.txt', 'w').close()
+ sftp.open(sftp.FOLDER + '/fish.txt', 'w').close()
+ sftp.open(sftp.FOLDER + '/tertiary.py', 'w').close()
+
+ x = sftp.listdir(sftp.FOLDER)
+ assert len(x) == 3
+ assert 'duck.txt' in x
+ assert 'fish.txt' in x
+ assert 'tertiary.py' in x
+ assert 'random' not in x
finally:
- sftp.remove(FOLDER + '/duck.txt')
- sftp.remove(FOLDER + '/fish.txt')
- sftp.remove(FOLDER + '/tertiary.py')
+ sftp.remove(sftp.FOLDER + '/duck.txt')
+ sftp.remove(sftp.FOLDER + '/fish.txt')
+ sftp.remove(sftp.FOLDER + '/tertiary.py')
- def test_7_5_listdir_iter(self):
+ def test_7_5_listdir_iter(self, sftp):
"""
listdir_iter version of above test
"""
try:
- sftp.open(FOLDER + '/duck.txt', 'w').close()
- sftp.open(FOLDER + '/fish.txt', 'w').close()
- sftp.open(FOLDER + '/tertiary.py', 'w').close()
-
- x = [x.filename for x in sftp.listdir_iter(FOLDER)]
- self.assertEqual(len(x), 3)
- self.assertTrue('duck.txt' in x)
- self.assertTrue('fish.txt' in x)
- self.assertTrue('tertiary.py' in x)
- self.assertTrue('random' not in x)
+ sftp.open(sftp.FOLDER + '/duck.txt', 'w').close()
+ sftp.open(sftp.FOLDER + '/fish.txt', 'w').close()
+ sftp.open(sftp.FOLDER + '/tertiary.py', 'w').close()
+
+ x = [x.filename for x in sftp.listdir_iter(sftp.FOLDER)]
+ assert len(x) == 3
+ assert 'duck.txt' in x
+ assert 'fish.txt' in x
+ assert 'tertiary.py' in x
+ assert 'random' not in x
finally:
- sftp.remove(FOLDER + '/duck.txt')
- sftp.remove(FOLDER + '/fish.txt')
- sftp.remove(FOLDER + '/tertiary.py')
+ sftp.remove(sftp.FOLDER + '/duck.txt')
+ sftp.remove(sftp.FOLDER + '/fish.txt')
+ sftp.remove(sftp.FOLDER + '/tertiary.py')
- def test_8_setstat(self):
+ def test_8_setstat(self, sftp):
"""
verify that the setstat functions (chown, chmod, utime, truncate) work.
"""
try:
- with sftp.open(FOLDER + '/special', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/special', 'w') as f:
f.write('x' * 1024)
- stat = sftp.stat(FOLDER + '/special')
- sftp.chmod(FOLDER + '/special', (stat.st_mode & ~o777) | o600)
- stat = sftp.stat(FOLDER + '/special')
+ stat = sftp.stat(sftp.FOLDER + '/special')
+ sftp.chmod(sftp.FOLDER + '/special', (stat.st_mode & ~o777) | o600)
+ stat = sftp.stat(sftp.FOLDER + '/special')
expected_mode = o600
if sys.platform == 'win32':
# chmod not really functional on windows
@@ -385,35 +286,35 @@ class SFTPTest (unittest.TestCase):
if sys.platform == 'cygwin':
# even worse.
expected_mode = o644
- self.assertEqual(stat.st_mode & o777, expected_mode)
- self.assertEqual(stat.st_size, 1024)
+ assert stat.st_mode & o777 == expected_mode
+ assert stat.st_size == 1024
mtime = stat.st_mtime - 3600
atime = stat.st_atime - 1800
- sftp.utime(FOLDER + '/special', (atime, mtime))
- stat = sftp.stat(FOLDER + '/special')
- self.assertEqual(stat.st_mtime, mtime)
+ sftp.utime(sftp.FOLDER + '/special', (atime, mtime))
+ stat = sftp.stat(sftp.FOLDER + '/special')
+ assert stat.st_mtime == mtime
if sys.platform not in ('win32', 'cygwin'):
- self.assertEqual(stat.st_atime, atime)
+ assert stat.st_atime == atime
# can't really test chown, since we'd have to know a valid uid.
- sftp.truncate(FOLDER + '/special', 512)
- stat = sftp.stat(FOLDER + '/special')
- self.assertEqual(stat.st_size, 512)
+ sftp.truncate(sftp.FOLDER + '/special', 512)
+ stat = sftp.stat(sftp.FOLDER + '/special')
+ assert stat.st_size == 512
finally:
- sftp.remove(FOLDER + '/special')
+ sftp.remove(sftp.FOLDER + '/special')
- def test_9_fsetstat(self):
+ def test_9_fsetstat(self, sftp):
"""
verify that the fsetstat functions (chown, chmod, utime, truncate)
work on open files.
"""
try:
- with sftp.open(FOLDER + '/special', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/special', 'w') as f:
f.write('x' * 1024)
- with sftp.open(FOLDER + '/special', 'r+') as f:
+ with sftp.open(sftp.FOLDER + '/special', 'r+') as f:
stat = f.stat()
f.chmod((stat.st_mode & ~o777) | o600)
stat = f.stat()
@@ -425,26 +326,26 @@ class SFTPTest (unittest.TestCase):
if sys.platform == 'cygwin':
# even worse.
expected_mode = o644
- self.assertEqual(stat.st_mode & o777, expected_mode)
- self.assertEqual(stat.st_size, 1024)
+ assert stat.st_mode & o777 == expected_mode
+ assert stat.st_size == 1024
mtime = stat.st_mtime - 3600
atime = stat.st_atime - 1800
f.utime((atime, mtime))
stat = f.stat()
- self.assertEqual(stat.st_mtime, mtime)
+ assert stat.st_mtime == mtime
if sys.platform not in ('win32', 'cygwin'):
- self.assertEqual(stat.st_atime, atime)
+ assert stat.st_atime == atime
# can't really test chown, since we'd have to know a valid uid.
f.truncate(512)
stat = f.stat()
- self.assertEqual(stat.st_size, 512)
+ assert stat.st_size == 512
finally:
- sftp.remove(FOLDER + '/special')
+ sftp.remove(sftp.FOLDER + '/special')
- def test_A_readline_seek(self):
+ def test_A_readline_seek(self, sftp):
"""
create a text file and write a bunch of text into it. then count the lines
in the file, and seek around to retrieve particular lines. this should
@@ -452,10 +353,10 @@ class SFTPTest (unittest.TestCase):
buffering is reset on 'seek'.
"""
try:
- with sftp.open(FOLDER + '/duck.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/duck.txt', 'w') as f:
f.write(ARTICLE)
- with sftp.open(FOLDER + '/duck.txt', 'r+') as f:
+ with sftp.open(sftp.FOLDER + '/duck.txt', 'r+') as f:
line_number = 0
loc = 0
pos_list = []
@@ -463,35 +364,35 @@ class SFTPTest (unittest.TestCase):
line_number += 1
pos_list.append(loc)
loc = f.tell()
- self.assertTrue(f.seekable())
+ assert f.seekable()
f.seek(pos_list[6], f.SEEK_SET)
- self.assertEqual(f.readline(), 'Nouzilly, France.\n')
+ assert f.readline(), 'Nouzilly == France.\n'
f.seek(pos_list[17], f.SEEK_SET)
- self.assertEqual(f.readline()[:4], 'duck')
+ assert f.readline()[:4] == 'duck'
f.seek(pos_list[10], f.SEEK_SET)
- self.assertEqual(f.readline(), 'duck types were equally resistant to exogenous insulin compared with chicken.\n')
+ assert f.readline() == 'duck types were equally resistant to exogenous insulin compared with chicken.\n'
finally:
- sftp.remove(FOLDER + '/duck.txt')
+ sftp.remove(sftp.FOLDER + '/duck.txt')
- def test_B_write_seek(self):
+ def test_B_write_seek(self, sftp):
"""
create a text file, seek back and change part of it, and verify that the
changes worked.
"""
try:
- with sftp.open(FOLDER + '/testing.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/testing.txt', 'w') as f:
f.write('hello kitty.\n')
f.seek(-5, f.SEEK_CUR)
f.write('dd')
- self.assertEqual(sftp.stat(FOLDER + '/testing.txt').st_size, 13)
- with sftp.open(FOLDER + '/testing.txt', 'r') as f:
+ assert sftp.stat(sftp.FOLDER + '/testing.txt').st_size == 13
+ with sftp.open(sftp.FOLDER + '/testing.txt', 'r') as f:
data = f.read(20)
- self.assertEqual(data, b'hello kiddy.\n')
+ assert data == b'hello kiddy.\n'
finally:
- sftp.remove(FOLDER + '/testing.txt')
+ sftp.remove(sftp.FOLDER + '/testing.txt')
- def test_C_symlink(self):
+ def test_C_symlink(self, sftp):
"""
create a symlink and then check that lstat doesn't follow it.
"""
@@ -500,97 +401,85 @@ class SFTPTest (unittest.TestCase):
return
try:
- with sftp.open(FOLDER + '/original.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/original.txt', 'w') as f:
f.write('original\n')
- sftp.symlink('original.txt', FOLDER + '/link.txt')
- self.assertEqual(sftp.readlink(FOLDER + '/link.txt'), 'original.txt')
+ sftp.symlink('original.txt', sftp.FOLDER + '/link.txt')
+ assert sftp.readlink(sftp.FOLDER + '/link.txt') == 'original.txt'
- with sftp.open(FOLDER + '/link.txt', 'r') as f:
- self.assertEqual(f.readlines(), ['original\n'])
+ with sftp.open(sftp.FOLDER + '/link.txt', 'r') as f:
+ assert f.readlines() == ['original\n']
cwd = sftp.normalize('.')
if cwd[-1] == '/':
cwd = cwd[:-1]
- abs_path = cwd + '/' + FOLDER + '/original.txt'
- sftp.symlink(abs_path, FOLDER + '/link2.txt')
- self.assertEqual(abs_path, sftp.readlink(FOLDER + '/link2.txt'))
+ abs_path = cwd + '/' + sftp.FOLDER + '/original.txt'
+ sftp.symlink(abs_path, sftp.FOLDER + '/link2.txt')
+ assert abs_path == sftp.readlink(sftp.FOLDER + '/link2.txt')
- self.assertEqual(sftp.lstat(FOLDER + '/link.txt').st_size, 12)
- self.assertEqual(sftp.stat(FOLDER + '/link.txt').st_size, 9)
+ assert sftp.lstat(sftp.FOLDER + '/link.txt').st_size == 12
+ assert sftp.stat(sftp.FOLDER + '/link.txt').st_size == 9
# the sftp server may be hiding extra path members from us, so the
# length may be longer than we expect:
- self.assertTrue(sftp.lstat(FOLDER + '/link2.txt').st_size >= len(abs_path))
- self.assertEqual(sftp.stat(FOLDER + '/link2.txt').st_size, 9)
- self.assertEqual(sftp.stat(FOLDER + '/original.txt').st_size, 9)
+ assert sftp.lstat(sftp.FOLDER + '/link2.txt').st_size >= len(abs_path)
+ assert sftp.stat(sftp.FOLDER + '/link2.txt').st_size == 9
+ assert sftp.stat(sftp.FOLDER + '/original.txt').st_size == 9
finally:
try:
- sftp.remove(FOLDER + '/link.txt')
+ sftp.remove(sftp.FOLDER + '/link.txt')
except:
pass
try:
- sftp.remove(FOLDER + '/link2.txt')
+ sftp.remove(sftp.FOLDER + '/link2.txt')
except:
pass
try:
- sftp.remove(FOLDER + '/original.txt')
+ sftp.remove(sftp.FOLDER + '/original.txt')
except:
pass
- def test_D_flush_seek(self):
+ def test_D_flush_seek(self, sftp):
"""
verify that buffered writes are automatically flushed on seek.
"""
try:
- with sftp.open(FOLDER + '/happy.txt', 'w', 1) as f:
+ with sftp.open(sftp.FOLDER + '/happy.txt', 'w', 1) as f:
f.write('full line.\n')
f.write('partial')
f.seek(9, f.SEEK_SET)
f.write('?\n')
- with sftp.open(FOLDER + '/happy.txt', 'r') as f:
- self.assertEqual(f.readline(), u('full line?\n'))
- self.assertEqual(f.read(7), b'partial')
+ with sftp.open(sftp.FOLDER + '/happy.txt', 'r') as f:
+ assert f.readline() == u('full line?\n')
+ assert f.read(7) == b'partial'
finally:
try:
- sftp.remove(FOLDER + '/happy.txt')
+ sftp.remove(sftp.FOLDER + '/happy.txt')
except:
pass
- def test_E_realpath(self):
+ def test_E_realpath(self, sftp):
"""
test that realpath is returning something non-empty and not an
error.
"""
pwd = sftp.normalize('.')
- self.assertTrue(len(pwd) > 0)
- f = sftp.normalize('./' + FOLDER)
- self.assertTrue(len(f) > 0)
- self.assertEqual(os.path.join(pwd, FOLDER), f)
+ assert len(pwd) > 0
+ f = sftp.normalize('./' + sftp.FOLDER)
+ assert len(f) > 0
+ assert os.path.join(pwd, sftp.FOLDER) == f
- def test_F_mkdir(self):
+ def test_F_mkdir(self, sftp):
"""
verify that mkdir/rmdir work.
"""
- try:
- sftp.mkdir(FOLDER + '/subfolder')
- except:
- self.assertTrue(False, 'exception creating subfolder')
- try:
- sftp.mkdir(FOLDER + '/subfolder')
- self.assertTrue(False, 'no exception overwriting subfolder')
- except IOError:
- pass
- try:
- sftp.rmdir(FOLDER + '/subfolder')
- except:
- self.assertTrue(False, 'exception removing subfolder')
- try:
- sftp.rmdir(FOLDER + '/subfolder')
- self.assertTrue(False, 'no exception removing nonexistent subfolder')
- except IOError:
- pass
+ sftp.mkdir(sftp.FOLDER + '/subfolder')
+ with pytest.raises(IOError): # generic msg only
+ sftp.mkdir(sftp.FOLDER + '/subfolder')
+ sftp.rmdir(sftp.FOLDER + '/subfolder')
+ with pytest.raises(IOError, match="No such file"):
+ sftp.rmdir(sftp.FOLDER + '/subfolder')
- def test_G_chdir(self):
+ def test_G_chdir(self, sftp):
"""
verify that chdir/getcwd work.
"""
@@ -598,35 +487,35 @@ class SFTPTest (unittest.TestCase):
if root[-1] != '/':
root += '/'
try:
- sftp.mkdir(FOLDER + '/alpha')
- sftp.chdir(FOLDER + '/alpha')
+ sftp.mkdir(sftp.FOLDER + '/alpha')
+ sftp.chdir(sftp.FOLDER + '/alpha')
sftp.mkdir('beta')
- self.assertEqual(root + FOLDER + '/alpha', sftp.getcwd())
- self.assertEqual(['beta'], sftp.listdir('.'))
+ assert root + sftp.FOLDER + '/alpha' == sftp.getcwd()
+ assert ['beta'] == sftp.listdir('.')
sftp.chdir('beta')
with sftp.open('fish', 'w') as f:
f.write('hello\n')
sftp.chdir('..')
- self.assertEqual(['fish'], sftp.listdir('beta'))
+ assert ['fish'] == sftp.listdir('beta')
sftp.chdir('..')
- self.assertEqual(['fish'], sftp.listdir('alpha/beta'))
+ assert ['fish'] == sftp.listdir('alpha/beta')
finally:
sftp.chdir(root)
try:
- sftp.unlink(FOLDER + '/alpha/beta/fish')
+ sftp.unlink(sftp.FOLDER + '/alpha/beta/fish')
except:
pass
try:
- sftp.rmdir(FOLDER + '/alpha/beta')
+ sftp.rmdir(sftp.FOLDER + '/alpha/beta')
except:
pass
try:
- sftp.rmdir(FOLDER + '/alpha')
+ sftp.rmdir(sftp.FOLDER + '/alpha')
except:
pass
- def test_H_get_put(self):
+ def test_H_get_put(self, sftp):
"""
verify that get/put work.
"""
@@ -641,103 +530,102 @@ class SFTPTest (unittest.TestCase):
def progress_callback(x, y):
saved_progress.append((x, y))
- sftp.put(localname, FOLDER + '/bunny.txt', progress_callback)
+ sftp.put(localname, sftp.FOLDER + '/bunny.txt', progress_callback)
- with sftp.open(FOLDER + '/bunny.txt', 'rb') as f:
- self.assertEqual(text, f.read(128))
- self.assertEqual([(41, 41)], saved_progress)
+ with sftp.open(sftp.FOLDER + '/bunny.txt', 'rb') as f:
+ assert text == f.read(128)
+ assert [(41, 41)] == saved_progress
os.unlink(localname)
fd, localname = mkstemp()
os.close(fd)
saved_progress = []
- sftp.get(FOLDER + '/bunny.txt', localname, progress_callback)
+ sftp.get(sftp.FOLDER + '/bunny.txt', localname, progress_callback)
with open(localname, 'rb') as f:
- self.assertEqual(text, f.read(128))
- self.assertEqual([(41, 41)], saved_progress)
+ assert text == f.read(128)
+ assert [(41, 41)] == saved_progress
os.unlink(localname)
- sftp.unlink(FOLDER + '/bunny.txt')
+ sftp.unlink(sftp.FOLDER + '/bunny.txt')
- def test_I_check(self):
+ def test_I_check(self, sftp):
"""
verify that file.check() works against our own server.
(it's an sftp extension that we support, and may be the only ones who
support it.)
"""
- with sftp.open(FOLDER + '/kitty.txt', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/kitty.txt', 'w') as f:
f.write('here kitty kitty' * 64)
try:
- with sftp.open(FOLDER + '/kitty.txt', 'r') as f:
+ with sftp.open(sftp.FOLDER + '/kitty.txt', 'r') as f:
sum = f.check('sha1')
- self.assertEqual('91059CFC6615941378D413CB5ADAF4C5EB293402', u(hexlify(sum)).upper())
+ assert '91059CFC6615941378D413CB5ADAF4C5EB293402' == u(hexlify(sum)).upper()
sum = f.check('md5', 0, 512)
- self.assertEqual('93DE4788FCA28D471516963A1FE3856A', u(hexlify(sum)).upper())
+ assert '93DE4788FCA28D471516963A1FE3856A' == u(hexlify(sum)).upper()
sum = f.check('md5', 0, 0, 510)
- self.assertEqual('EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6',
- u(hexlify(sum)).upper())
+ assert u(hexlify(sum)).upper() == 'EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6' # noqa
finally:
- sftp.unlink(FOLDER + '/kitty.txt')
+ sftp.unlink(sftp.FOLDER + '/kitty.txt')
- def test_J_x_flag(self):
+ def test_J_x_flag(self, sftp):
"""
verify that the 'x' flag works when opening a file.
"""
- sftp.open(FOLDER + '/unusual.txt', 'wx').close()
+ sftp.open(sftp.FOLDER + '/unusual.txt', 'wx').close()
try:
try:
- sftp.open(FOLDER + '/unusual.txt', 'wx')
+ sftp.open(sftp.FOLDER + '/unusual.txt', 'wx')
self.fail('expected exception')
except IOError:
pass
finally:
- sftp.unlink(FOLDER + '/unusual.txt')
+ sftp.unlink(sftp.FOLDER + '/unusual.txt')
- def test_K_utf8(self):
+ def test_K_utf8(self, sftp):
"""
verify that unicode strings are encoded into utf8 correctly.
"""
- with sftp.open(FOLDER + '/something', 'w') as f:
+ with sftp.open(sftp.FOLDER + '/something', 'w') as f:
f.write('okay')
try:
- sftp.rename(FOLDER + '/something', FOLDER + '/' + unicode_folder)
- sftp.open(b(FOLDER) + utf8_folder, 'r')
+ sftp.rename(sftp.FOLDER + '/something', sftp.FOLDER + '/' + unicode_folder)
+ sftp.open(b(sftp.FOLDER) + utf8_folder, 'r')
except Exception as e:
self.fail('exception ' + str(e))
- sftp.unlink(b(FOLDER) + utf8_folder)
+ sftp.unlink(b(sftp.FOLDER) + utf8_folder)
- def test_L_utf8_chdir(self):
- sftp.mkdir(FOLDER + '/' + unicode_folder)
+ def test_L_utf8_chdir(self, sftp):
+ sftp.mkdir(sftp.FOLDER + '/' + unicode_folder)
try:
- sftp.chdir(FOLDER + '/' + unicode_folder)
+ sftp.chdir(sftp.FOLDER + '/' + unicode_folder)
with sftp.open('something', 'w') as f:
f.write('okay')
sftp.unlink('something')
finally:
sftp.chdir()
- sftp.rmdir(FOLDER + '/' + unicode_folder)
+ sftp.rmdir(sftp.FOLDER + '/' + unicode_folder)
- def test_M_bad_readv(self):
+ def test_M_bad_readv(self, sftp):
"""
verify that readv at the end of the file doesn't essplode.
"""
- sftp.open(FOLDER + '/zero', 'w').close()
+ sftp.open(sftp.FOLDER + '/zero', 'w').close()
try:
- with sftp.open(FOLDER + '/zero', 'r') as f:
+ with sftp.open(sftp.FOLDER + '/zero', 'r') as f:
f.readv([(0, 12)])
- with sftp.open(FOLDER + '/zero', 'r') as f:
+ with sftp.open(sftp.FOLDER + '/zero', 'r') as f:
file_size = f.stat().st_size
f.prefetch(file_size)
f.read(100)
finally:
- sftp.unlink(FOLDER + '/zero')
+ sftp.unlink(sftp.FOLDER + '/zero')
- def test_N_put_without_confirm(self):
+ def test_N_put_without_confirm(self, sftp):
"""
verify that get/put work without confirmation.
"""
@@ -752,138 +640,132 @@ class SFTPTest (unittest.TestCase):
def progress_callback(x, y):
saved_progress.append((x, y))
- res = sftp.put(localname, FOLDER + '/bunny.txt', progress_callback, False)
+ res = sftp.put(localname, sftp.FOLDER + '/bunny.txt', progress_callback, False)
- self.assertEqual(SFTPAttributes().attr, res.attr)
+ assert SFTPAttributes().attr == res.attr
- with sftp.open(FOLDER + '/bunny.txt', 'r') as f:
- self.assertEqual(text, f.read(128))
- self.assertEqual((41, 41), saved_progress[-1])
+ with sftp.open(sftp.FOLDER + '/bunny.txt', 'r') as f:
+ assert text == f.read(128)
+ assert (41, 41) == saved_progress[-1]
os.unlink(localname)
- sftp.unlink(FOLDER + '/bunny.txt')
+ sftp.unlink(sftp.FOLDER + '/bunny.txt')
- def test_O_getcwd(self):
+ def test_O_getcwd(self, sftp):
"""
verify that chdir/getcwd work.
"""
- self.assertEqual(None, sftp.getcwd())
+ assert sftp.getcwd() == None
root = sftp.normalize('.')
if root[-1] != '/':
root += '/'
try:
- sftp.mkdir(FOLDER + '/alpha')
- sftp.chdir(FOLDER + '/alpha')
- self.assertEqual('/' + FOLDER + '/alpha', sftp.getcwd())
+ sftp.mkdir(sftp.FOLDER + '/alpha')
+ sftp.chdir(sftp.FOLDER + '/alpha')
+ assert sftp.getcwd() == '/' + sftp.FOLDER + '/alpha'
finally:
sftp.chdir(root)
try:
- sftp.rmdir(FOLDER + '/alpha')
+ sftp.rmdir(sftp.FOLDER + '/alpha')
except:
pass
- def XXX_test_M_seek_append(self):
+ def XXX_test_M_seek_append(self, sftp):
"""
verify that seek does't affect writes during append.
does not work except through paramiko. :( openssh fails.
"""
try:
- with sftp.open(FOLDER + '/append.txt', 'a') as f:
+ with sftp.open(sftp.FOLDER + '/append.txt', 'a') as f:
f.write('first line\nsecond line\n')
f.seek(11, f.SEEK_SET)
f.write('third line\n')
- with sftp.open(FOLDER + '/append.txt', 'r') as f:
- self.assertEqual(f.stat().st_size, 34)
- self.assertEqual(f.readline(), 'first line\n')
- self.assertEqual(f.readline(), 'second line\n')
- self.assertEqual(f.readline(), 'third line\n')
+ with sftp.open(sftp.FOLDER + '/append.txt', 'r') as f:
+ assert f.stat().st_size == 34
+ assert f.readline() == 'first line\n'
+ assert f.readline() == 'second line\n'
+ assert f.readline() == 'third line\n'
finally:
- sftp.remove(FOLDER + '/append.txt')
+ sftp.remove(sftp.FOLDER + '/append.txt')
- def test_putfo_empty_file(self):
+ def test_putfo_empty_file(self, sftp):
"""
Send an empty file and confirm it is sent.
"""
- target = FOLDER + '/empty file.txt'
+ target = sftp.FOLDER + '/empty file.txt'
stream = StringIO()
try:
attrs = sftp.putfo(stream, target)
# the returned attributes should not be null
- self.assertNotEqual(attrs, None)
+ assert attrs is not None
finally:
sftp.remove(target)
-
- def test_N_file_with_percent(self):
+ # TODO: this test doesn't actually fail if the regression (removing '%'
+ # expansion to '%%' within sftp.py's def _log()) is removed - stacktraces
+ # appear but they're clearly emitted from subthreads that have no error
+ # handling. No point running it until that is fixed somehow.
+ @pytest.mark.skip("Doesn't prove anything right now")
+ def test_N_file_with_percent(self, sftp):
"""
verify that we can create a file with a '%' in the filename.
( it needs to be properly escaped by _log() )
"""
- self.assertTrue( paramiko.util.get_logger("paramiko").handlers, "This unit test requires logging to be enabled" )
- f = sftp.open(FOLDER + '/test%file', 'w')
+ f = sftp.open(sftp.FOLDER + '/test%file', 'w')
try:
- self.assertEqual(f.stat().st_size, 0)
+ assert f.stat().st_size == 0
finally:
f.close()
- sftp.remove(FOLDER + '/test%file')
-
+ sftp.remove(sftp.FOLDER + '/test%file')
- def test_O_non_utf8_data(self):
+ def test_O_non_utf8_data(self, sftp):
"""Test write() and read() of non utf8 data"""
try:
- with sftp.open('%s/nonutf8data' % FOLDER, 'w') as f:
+ with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'w') as f:
f.write(NON_UTF8_DATA)
- with sftp.open('%s/nonutf8data' % FOLDER, 'r') as f:
+ with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'r') as f:
data = f.read()
- self.assertEqual(data, NON_UTF8_DATA)
- with sftp.open('%s/nonutf8data' % FOLDER, 'wb') as f:
+ assert data == NON_UTF8_DATA
+ with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'wb') as f:
f.write(NON_UTF8_DATA)
- with sftp.open('%s/nonutf8data' % FOLDER, 'rb') as f:
+ with sftp.open('%s/nonutf8data' % sftp.FOLDER, 'rb') as f:
data = f.read()
- self.assertEqual(data, NON_UTF8_DATA)
+ assert data == NON_UTF8_DATA
finally:
- sftp.remove('%s/nonutf8data' % FOLDER)
+ sftp.remove('%s/nonutf8data' % sftp.FOLDER)
- def test_sftp_attributes_empty_str(self):
+ def test_sftp_attributes_empty_str(self, sftp):
sftp_attributes = SFTPAttributes()
- self.assertEqual(str(sftp_attributes), "?--------- 1 0 0 0 (unknown date) ?")
+ assert str(sftp_attributes) == "?--------- 1 0 0 0 (unknown date) ?"
- @skipUnlessBuiltin('buffer')
- def test_write_buffer(self):
+ @needs_builtin('buffer')
+ def test_write_buffer(self, sftp):
"""Test write() using a buffer instance."""
data = 3 * b'A potentially large block of data to chunk up.\n'
try:
- with sftp.open('%s/write_buffer' % FOLDER, 'wb') as f:
+ with sftp.open('%s/write_buffer' % sftp.FOLDER, 'wb') as f:
for offset in range(0, len(data), 8):
f.write(buffer(data, offset, 8))
- with sftp.open('%s/write_buffer' % FOLDER, 'rb') as f:
- self.assertEqual(f.read(), data)
+ with sftp.open('%s/write_buffer' % sftp.FOLDER, 'rb') as f:
+ assert f.read() == data
finally:
- sftp.remove('%s/write_buffer' % FOLDER)
+ sftp.remove('%s/write_buffer' % sftp.FOLDER)
- @skipUnlessBuiltin('memoryview')
- def test_write_memoryview(self):
+ @needs_builtin('memoryview')
+ def test_write_memoryview(self, sftp):
"""Test write() using a memoryview instance."""
data = 3 * b'A potentially large block of data to chunk up.\n'
try:
- with sftp.open('%s/write_memoryview' % FOLDER, 'wb') as f:
+ with sftp.open('%s/write_memoryview' % sftp.FOLDER, 'wb') as f:
view = memoryview(data)
for offset in range(0, len(data), 8):
f.write(view[offset:offset+8])
- with sftp.open('%s/write_memoryview' % FOLDER, 'rb') as f:
- self.assertEqual(f.read(), data)
+ with sftp.open('%s/write_memoryview' % sftp.FOLDER, 'rb') as f:
+ assert f.read() == data
finally:
- sftp.remove('%s/write_memoryview' % FOLDER)
-
-
-if __name__ == '__main__':
- SFTPTest.init_loopback()
- # logging is required by test_N_file_with_percent
- paramiko.util.log_to_file('test_sftp.log')
- from unittest import main
- main()
+ sftp.remove('%s/write_memoryview' % sftp.FOLDER)
diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py
index cfad5682..a659098d 100644
--- a/tests/test_sftp_big.py
+++ b/tests/test_sftp_big.py
@@ -31,94 +31,75 @@ import time
import unittest
from paramiko.common import o660
-from tests.test_sftp import get_sftp
-FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000')
+from .util import slow
-class BigSFTPTest (unittest.TestCase):
-
- def setUp(self):
- global FOLDER
- sftp = get_sftp()
- for i in range(1000):
- FOLDER = FOLDER[:-3] + '%03d' % i
- try:
- sftp.mkdir(FOLDER)
- break
- except (IOError, OSError):
- pass
-
- def tearDown(self):
- sftp = get_sftp()
- sftp.rmdir(FOLDER)
-
- def test_1_lots_of_files(self):
+@slow
+class TestBigSFTP(object):
+ def test_1_lots_of_files(self, sftp):
"""
create a bunch of files over the same session.
"""
- sftp = get_sftp()
numfiles = 100
try:
for i in range(numfiles):
- with sftp.open('%s/file%d.txt' % (FOLDER, i), 'w', 1) as f:
+ with sftp.open('%s/file%d.txt' % (sftp.FOLDER, i), 'w', 1) as f:
f.write('this is file #%d.\n' % i)
- sftp.chmod('%s/file%d.txt' % (FOLDER, i), o660)
+ sftp.chmod('%s/file%d.txt' % (sftp.FOLDER, i), o660)
# now make sure every file is there, by creating a list of filenmes
# and reading them in random order.
numlist = list(range(numfiles))
while len(numlist) > 0:
r = numlist[random.randint(0, len(numlist) - 1)]
- with sftp.open('%s/file%d.txt' % (FOLDER, r)) as f:
- self.assertEqual(f.readline(), 'this is file #%d.\n' % r)
+ with sftp.open('%s/file%d.txt' % (sftp.FOLDER, r)) as f:
+ assert f.readline() == 'this is file #%d.\n' % r
numlist.remove(r)
finally:
for i in range(numfiles):
try:
- sftp.remove('%s/file%d.txt' % (FOLDER, i))
+ sftp.remove('%s/file%d.txt' % (sftp.FOLDER, i))
except:
pass
- def test_2_big_file(self):
+ def test_2_big_file(self, sftp):
"""
write a 1MB file with no buffering.
"""
- sftp = get_sftp()
kblob = (1024 * b'x')
start = time.time()
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'w') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'w') as f:
for n in range(1024):
f.write(kblob)
if n % 128 == 0:
sys.stderr.write('.')
sys.stderr.write(' ')
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
end = time.time()
sys.stderr.write('%ds ' % round(end - start))
start = time.time()
- with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'r') as f:
for n in range(1024):
data = f.read(1024)
- self.assertEqual(data, kblob)
+ assert data == kblob
end = time.time()
sys.stderr.write('%ds ' % round(end - start))
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
- def test_3_big_file_pipelined(self):
+ def test_3_big_file_pipelined(self, sftp):
"""
write a 1MB file, with no linefeeds, using pipelining.
"""
- sftp = get_sftp()
kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
start = time.time()
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -126,12 +107,12 @@ class BigSFTPTest (unittest.TestCase):
sys.stderr.write('.')
sys.stderr.write(' ')
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
end = time.time()
sys.stderr.write('%ds ' % round(end - start))
start = time.time()
- with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f:
file_size = f.stat().st_size
f.prefetch(file_size)
@@ -145,19 +126,18 @@ class BigSFTPTest (unittest.TestCase):
chunk = size - n
data = f.read(chunk)
offset = n % 1024
- self.assertEqual(data, k2blob[offset:offset + chunk])
+ assert data == k2blob[offset:offset + chunk]
n += chunk
end = time.time()
sys.stderr.write('%ds ' % round(end - start))
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
- def test_4_prefetch_seek(self):
- sftp = get_sftp()
+ def test_4_prefetch_seek(self, sftp):
kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -165,13 +145,13 @@ class BigSFTPTest (unittest.TestCase):
sys.stderr.write('.')
sys.stderr.write(' ')
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
start = time.time()
k2blob = kblob + kblob
chunk = 793
for i in range(10):
- with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f:
file_size = f.stat().st_size
f.prefetch(file_size)
base_offset = (512 * 1024) + 17 * random.randint(1000, 2000)
@@ -183,18 +163,17 @@ class BigSFTPTest (unittest.TestCase):
f.seek(offset)
data = f.read(chunk)
n_offset = offset % 1024
- self.assertEqual(data, k2blob[n_offset:n_offset + chunk])
+ assert data == k2blob[n_offset:n_offset + chunk]
offset += chunk
end = time.time()
sys.stderr.write('%ds ' % round(end - start))
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
- def test_5_readv_seek(self):
- sftp = get_sftp()
+ def test_5_readv_seek(self, sftp):
kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -202,13 +181,13 @@ class BigSFTPTest (unittest.TestCase):
sys.stderr.write('.')
sys.stderr.write(' ')
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
start = time.time()
k2blob = kblob + kblob
chunk = 793
for i in range(10):
- with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f:
base_offset = (512 * 1024) + 17 * random.randint(1000, 2000)
# make a bunch of offsets and put them in random order
offsets = [base_offset + j * chunk for j in range(100)]
@@ -221,21 +200,20 @@ class BigSFTPTest (unittest.TestCase):
for i in range(len(readv_list)):
offset = readv_list[i][0]
n_offset = offset % 1024
- self.assertEqual(next(ret), k2blob[n_offset:n_offset + chunk])
+ assert next(ret) == k2blob[n_offset:n_offset + chunk]
end = time.time()
sys.stderr.write('%ds ' % round(end - start))
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
- def test_6_lots_of_prefetching(self):
+ def test_6_lots_of_prefetching(self, sftp):
"""
prefetch a 1MB file a bunch of times, discarding the file object
without using it, to verify that paramiko doesn't get confused.
"""
- sftp = get_sftp()
kblob = (1024 * b'x')
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'w') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'w') as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -243,32 +221,31 @@ class BigSFTPTest (unittest.TestCase):
sys.stderr.write('.')
sys.stderr.write(' ')
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
for i in range(10):
- with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'r') as f:
file_size = f.stat().st_size
f.prefetch(file_size)
- with sftp.open('%s/hongry.txt' % FOLDER, 'r') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'r') as f:
file_size = f.stat().st_size
f.prefetch(file_size)
for n in range(1024):
data = f.read(1024)
- self.assertEqual(data, kblob)
+ assert data == kblob
if n % 128 == 0:
sys.stderr.write('.')
sys.stderr.write(' ')
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
- def test_7_prefetch_readv(self):
+ def test_7_prefetch_readv(self, sftp):
"""
verify that prefetch and readv don't conflict with each other.
"""
- sftp = get_sftp()
kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -276,13 +253,13 @@ class BigSFTPTest (unittest.TestCase):
sys.stderr.write('.')
sys.stderr.write(' ')
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
- with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f:
file_size = f.stat().st_size
f.prefetch(file_size)
data = f.read(1024)
- self.assertEqual(data, kblob)
+ assert data == kblob
chunk_size = 793
base_offset = 512 * 1024
@@ -290,23 +267,22 @@ class BigSFTPTest (unittest.TestCase):
chunks = [(base_offset + (chunk_size * i), chunk_size) for i in range(20)]
for data in f.readv(chunks):
offset = base_offset % 1024
- self.assertEqual(chunk_size, len(data))
- self.assertEqual(k2blob[offset:offset + chunk_size], data)
+ assert chunk_size == len(data)
+ assert k2blob[offset:offset + chunk_size] == data
base_offset += chunk_size
sys.stderr.write(' ')
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
- def test_8_large_readv(self):
+ def test_8_large_readv(self, sftp):
"""
verify that a very large readv is broken up correctly and still
returned as a single blob.
"""
- sftp = get_sftp()
kblob = bytes().join([struct.pack('>H', n) for n in range(512)])
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'wb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'wb') as f:
f.set_pipelined(True)
for n in range(1024):
f.write(kblob)
@@ -314,62 +290,53 @@ class BigSFTPTest (unittest.TestCase):
sys.stderr.write('.')
sys.stderr.write(' ')
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
- with sftp.open('%s/hongry.txt' % FOLDER, 'rb') as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'rb') as f:
data = list(f.readv([(23 * 1024, 128 * 1024)]))
- self.assertEqual(1, len(data))
+ assert len(data) == 1
data = data[0]
- self.assertEqual(128 * 1024, len(data))
+ assert len(data) == 128 * 1024
sys.stderr.write(' ')
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
- def test_9_big_file_big_buffer(self):
+ def test_9_big_file_big_buffer(self, sftp):
"""
write a 1MB file, with no linefeeds, and a big buffer.
"""
- sftp = get_sftp()
mblob = (1024 * 1024 * 'x')
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'w', 128 * 1024) as f:
f.write(mblob)
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
- def test_A_big_file_renegotiate(self):
+ def test_A_big_file_renegotiate(self, sftp):
"""
write a 1MB file, forcing key renegotiation in the middle.
"""
- sftp = get_sftp()
t = sftp.sock.get_transport()
t.packetizer.REKEY_BYTES = 512 * 1024
k32blob = (32 * 1024 * 'x')
try:
- with sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'w', 128 * 1024) as f:
for i in range(32):
f.write(k32blob)
- self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
- self.assertNotEqual(t.H, t.session_id)
+ assert sftp.stat('%s/hongry.txt' % sftp.FOLDER).st_size == 1024 * 1024
+ assert t.H != t.session_id
# try to read it too.
- with sftp.open('%s/hongry.txt' % FOLDER, 'r', 128 * 1024) as f:
+ with sftp.open('%s/hongry.txt' % sftp.FOLDER, 'r', 128 * 1024) as f:
file_size = f.stat().st_size
f.prefetch(file_size)
total = 0
while total < 1024 * 1024:
total += len(f.read(32 * 1024))
finally:
- sftp.remove('%s/hongry.txt' % FOLDER)
+ sftp.remove('%s/hongry.txt' % sftp.FOLDER)
t.packetizer.REKEY_BYTES = pow(2, 30)
-
-
-if __name__ == '__main__':
- from tests.test_sftp import SFTPTest
- SFTPTest.init_loopback()
- from unittest import main
- main()
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py
index d8d05d2b..f0645e0e 100644
--- a/tests/test_ssh_gss.py
+++ b/tests/test_ssh_gss.py
@@ -29,17 +29,20 @@ import unittest
import paramiko
-from tests.util import test_path
-from tests.test_client import FINGERPRINTS
+from .util import _support, needs_gssapi
+from .test_client import FINGERPRINTS
-class NullServer (paramiko.ServerInterface):
+class NullServer (paramiko.ServerInterface):
def get_allowed_auths(self, username):
return 'gssapi-with-mic,publickey'
- def check_auth_gssapi_with_mic(self, username,
- gss_authenticated=paramiko.AUTH_FAILED,
- cc_file=None):
+ def check_auth_gssapi_with_mic(
+ self,
+ username,
+ gss_authenticated=paramiko.AUTH_FAILED,
+ cc_file=None,
+ ):
if gss_authenticated == paramiko.AUTH_SUCCESSFUL:
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED
@@ -66,18 +69,15 @@ class NullServer (paramiko.ServerInterface):
return True
+@needs_gssapi
class GSSAuthTest(unittest.TestCase):
- @staticmethod
- def init(username, hostname):
- global krb5_principal, targ_name
- krb5_principal = username
- targ_name = hostname
-
def setUp(self):
- self.username = krb5_principal
- self.hostname = socket.getfqdn(targ_name)
+ # TODO: username and targ_name should come from os.environ or whatever
+ # the approved pytest method is for runtime-configuring test data.
+ self.username = "krb5_principal"
+ self.hostname = socket.getfqdn("targ_name")
self.sockl = socket.socket()
- self.sockl.bind((targ_name, 0))
+ self.sockl.bind(("targ_name", 0))
self.sockl.listen(1)
self.addr, self.port = self.sockl.getsockname()
self.event = threading.Event()
@@ -148,6 +148,6 @@ class GSSAuthTest(unittest.TestCase):
Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding
"""
self.hostname = "this_host_does_not_exists_and_causes_a_GSSAPI-exception"
- self._test_connection(key_filename=[test_path('test_rsa.key')],
+ self._test_connection(key_filename=[_support('test_rsa.key')],
allow_agent=False,
look_for_keys=False)
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 99cbc3e0..9474acfc 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -43,9 +43,9 @@ from paramiko.common import (
)
from paramiko.py3compat import bytes
from paramiko.message import Message
-from tests import skipUnlessBuiltin
-from tests.loop import LoopSocket
-from tests.util import test_path
+
+from .util import needs_builtin, _support, slow
+from .loop import LoopSocket
LONG_BANNER = """\
@@ -64,7 +64,7 @@ Maybe.
class NullServer (ServerInterface):
paranoid_did_password = False
paranoid_did_public_key = False
- paranoid_key = DSSKey.from_private_key_file(test_path('test_dss.key'))
+ paranoid_key = DSSKey.from_private_key_file(_support('test_dss.key'))
def get_allowed_auths(self, username):
if username == 'slowdive':
@@ -136,7 +136,7 @@ class TransportTest(unittest.TestCase):
def setup_test_server(
self, client_options=None, server_options=None, connect_kwargs=None,
):
- host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
@@ -200,7 +200,7 @@ class TransportTest(unittest.TestCase):
loopback sockets. this is hardly "simple" but it's simpler than the
later tests. :)
"""
- host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
event = threading.Event()
@@ -225,7 +225,7 @@ class TransportTest(unittest.TestCase):
"""
verify that a long banner doesn't mess up the handshake.
"""
- host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
event = threading.Event()
@@ -257,6 +257,7 @@ class TransportTest(unittest.TestCase):
self.tc.renegotiate_keys()
self.ts.send_ignore(1024)
+ @slow
def test_5_keepalive(self):
"""
verify that the keepalive will be sent.
@@ -820,6 +821,7 @@ class TransportTest(unittest.TestCase):
(2**32, MAX_WINDOW_SIZE)]:
self.assertEqual(self.tc._sanitize_window_size(val), correct)
+ @slow
def test_L_handshake_timeout(self):
"""
verify that we can get a hanshake timeout.
@@ -840,7 +842,7 @@ class TransportTest(unittest.TestCase):
# be fine. Even tho it's a bit squicky.
self.tc.packetizer = SlowPacketizer(self.tc.sock)
# Continue with regular test red tape.
- host_key = RSAKey.from_private_key_file(test_path('test_rsa.key'))
+ host_key = RSAKey.from_private_key_file(_support('test_rsa.key'))
public_host_key = RSAKey(data=host_key.asbytes())
self.ts.add_server_key(host_key)
event = threading.Event()
@@ -892,7 +894,7 @@ class TransportTest(unittest.TestCase):
expected = text.encode("utf-8")
self.assertEqual(sfile.read(len(expected)), expected)
- @skipUnlessBuiltin('buffer')
+ @needs_builtin('buffer')
def test_channel_send_buffer(self):
"""
verify sending buffer instances to a channel
@@ -915,7 +917,7 @@ class TransportTest(unittest.TestCase):
chan.sendall(buffer(data))
self.assertEqual(sfile.read(len(data)), data)
- @skipUnlessBuiltin('memoryview')
+ @needs_builtin('memoryview')
def test_channel_send_memoryview(self):
"""
verify sending memoryview instances to a channel
diff --git a/tests/test_util.py b/tests/test_util.py
index 7880e156..90473f43 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -30,6 +30,7 @@ import paramiko.util
from paramiko.util import lookup_ssh_host_config as host_config, safe_string
from paramiko.py3compat import StringIO, byte_ord, b
+
# Note some lines in this configuration have trailing spaces on purpose
test_config_file = """\
Host *
@@ -366,7 +367,7 @@ IdentityFile something_%l_using_fqdn
def test_get_hostnames(self):
f = StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
- self.assertEqual(config.get_hostnames(), set(['*', '*.example.com', 'spoo.example.com']))
+ self.assertEqual(config.get_hostnames(), {'*', '*.example.com', 'spoo.example.com'})
def test_quoted_host_names(self):
test_config_file = """\
@@ -469,12 +470,12 @@ Host param3 parara
self.assertRaises(Exception, conf._get_hosts, host)
def test_safe_string(self):
- vanilla = b("vanilla")
- has_bytes = b("has \7\3 bytes")
+ vanilla = b"vanilla"
+ has_bytes = b"has \7\3 bytes"
safe_vanilla = safe_string(vanilla)
safe_has_bytes = safe_string(has_bytes)
- expected_bytes = b("has %07%03 bytes")
- err = "{0!r} != {1!r}"
+ expected_bytes = b"has %07%03 bytes"
+ err = "{!r} != {!r}"
msg = err.format(safe_vanilla, vanilla)
assert safe_vanilla == vanilla, msg
msg = err.format(safe_has_bytes, expected_bytes)
diff --git a/tests/util.py b/tests/util.py
index c1b43da8..4ca02374 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -1,6 +1,27 @@
-import os
+from os.path import dirname, realpath, join
-root_path = os.path.dirname(os.path.realpath(__file__))
+import pytest
-def test_path(filename):
- return os.path.join(root_path, filename)
+from paramiko.py3compat import builtins
+
+
+def _support(filename):
+ return join(dirname(realpath(__file__)), filename)
+
+
+# TODO: consider using pytest.importorskip('gssapi') instead? We presumably
+# still need CLI configurability for the Kerberos parameters, though, so can't
+# JUST key off presence of GSSAPI optional dependency...
+# TODO: anyway, s/True/os.environ.get('RUN_GSSAPI', False)/ or something.
+needs_gssapi = pytest.mark.skipif(True, reason="No GSSAPI to test")
+
+
+def needs_builtin(name):
+ """
+ Skip decorated test if builtin name does not exist.
+ """
+ reason = "Test requires a builtin '{}'".format(name)
+ return pytest.mark.skipif(not hasattr(builtins, name), reason=reason)
+
+
+slow = pytest.mark.slow