From 162213fa1a4551bd955134c97ca5276a5f29e907 Mon Sep 17 00:00:00 2001 From: Jeff Forcier Date: Thu, 27 Apr 2023 18:00:16 -0400 Subject: Migrate rest of main keys and update suite to be more pytest-relaxed compat Main branch as of today: 350 passed, 21 skipped, 52 deselected, 3 warnings in 11.10s This branch as of this commit: 361 passed, 21 skipped, 52 deselected, 3 warnings in 10.51s Of those 11 "new" tests, 8 are ones I wrote (tests/pkey.py). Hard to figure out what the other 3 are given pytest-relaxed's output is very different from regular verbose pytest. oops. --- tests/_loop.py | 98 ++++++++++++++++++ tests/_stub_sftp.py | 232 +++++++++++++++++++++++++++++++++++++++++++ tests/_support/ecdsa-256.key | 5 + tests/_support/ed25519.key | 8 ++ tests/_support/rsa.key | 15 +++ tests/_util.py | 178 +++++++++++++++++++++++++++++++++ tests/conftest.py | 63 +++++++++++- tests/loop.py | 98 ------------------ tests/pkey.py | 13 +++ tests/stub_sftp.py | 232 ------------------------------------------- tests/test_auth.py | 6 +- tests/test_client.py | 39 ++++---- tests/test_config.py | 2 +- tests/test_ecdsa_256.key | 5 - tests/test_ed25519.key | 8 -- tests/test_file.py | 2 +- tests/test_gssapi.py | 2 +- tests/test_kex_gss.py | 6 +- tests/test_packetizer.py | 2 +- tests/test_pkey.py | 35 ++++--- tests/test_rsa.key | 15 --- tests/test_sftp.py | 4 +- tests/test_sftp_big.py | 2 +- tests/test_ssh_gss.py | 8 +- tests/test_transport.py | 34 +++---- tests/util.py | 178 --------------------------------- 26 files changed, 680 insertions(+), 610 deletions(-) create mode 100644 tests/_loop.py create mode 100644 tests/_stub_sftp.py create mode 100644 tests/_support/ecdsa-256.key create mode 100644 tests/_support/ed25519.key create mode 100644 tests/_support/rsa.key create mode 100644 tests/_util.py delete mode 100644 tests/loop.py create mode 100644 tests/pkey.py delete mode 100644 tests/stub_sftp.py delete mode 100644 tests/test_ecdsa_256.key delete mode 100644 tests/test_ed25519.key delete mode 100644 tests/test_rsa.key delete mode 100644 tests/util.py (limited to 'tests') diff --git a/tests/_loop.py b/tests/_loop.py new file mode 100644 index 00000000..a3740013 --- /dev/null +++ b/tests/_loop.py @@ -0,0 +1,98 @@ +# Copyright (C) 2003-2009 Robey Pointer +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import socket +import threading + +from paramiko.util import asbytes + + +class LoopSocket: + """ + A LoopSocket looks like a normal socket, but all data written to it is + delivered on the read-end of another LoopSocket, and vice versa. It's + like a software "socketpair". + """ + + def __init__(self): + self.__in_buffer = bytes() + self.__lock = threading.Lock() + self.__cv = threading.Condition(self.__lock) + self.__timeout = None + self.__mate = None + self._closed = False + + def close(self): + self.__unlink() + self._closed = True + try: + self.__lock.acquire() + self.__in_buffer = bytes() + finally: + self.__lock.release() + + def send(self, data): + data = asbytes(data) + if self.__mate is None: + # EOF + raise EOFError() + self.__mate.__feed(data) + return len(data) + + def recv(self, n): + self.__lock.acquire() + try: + if self.__mate is None: + # EOF + return bytes() + if len(self.__in_buffer) == 0: + self.__cv.wait(self.__timeout) + if len(self.__in_buffer) == 0: + raise socket.timeout + out = self.__in_buffer[:n] + self.__in_buffer = self.__in_buffer[n:] + return out + finally: + self.__lock.release() + + def settimeout(self, n): + self.__timeout = n + + def link(self, other): + self.__mate = other + self.__mate.__mate = self + + def __feed(self, data): + self.__lock.acquire() + try: + self.__in_buffer += data + self.__cv.notify_all() + finally: + self.__lock.release() + + def __unlink(self): + m = None + self.__lock.acquire() + try: + if self.__mate is not None: + m = self.__mate + self.__mate = None + finally: + self.__lock.release() + if m is not None: + m.__unlink() diff --git a/tests/_stub_sftp.py b/tests/_stub_sftp.py new file mode 100644 index 00000000..0c0372e9 --- /dev/null +++ b/tests/_stub_sftp.py @@ -0,0 +1,232 @@ +# Copyright (C) 2003-2009 Robey Pointer +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +""" +A stub SFTP server for loopback SFTP testing. +""" + +import os + +from paramiko import ( + AUTH_SUCCESSFUL, + OPEN_SUCCEEDED, + SFTPAttributes, + SFTPHandle, + SFTPServer, + SFTPServerInterface, + SFTP_FAILURE, + SFTP_OK, + ServerInterface, +) +from paramiko.common import o666 + + +class StubServer(ServerInterface): + def check_auth_password(self, username, password): + # all are allowed + return AUTH_SUCCESSFUL + + def check_channel_request(self, kind, chanid): + return OPEN_SUCCEEDED + + +class StubSFTPHandle(SFTPHandle): + def stat(self): + try: + return SFTPAttributes.from_stat(os.fstat(self.readfile.fileno())) + except OSError as e: + return SFTPServer.convert_errno(e.errno) + + def chattr(self, attr): + # python doesn't have equivalents to fchown or fchmod, so we have to + # use the stored filename + try: + SFTPServer.set_file_attr(self.filename, attr) + return SFTP_OK + except OSError as e: + return SFTPServer.convert_errno(e.errno) + + +class StubSFTPServer(SFTPServerInterface): + # assume current folder is a fine root + # (the tests always create and eventually delete a subfolder, so there + # shouldn't be any mess) + ROOT = os.getcwd() + + def _realpath(self, path): + return self.ROOT + self.canonicalize(path) + + def list_folder(self, path): + path = self._realpath(path) + try: + out = [] + flist = os.listdir(path) + for fname in flist: + attr = SFTPAttributes.from_stat( + os.stat(os.path.join(path, fname)) + ) + attr.filename = fname + out.append(attr) + return out + except OSError as e: + return SFTPServer.convert_errno(e.errno) + + def stat(self, path): + path = self._realpath(path) + try: + return SFTPAttributes.from_stat(os.stat(path)) + except OSError as e: + return SFTPServer.convert_errno(e.errno) + + def lstat(self, path): + path = self._realpath(path) + try: + return SFTPAttributes.from_stat(os.lstat(path)) + except OSError as e: + return SFTPServer.convert_errno(e.errno) + + def open(self, path, flags, attr): + path = self._realpath(path) + try: + binary_flag = getattr(os, "O_BINARY", 0) + flags |= binary_flag + mode = getattr(attr, "st_mode", None) + if mode is not None: + fd = os.open(path, flags, mode) + else: + # os.open() defaults to 0777 which is + # an odd default mode for files + fd = os.open(path, flags, o666) + except OSError as e: + return SFTPServer.convert_errno(e.errno) + if (flags & os.O_CREAT) and (attr is not None): + attr._flags &= ~attr.FLAG_PERMISSIONS + SFTPServer.set_file_attr(path, attr) + if flags & os.O_WRONLY: + if flags & os.O_APPEND: + fstr = "ab" + else: + fstr = "wb" + elif flags & os.O_RDWR: + if flags & os.O_APPEND: + fstr = "a+b" + else: + fstr = "r+b" + else: + # O_RDONLY (== 0) + fstr = "rb" + try: + f = os.fdopen(fd, fstr) + except OSError as e: + return SFTPServer.convert_errno(e.errno) + fobj = StubSFTPHandle(flags) + fobj.filename = path + fobj.readfile = f + fobj.writefile = f + return fobj + + def remove(self, path): + path = self._realpath(path) + try: + os.remove(path) + except OSError as e: + return SFTPServer.convert_errno(e.errno) + return SFTP_OK + + def rename(self, oldpath, newpath): + oldpath = self._realpath(oldpath) + newpath = self._realpath(newpath) + if os.path.exists(newpath): + return SFTP_FAILURE + try: + os.rename(oldpath, newpath) + except OSError as e: + return SFTPServer.convert_errno(e.errno) + return SFTP_OK + + def posix_rename(self, oldpath, newpath): + oldpath = self._realpath(oldpath) + newpath = self._realpath(newpath) + try: + os.rename(oldpath, newpath) + except OSError as e: + return SFTPServer.convert_errno(e.errno) + return SFTP_OK + + def mkdir(self, path, attr): + path = self._realpath(path) + try: + os.mkdir(path) + if attr is not None: + SFTPServer.set_file_attr(path, attr) + except OSError as e: + return SFTPServer.convert_errno(e.errno) + return SFTP_OK + + def rmdir(self, path): + path = self._realpath(path) + try: + os.rmdir(path) + except OSError as e: + return SFTPServer.convert_errno(e.errno) + return SFTP_OK + + def chattr(self, path, attr): + path = self._realpath(path) + try: + SFTPServer.set_file_attr(path, attr) + except OSError as e: + return SFTPServer.convert_errno(e.errno) + return SFTP_OK + + def symlink(self, target_path, path): + path = self._realpath(path) + if (len(target_path) > 0) and (target_path[0] == "/"): + # absolute symlink + target_path = os.path.join(self.ROOT, target_path[1:]) + if target_path[:2] == "//": + # bug in os.path.join + target_path = target_path[1:] + else: + # compute relative to path + abspath = os.path.join(os.path.dirname(path), target_path) + if abspath[: len(self.ROOT)] != self.ROOT: + # this symlink isn't going to work anyway -- just break it + # immediately + target_path = "" + try: + os.symlink(target_path, path) + except OSError as e: + return SFTPServer.convert_errno(e.errno) + return SFTP_OK + + def readlink(self, path): + path = self._realpath(path) + try: + symlink = os.readlink(path) + except OSError as e: + return SFTPServer.convert_errno(e.errno) + # if it's absolute, remove the root + if os.path.isabs(symlink): + if symlink[: len(self.ROOT)] == self.ROOT: + symlink = symlink[len(self.ROOT) :] + if (len(symlink) == 0) or (symlink[0] != "/"): + symlink = "/" + symlink + else: + symlink = "" + return symlink diff --git a/tests/_support/ecdsa-256.key b/tests/_support/ecdsa-256.key new file mode 100644 index 00000000..42d44734 --- /dev/null +++ b/tests/_support/ecdsa-256.key @@ -0,0 +1,5 @@ +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIKB6ty3yVyKEnfF/zprx0qwC76MsMlHY4HXCnqho2eKioAoGCCqGSM49 +AwEHoUQDQgAElI9mbdlaS+T9nHxY/59lFnn80EEecZDBHq4gLpccY8Mge5ZTMiMD +ADRvOqQ5R98Sxst765CAqXmRtz8vwoD96g== +-----END EC PRIVATE KEY----- diff --git a/tests/_support/ed25519.key b/tests/_support/ed25519.key new file mode 100644 index 00000000..eb9f94c2 --- /dev/null +++ b/tests/_support/ed25519.key @@ -0,0 +1,8 @@ +-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW +QyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXwAAAKhjwAdrY8AH +awAAAAtzc2gtZWQyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXw +AAAEA9tGQi2IrprbOSbDCF+RmAHd6meNSXBUQ2ekKXm4/8xnr1K9komH/1WBIvQbbtvnFV +hryd62EfcgRFuLRiokNfAAAAI2FsZXhfZ2F5bm9yQEFsZXhzLU1hY0Jvb2stQWlyLmxvY2 +FsAQI= +-----END OPENSSH PRIVATE KEY----- diff --git a/tests/_support/rsa.key b/tests/_support/rsa.key new file mode 100644 index 00000000..f50e9c53 --- /dev/null +++ b/tests/_support/rsa.key @@ -0,0 +1,15 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz +oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/ +d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB +gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0 +EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon +soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H +tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU +avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA +4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g +H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv +qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV +HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc +nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7 +-----END RSA PRIVATE KEY----- diff --git a/tests/_util.py b/tests/_util.py new file mode 100644 index 00000000..2f1c5ac2 --- /dev/null +++ b/tests/_util.py @@ -0,0 +1,178 @@ +from os.path import dirname, realpath, join +import builtins +import os +from pathlib import Path +import struct +import sys +import unittest + +import pytest + +from paramiko.ssh_gss import GSS_AUTH_AVAILABLE + +from cryptography.exceptions import UnsupportedAlgorithm, _Reasons +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import padding, rsa + +tests_dir = dirname(realpath(__file__)) + + +def _support(filename): + base = Path(tests_dir) + top = base / filename + deeper = base / "_support" / filename + return str(deeper if deeper.exists() else top) + + +def _config(name): + return join(tests_dir, "configs", name) + + +needs_gssapi = pytest.mark.skipif( + not GSS_AUTH_AVAILABLE, reason="No GSSAPI to test" +) + + +def needs_builtin(name): + """ + Skip decorated test if builtin name does not exist. + """ + reason = "Test requires a builtin '{}'".format(name) + return pytest.mark.skipif(not hasattr(builtins, name), reason=reason) + + +slow = pytest.mark.slow + +# GSSAPI / Kerberos related tests need a working Kerberos environment. +# The class `KerberosTestCase` provides such an environment or skips all tests. +# There are 3 distinct cases: +# +# - A Kerberos environment has already been created and the environment +# contains the required information. +# +# - We can use the package 'k5test' to setup an working kerberos environment on +# the fly. +# +# - We skip all tests. +# +# ToDo: add a Windows specific implementation? + +if ( + os.environ.get("K5TEST_USER_PRINC", None) + and os.environ.get("K5TEST_HOSTNAME", None) + and os.environ.get("KRB5_KTNAME", None) +): # add other vars as needed + + # The environment provides the required information + class DummyK5Realm: + def __init__(self): + for k in os.environ: + if not k.startswith("K5TEST_"): + continue + setattr(self, k[7:].lower(), os.environ[k]) + self.env = {} + + class KerberosTestCase(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.realm = DummyK5Realm() + + @classmethod + def tearDownClass(cls): + del cls.realm + +else: + try: + # Try to setup a kerberos environment + from k5test import KerberosTestCase + except Exception: + # Use a dummy, that skips all tests + class KerberosTestCase(unittest.TestCase): + @classmethod + def setUpClass(cls): + raise unittest.SkipTest( + "Missing extension package k5test. " + 'Please run "pip install k5test" ' + "to install it." + ) + + +def update_env(testcase, mapping, env=os.environ): + """Modify os.environ during a test case and restore during cleanup.""" + saved_env = env.copy() + + def replace(target, source): + target.update(source) + for k in list(target): + if k not in source: + target.pop(k, None) + + testcase.addCleanup(replace, env, saved_env) + env.update(mapping) + return testcase + + +def k5shell(args=None): + """Create a shell with an kerberos environment + + This can be used to debug paramiko or to test the old GSSAPI. + To test a different GSSAPI, simply activate a suitable venv + within the shell. + """ + import k5test + import atexit + import subprocess + + k5 = k5test.K5Realm() + atexit.register(k5.stop) + os.environ.update(k5.env) + for n in ("realm", "user_princ", "hostname"): + os.environ["K5TEST_" + n.upper()] = getattr(k5, n) + + if not args: + args = sys.argv[1:] + if not args: + args = [os.environ.get("SHELL", "bash")] + sys.exit(subprocess.call(args)) + + +def is_low_entropy(): + """ + Attempts to detect whether running interpreter is low-entropy. + + "low-entropy" is defined as being in 32-bit mode and with the hash seed set + to zero. + """ + is_32bit = struct.calcsize("P") == 32 / 8 + # I don't see a way to tell internally if the hash seed was set this + # way, but env should be plenty sufficient, this is only for testing. + return is_32bit and os.environ.get("PYTHONHASHSEED", None) == "0" + + +def sha1_signing_unsupported(): + """ + This is used to skip tests in environments where SHA-1 signing is + not supported by the backend. + """ + private_key = rsa.generate_private_key( + public_exponent=65537, key_size=2048, backend=default_backend() + ) + message = b"Some dummy text" + try: + private_key.sign( + message, + padding.PSS( + mgf=padding.MGF1(hashes.SHA1()), + salt_length=padding.PSS.MAX_LENGTH, + ), + hashes.SHA1(), + ) + return False + except UnsupportedAlgorithm as e: + return e._reason is _Reasons.UNSUPPORTED_HASH + + +requires_sha1_signing = unittest.skipIf( + sha1_signing_unsupported(), "SHA-1 signing not supported" +) diff --git a/tests/conftest.py b/tests/conftest.py index b28d2a17..beef87c2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,13 +2,24 @@ import logging import os import shutil import threading +from pathlib import Path + +from invoke.vendor.lexicon import Lexicon import pytest -from paramiko import RSAKey, SFTPServer, SFTP, Transport +from paramiko import ( + SFTPServer, + SFTP, + Transport, + DSSKey, + RSAKey, + Ed25519Key, + ECDSAKey, +) -from .loop import LoopSocket -from .stub_sftp import StubServer, StubSFTPServer -from .util import _support +from ._loop import LoopSocket +from ._stub_sftp import StubServer, StubSFTPServer +from ._util import _support from icecream import ic, install as install_ic @@ -71,7 +82,7 @@ def sftp_server(): tc = Transport(sockc) ts = Transport(socks) # Auth - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) ts.add_server_key(host_key) # Server setup event = threading.Event() @@ -103,3 +114,45 @@ def sftp(sftp_server): yield client # Clean up - as in make_sftp_folder, we assume local-only exec for now. shutil.rmtree(client.FOLDER, ignore_errors=True) + + +key_data = [ + ["ssh-rsa", RSAKey, "SHA256:OhNL391d/beeFnxxg18AwWVYTAHww+D4djEE7Co0Yng"], + ["ssh-dss", DSSKey, "SHA256:uHwwykG099f4M4kfzvFpKCTino0/P03DRbAidpAmPm0"], + [ + "ssh-ed25519", + Ed25519Key, + "SHA256:J6VESFdD3xSChn8y9PzWzeF+1tl892mOy2TqkMLO4ow", + ], + [ + "ecdsa-sha2-nistp256", + ECDSAKey, + "SHA256:BrQG04oNKUETjKCeL4ifkARASg3yxS/pUHl3wWM26Yg", + ], +] +for datum in key_data: + short = datum[0].replace("ssh-", "").replace("sha2-nistp", "") + datum.insert(0, short) + + +@pytest.fixture(scope="session", params=key_data, ids=lambda x: x[0]) +def key(request): + """ + Yield an object for each known type of key, with attributes: + + - ``short_type``: short identifier, eg ``rsa`` or ``ecdsa-256`` + - ``full_type``: the "message style" key identifier, eg ``ssh-rsa``, or + ``ecdsa-sha2-nistp256``. + - ``path``: a pathlib Path object to the fixture key file + - ``pkey``: an instantiated PKey subclass object + - ``fingerprint``: the expected fingerprint of said key + """ + short_type, key_type, key_class, fingerprint = request.param + bag = Lexicon() + bag.short_type = short_type + bag.full_type = key_type + bag.path = Path(_support(f"{short_type}.key")) + with bag.path.open() as fd: + bag.pkey = key_class.from_private_key(fd) + bag.fingerprint = fingerprint + yield bag diff --git a/tests/loop.py b/tests/loop.py deleted file mode 100644 index a3740013..00000000 --- a/tests/loop.py +++ /dev/null @@ -1,98 +0,0 @@ -# Copyright (C) 2003-2009 Robey Pointer -# -# This file is part of paramiko. -# -# Paramiko is free software; you can redistribute it and/or modify it under the -# terms of the GNU Lesser General Public License as published by the Free -# Software Foundation; either version 2.1 of the License, or (at your option) -# any later version. -# -# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY -# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR -# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - -import socket -import threading - -from paramiko.util import asbytes - - -class LoopSocket: - """ - A LoopSocket looks like a normal socket, but all data written to it is - delivered on the read-end of another LoopSocket, and vice versa. It's - like a software "socketpair". - """ - - def __init__(self): - self.__in_buffer = bytes() - self.__lock = threading.Lock() - self.__cv = threading.Condition(self.__lock) - self.__timeout = None - self.__mate = None - self._closed = False - - def close(self): - self.__unlink() - self._closed = True - try: - self.__lock.acquire() - self.__in_buffer = bytes() - finally: - self.__lock.release() - - def send(self, data): - data = asbytes(data) - if self.__mate is None: - # EOF - raise EOFError() - self.__mate.__feed(data) - return len(data) - - def recv(self, n): - self.__lock.acquire() - try: - if self.__mate is None: - # EOF - return bytes() - if len(self.__in_buffer) == 0: - self.__cv.wait(self.__timeout) - if len(self.__in_buffer) == 0: - raise socket.timeout - out = self.__in_buffer[:n] - self.__in_buffer = self.__in_buffer[n:] - return out - finally: - self.__lock.release() - - def settimeout(self, n): - self.__timeout = n - - def link(self, other): - self.__mate = other - self.__mate.__mate = self - - def __feed(self, data): - self.__lock.acquire() - try: - self.__in_buffer += data - self.__cv.notify_all() - finally: - self.__lock.release() - - def __unlink(self): - m = None - self.__lock.acquire() - try: - if self.__mate is not None: - m = self.__mate - self.__mate = None - finally: - self.__lock.release() - if m is not None: - m.__unlink() diff --git a/tests/pkey.py b/tests/pkey.py new file mode 100644 index 00000000..b1cba825 --- /dev/null +++ b/tests/pkey.py @@ -0,0 +1,13 @@ +from paramiko import PKey + + +class PKey_: + class from_type_string: + def loads_from_type_and_bytes(self, key): + obj = PKey.from_type_string(key.full_type, key.pkey.asbytes()) + assert obj == key.pkey + + class from_path: + def loads_from_file_path(self, key): + obj = PKey.from_path(key.path) + assert obj == key.pkey diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py deleted file mode 100644 index 0c0372e9..00000000 --- a/tests/stub_sftp.py +++ /dev/null @@ -1,232 +0,0 @@ -# Copyright (C) 2003-2009 Robey Pointer -# -# This file is part of paramiko. -# -# Paramiko is free software; you can redistribute it and/or modify it under the -# terms of the GNU Lesser General Public License as published by the Free -# Software Foundation; either version 2.1 of the License, or (at your option) -# any later version. -# -# Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY -# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR -# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more -# details. -# -# You should have received a copy of the GNU Lesser General Public License -# along with Paramiko; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - -""" -A stub SFTP server for loopback SFTP testing. -""" - -import os - -from paramiko import ( - AUTH_SUCCESSFUL, - OPEN_SUCCEEDED, - SFTPAttributes, - SFTPHandle, - SFTPServer, - SFTPServerInterface, - SFTP_FAILURE, - SFTP_OK, - ServerInterface, -) -from paramiko.common import o666 - - -class StubServer(ServerInterface): - def check_auth_password(self, username, password): - # all are allowed - return AUTH_SUCCESSFUL - - def check_channel_request(self, kind, chanid): - return OPEN_SUCCEEDED - - -class StubSFTPHandle(SFTPHandle): - def stat(self): - try: - return SFTPAttributes.from_stat(os.fstat(self.readfile.fileno())) - except OSError as e: - return SFTPServer.convert_errno(e.errno) - - def chattr(self, attr): - # python doesn't have equivalents to fchown or fchmod, so we have to - # use the stored filename - try: - SFTPServer.set_file_attr(self.filename, attr) - return SFTP_OK - except OSError as e: - return SFTPServer.convert_errno(e.errno) - - -class StubSFTPServer(SFTPServerInterface): - # assume current folder is a fine root - # (the tests always create and eventually delete a subfolder, so there - # shouldn't be any mess) - ROOT = os.getcwd() - - def _realpath(self, path): - return self.ROOT + self.canonicalize(path) - - def list_folder(self, path): - path = self._realpath(path) - try: - out = [] - flist = os.listdir(path) - for fname in flist: - attr = SFTPAttributes.from_stat( - os.stat(os.path.join(path, fname)) - ) - attr.filename = fname - out.append(attr) - return out - except OSError as e: - return SFTPServer.convert_errno(e.errno) - - def stat(self, path): - path = self._realpath(path) - try: - return SFTPAttributes.from_stat(os.stat(path)) - except OSError as e: - return SFTPServer.convert_errno(e.errno) - - def lstat(self, path): - path = self._realpath(path) - try: - return SFTPAttributes.from_stat(os.lstat(path)) - except OSError as e: - return SFTPServer.convert_errno(e.errno) - - def open(self, path, flags, attr): - path = self._realpath(path) - try: - binary_flag = getattr(os, "O_BINARY", 0) - flags |= binary_flag - mode = getattr(attr, "st_mode", None) - if mode is not None: - fd = os.open(path, flags, mode) - else: - # os.open() defaults to 0777 which is - # an odd default mode for files - fd = os.open(path, flags, o666) - except OSError as e: - return SFTPServer.convert_errno(e.errno) - if (flags & os.O_CREAT) and (attr is not None): - attr._flags &= ~attr.FLAG_PERMISSIONS - SFTPServer.set_file_attr(path, attr) - if flags & os.O_WRONLY: - if flags & os.O_APPEND: - fstr = "ab" - else: - fstr = "wb" - elif flags & os.O_RDWR: - if flags & os.O_APPEND: - fstr = "a+b" - else: - fstr = "r+b" - else: - # O_RDONLY (== 0) - fstr = "rb" - try: - f = os.fdopen(fd, fstr) - except OSError as e: - return SFTPServer.convert_errno(e.errno) - fobj = StubSFTPHandle(flags) - fobj.filename = path - fobj.readfile = f - fobj.writefile = f - return fobj - - def remove(self, path): - path = self._realpath(path) - try: - os.remove(path) - except OSError as e: - return SFTPServer.convert_errno(e.errno) - return SFTP_OK - - def rename(self, oldpath, newpath): - oldpath = self._realpath(oldpath) - newpath = self._realpath(newpath) - if os.path.exists(newpath): - return SFTP_FAILURE - try: - os.rename(oldpath, newpath) - except OSError as e: - return SFTPServer.convert_errno(e.errno) - return SFTP_OK - - def posix_rename(self, oldpath, newpath): - oldpath = self._realpath(oldpath) - newpath = self._realpath(newpath) - try: - os.rename(oldpath, newpath) - except OSError as e: - return SFTPServer.convert_errno(e.errno) - return SFTP_OK - - def mkdir(self, path, attr): - path = self._realpath(path) - try: - os.mkdir(path) - if attr is not None: - SFTPServer.set_file_attr(path, attr) - except OSError as e: - return SFTPServer.convert_errno(e.errno) - return SFTP_OK - - def rmdir(self, path): - path = self._realpath(path) - try: - os.rmdir(path) - except OSError as e: - return SFTPServer.convert_errno(e.errno) - return SFTP_OK - - def chattr(self, path, attr): - path = self._realpath(path) - try: - SFTPServer.set_file_attr(path, attr) - except OSError as e: - return SFTPServer.convert_errno(e.errno) - return SFTP_OK - - def symlink(self, target_path, path): - path = self._realpath(path) - if (len(target_path) > 0) and (target_path[0] == "/"): - # absolute symlink - target_path = os.path.join(self.ROOT, target_path[1:]) - if target_path[:2] == "//": - # bug in os.path.join - target_path = target_path[1:] - else: - # compute relative to path - abspath = os.path.join(os.path.dirname(path), target_path) - if abspath[: len(self.ROOT)] != self.ROOT: - # this symlink isn't going to work anyway -- just break it - # immediately - target_path = "" - try: - os.symlink(target_path, path) - except OSError as e: - return SFTPServer.convert_errno(e.errno) - return SFTP_OK - - def readlink(self, path): - path = self._realpath(path) - try: - symlink = os.readlink(path) - except OSError as e: - return SFTPServer.convert_errno(e.errno) - # if it's absolute, remove the root - if os.path.isabs(symlink): - if symlink[: len(self.ROOT)] == self.ROOT: - symlink = symlink[len(self.ROOT) :] - if (len(symlink) == 0) or (symlink[0] != "/"): - symlink = "/" + symlink - else: - symlink = "" - return symlink diff --git a/tests/test_auth.py b/tests/test_auth.py index 592e589f..02df8c12 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -37,8 +37,8 @@ from paramiko import ( from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL from paramiko.util import u -from .loop import LoopSocket -from .util import _support, slow +from ._loop import LoopSocket +from ._util import _support, slow _pwd = u("\u2022") @@ -129,7 +129,7 @@ class AuthTest(unittest.TestCase): self.sockc.close() def start_server(self): - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) self.public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) self.event = threading.Event() diff --git a/tests/test_client.py b/tests/test_client.py index 62c92b35..564cda00 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -41,7 +41,7 @@ from paramiko import SSHClient from paramiko.pkey import PublicBlob from paramiko.ssh_exception import SSHException, AuthenticationException -from .util import _support, requires_sha1_signing, slow +from ._util import _support, requires_sha1_signing, slow requires_gss_auth = unittest.skipUnless( @@ -171,10 +171,10 @@ class ClientTest(unittest.TestCase): self.ts = paramiko.Transport(self.socks) if server_name is not None: self.ts.local_version = server_name - keypath = _support("test_rsa.key") + keypath = _support("rsa.key") host_key = paramiko.RSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) - keypath = _support("test_ecdsa_256.key") + keypath = _support("ecdsa-256.key") host_key = paramiko.ECDSAKey.from_private_key_file(keypath) self.ts.add_server_key(host_key) server = NullServer(allowed_keys=allowed_keys, public_blob=public_blob) @@ -195,7 +195,7 @@ class ClientTest(unittest.TestCase): # Server setup threading.Thread(target=self._run, kwargs=run_kwargs).start() host_key = paramiko.RSAKey.from_private_key_file( - _support("test_rsa.key") + _support("rsa.key") ) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) @@ -263,18 +263,18 @@ class SSHClientTest(ClientTest): """ verify that SSHClient works with an RSA key. """ - self._test_connection(key_filename=_support("test_rsa.key")) + self._test_connection(key_filename=_support("rsa.key")) @requires_sha1_signing def test_client_ecdsa(self): """ verify that SSHClient works with an ECDSA key. """ - self._test_connection(key_filename=_support("test_ecdsa_256.key")) + self._test_connection(key_filename=_support("ecdsa-256.key")) @requires_sha1_signing def test_client_ed25519(self): - self._test_connection(key_filename=_support("test_ed25519.key")) + self._test_connection(key_filename=_support("ed25519.key")) @requires_sha1_signing def test_multiple_key_files(self): @@ -289,16 +289,17 @@ class SSHClientTest(ClientTest): } # Various combos of attempted & valid keys # TODO: try every possible combo using itertools functions + # TODO: use new key(s) fixture(s) for attempt, accept in ( (["rsa", "dss"], ["dss"]), # Original test #3 (["dss", "rsa"], ["dss"]), # Ordering matters sometimes, sadly - (["dss", "rsa", "ecdsa_256"], ["dss"]), # Try ECDSA but fail - (["rsa", "ecdsa_256"], ["ecdsa"]), # ECDSA success + (["dss", "rsa", "ecdsa-256"], ["dss"]), # Try ECDSA but fail + (["rsa", "ecdsa-256"], ["ecdsa"]), # ECDSA success ): try: self._test_connection( key_filename=[ - _support("test_{}.key".format(x)) for x in attempt + _support("{}.key".format(x)) for x in attempt ], allowed_keys=[types_[x] for x in accept], ) @@ -318,7 +319,7 @@ class SSHClientTest(ClientTest): self.assertRaises( SSHException, self._test_connection, - key_filename=[_support("test_rsa.key")], + key_filename=[_support("rsa.key")], allowed_keys=["ecdsa-sha2-nistp256"], ) @@ -338,7 +339,7 @@ class SSHClientTest(ClientTest): @requires_sha1_signing def test_certs_implicitly_loaded_alongside_key_filename_keys(self): - # NOTE: a regular test_connection() w/ test_rsa.key would incidentally + # NOTE: a regular test_connection() w/ rsa.key would incidentally # test this (because test_xxx.key-cert.pub exists) but incidental tests # stink, so NullServer and friends were updated to allow assertions # about the server-side key object's public blob. Thus, we can prove @@ -391,7 +392,7 @@ class SSHClientTest(ClientTest): """ threading.Thread(target=self._run).start() hostname = f"[{self.addr}]:{self.port}" - key_file = _support("test_ecdsa_256.key") + key_file = _support("ecdsa-256.key") public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file) self.tc = SSHClient() @@ -415,7 +416,7 @@ class SSHClientTest(ClientTest): warnings.filterwarnings("ignore", "tempnam.*") host_key = paramiko.RSAKey.from_private_key_file( - _support("test_rsa.key") + _support("rsa.key") ) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) fd, localname = mkstemp() @@ -517,7 +518,7 @@ class SSHClientTest(ClientTest): # Start the thread with a 1 second wait. threading.Thread(target=self._run, kwargs={"delay": 1}).start() host_key = paramiko.RSAKey.from_private_key_file( - _support("test_rsa.key") + _support("rsa.key") ) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) @@ -593,7 +594,7 @@ class SSHClientTest(ClientTest): """ Failed gssapi-keyex doesn't prevent subsequent key from succeeding """ - kwargs = dict(gss_kex=True, key_filename=[_support("test_rsa.key")]) + kwargs = dict(gss_kex=True, key_filename=[_support("rsa.key")]) self._test_connection(**kwargs) @requires_gss_auth @@ -601,7 +602,7 @@ class SSHClientTest(ClientTest): """ Failed gssapi-with-mic doesn't prevent subsequent key from succeeding """ - kwargs = dict(gss_auth=True, key_filename=[_support("test_rsa.key")]) + kwargs = dict(gss_auth=True, key_filename=[_support("rsa.key")]) self._test_connection(**kwargs) def test_reject_policy(self): @@ -683,11 +684,11 @@ class SSHClientTest(ClientTest): self._client_host_key_bad(host_key) def test_host_key_negotiation_3(self): - self._client_host_key_good(paramiko.ECDSAKey, "test_ecdsa_256.key") + self._client_host_key_good(paramiko.ECDSAKey, "ecdsa-256.key") @requires_sha1_signing def test_host_key_negotiation_4(self): - self._client_host_key_good(paramiko.RSAKey, "test_rsa.key") + self._client_host_key_good(paramiko.RSAKey, "rsa.key") def _setup_for_env(self): threading.Thread(target=self._run).start() diff --git a/tests/test_config.py b/tests/test_config.py index a2c60a32..fcb120b6 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -19,7 +19,7 @@ from paramiko import ( ConfigParseError, ) -from .util import _config +from ._util import _config @fixture diff --git a/tests/test_ecdsa_256.key b/tests/test_ecdsa_256.key deleted file mode 100644 index 42d44734..00000000 --- a/tests/test_ecdsa_256.key +++ /dev/null @@ -1,5 +0,0 @@ ------BEGIN EC PRIVATE KEY----- -MHcCAQEEIKB6ty3yVyKEnfF/zprx0qwC76MsMlHY4HXCnqho2eKioAoGCCqGSM49 -AwEHoUQDQgAElI9mbdlaS+T9nHxY/59lFnn80EEecZDBHq4gLpccY8Mge5ZTMiMD -ADRvOqQ5R98Sxst765CAqXmRtz8vwoD96g== ------END EC PRIVATE KEY----- diff --git a/tests/test_ed25519.key b/tests/test_ed25519.key deleted file mode 100644 index eb9f94c2..00000000 --- a/tests/test_ed25519.key +++ /dev/null @@ -1,8 +0,0 @@ ------BEGIN OPENSSH PRIVATE KEY----- -b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW -QyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXwAAAKhjwAdrY8AH -awAAAAtzc2gtZWQyNTUxOQAAACB69SvZKJh/9VgSL0G27b5xVYa8nethH3IERbi0YqJDXw -AAAEA9tGQi2IrprbOSbDCF+RmAHd6meNSXBUQ2ekKXm4/8xnr1K9komH/1WBIvQbbtvnFV -hryd62EfcgRFuLRiokNfAAAAI2FsZXhfZ2F5bm9yQEFsZXhzLU1hY0Jvb2stQWlyLmxvY2 -FsAQI= ------END OPENSSH PRIVATE KEY----- diff --git a/tests/test_file.py b/tests/test_file.py index 456c0388..9344495b 100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -26,7 +26,7 @@ from io import BytesIO from paramiko.common import linefeed_byte, crlf, cr_byte from paramiko.file import BufferedFile -from .util import needs_builtin +from ._util import needs_builtin class LoopbackFile(BufferedFile): diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py index 671f1ba0..da62fd97 100644 --- a/tests/test_gssapi.py +++ b/tests/test_gssapi.py @@ -24,7 +24,7 @@ Test the used APIs for GSS-API / SSPI authentication import socket -from .util import needs_gssapi, KerberosTestCase, update_env +from ._util import needs_gssapi, KerberosTestCase, update_env # # NOTE: KerberosTestCase skips all tests if it was unable to import k5test diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py index d4868f4a..c33f4c68 100644 --- a/tests/test_kex_gss.py +++ b/tests/test_kex_gss.py @@ -31,7 +31,7 @@ import unittest import paramiko -from .util import needs_gssapi, KerberosTestCase, update_env +from ._util import needs_gssapi, KerberosTestCase, update_env class NullServer(paramiko.ServerInterface): @@ -80,7 +80,7 @@ class GSSKexTest(KerberosTestCase): def _run(self): self.socks, addr = self.sockl.accept() self.ts = paramiko.Transport(self.socks, gss_kex=True) - host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") + host_key = paramiko.RSAKey.from_private_key_file("tests/rsa.key") self.ts.add_server_key(host_key) self.ts.set_gss_host(self.realm.hostname) try: @@ -96,7 +96,7 @@ class GSSKexTest(KerberosTestCase): Diffie-Hellman Key Exchange and user authentication with the GSS-API context created during key exchange. """ - host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") + host_key = paramiko.RSAKey.from_private_key_file("tests/rsa.key") public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = paramiko.SSHClient() diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py index d4dd58ad..aee21c21 100644 --- a/tests/test_packetizer.py +++ b/tests/test_packetizer.py @@ -30,7 +30,7 @@ from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes from paramiko import Message, Packetizer, util from paramiko.common import byte_chr, zero_byte -from .loop import LoopSocket +from ._loop import LoopSocket x55 = byte_chr(0x55) diff --git a/tests/test_pkey.py b/tests/test_pkey.py index 5dfaaff7..c5b20f91 100644 --- a/tests/test_pkey.py +++ b/tests/test_pkey.py @@ -45,7 +45,7 @@ from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateNumbers from unittest.mock import patch, Mock import pytest -from .util import _support, is_low_entropy, requires_sha1_signing +from ._util import _support, is_low_entropy, requires_sha1_signing # from openssh's ssh-keygen @@ -161,7 +161,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(exp, key) def test_load_rsa(self): - key = RSAKey.from_private_key_file(_support("test_rsa.key")) + key = RSAKey.from_private_key_file(_support("rsa.key")) self.assertEqual("ssh-rsa", key.get_name()) exp_rsa = b(FINGER_RSA.split()[1].replace(":", "")) my_rsa = hexlify(key.get_fingerprint()) @@ -184,7 +184,7 @@ class KeyTest(unittest.TestCase): ) as loader: loader.side_effect = exception with pytest.raises(SSHException, match=str(exception)): - RSAKey.from_private_key_file(_support("test_rsa.key")) + RSAKey.from_private_key_file(_support("rsa.key")) def test_loading_empty_keys_errors_usefully(self): # #1599 - raise SSHException instead of IndexError @@ -231,7 +231,7 @@ class KeyTest(unittest.TestCase): def test_compare_rsa(self): # verify that the private & public keys compare equal - key = RSAKey.from_private_key_file(_support("test_rsa.key")) + key = RSAKey.from_private_key_file(_support("rsa.key")) self.assertEqual(key, key) pub = RSAKey(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -248,7 +248,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key, pub) def _sign_and_verify_rsa(self, algorithm, saved_sig): - key = RSAKey.from_private_key_file(_support("test_rsa.key")) + key = RSAKey.from_private_key_file(_support("rsa.key")) msg = key.sign_ssh_data(b"ice weasels", algorithm) assert isinstance(msg, Message) msg.rewind() @@ -329,7 +329,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key.get_name(), "ecdsa-sha2-nistp521") def test_load_ecdsa_256(self): - key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key")) + key = ECDSAKey.from_private_key_file(_support("ecdsa-256.key")) self.assertEqual("ecdsa-sha2-nistp256", key.get_name()) exp_ecdsa = b(FINGER_ECDSA_256.split()[1].replace(":", "")) my_ecdsa = hexlify(key.get_fingerprint()) @@ -357,7 +357,7 @@ class KeyTest(unittest.TestCase): def test_compare_ecdsa_256(self): # verify that the private & public keys compare equal - key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key")) + key = ECDSAKey.from_private_key_file(_support("ecdsa-256.key")) self.assertEqual(key, key) pub = ECDSAKey(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -366,7 +366,7 @@ class KeyTest(unittest.TestCase): def test_sign_ecdsa_256(self): # verify that the rsa private key can sign and verify - key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key")) + key = ECDSAKey.from_private_key_file(_support("ecdsa-256.key")) msg = key.sign_ssh_data(b"ice weasels") self.assertTrue(type(msg) is Message) msg.rewind() @@ -408,7 +408,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(384, key.get_bits()) def test_load_ecdsa_transmutes_crypto_exceptions(self): - path = _support("test_ecdsa_256.key") + path = _support("ecdsa-256.key") # TODO: nix unittest for pytest for exception in (TypeError("onoz"), UnsupportedAlgorithm("oops")): with patch( @@ -569,12 +569,12 @@ class KeyTest(unittest.TestCase): RSAKey.from_private_key_file(_support("test_rsa_openssh_nopad.key")) def test_stringification(self): - key = RSAKey.from_private_key_file(_support("test_rsa.key")) + key = RSAKey.from_private_key_file(_support("rsa.key")) comparable = TEST_KEY_BYTESTR self.assertEqual(str(key), comparable) def test_ed25519(self): - key1 = Ed25519Key.from_private_key_file(_support("test_ed25519.key")) + key1 = Ed25519Key.from_private_key_file(_support("ed25519.key")) key2 = Ed25519Key.from_private_key_file( _support("test_ed25519_password.key"), b"abc123" ) @@ -594,7 +594,7 @@ class KeyTest(unittest.TestCase): def test_ed25519_compare(self): # verify that the private & public keys compare equal - key = Ed25519Key.from_private_key_file(_support("test_ed25519.key")) + key = Ed25519Key.from_private_key_file(_support("ed25519.key")) self.assertEqual(key, key) pub = Ed25519Key(data=key.asbytes()) self.assertTrue(key.can_sign()) @@ -616,12 +616,13 @@ class KeyTest(unittest.TestCase): ) assert original != generated + # TODO: use keys fixture def keys(self): for key_class, filename in [ - (RSAKey, "test_rsa.key"), + (RSAKey, "rsa.key"), (DSSKey, "dss.key"), - (ECDSAKey, "test_ecdsa_256.key"), - (Ed25519Key, "test_ed25519.key"), + (ECDSAKey, "ecdsa-256.key"), + (Ed25519Key, "ed25519.key"), ]: key1 = key_class.from_private_key_file(_support(filename)) key2 = key_class.from_private_key_file(_support(filename)) @@ -643,6 +644,7 @@ class KeyTest(unittest.TestCase): for key1, key2 in self.keys(): assert hash(key1) == hash(key2) + # TODO: use keys fixture def test_new_fingerprint(self): # Assumes the RSA, DSS, ECDSA, Ed25519 order seen in 'def keys'. fingerprints = [x.fingerprint for x, _ in self.keys()] @@ -653,6 +655,7 @@ class KeyTest(unittest.TestCase): "SHA256:J6VESFdD3xSChn8y9PzWzeF+1tl892mOy2TqkMLO4ow", ] + # TODO: use keys fixture def test_algorithm_property(self): # Assumes the RSA, DSS, ECDSA, Ed25519 order seen in 'def keys'. algorithms = [x.algorithm_name for x, _ in self.keys()] @@ -669,7 +672,7 @@ class KeyTest(unittest.TestCase): # No exception -> it's good. Meh. def test_ed25519_load_from_file_obj(self): - with open(_support("test_ed25519.key")) as pkey_fileobj: + with open(_support("ed25519.key")) as pkey_fileobj: key = Ed25519Key.from_private_key(pkey_fileobj) self.assertEqual(key, key) self.assertTrue(key.can_sign()) diff --git a/tests/test_rsa.key b/tests/test_rsa.key deleted file mode 100644 index f50e9c53..00000000 --- a/tests/test_rsa.key +++ /dev/null @@ -1,15 +0,0 @@ ------BEGIN RSA PRIVATE KEY----- -MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz -oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/ -d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB -gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0 -EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon -soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H -tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU -avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA -4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g -H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv -qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV -HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc -nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7 ------END RSA PRIVATE KEY----- diff --git a/tests/test_sftp.py b/tests/test_sftp.py index be123de4..7fd274bc 100644 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -38,8 +38,8 @@ from paramiko.sftp_attr import SFTPAttributes from paramiko.util import b, u from tests import requireNonAsciiLocale -from .util import needs_builtin -from .util import slow +from ._util import needs_builtin +from ._util import slow ARTICLE = """ diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py index 5192f657..acfe71e3 100644 --- a/tests/test_sftp_big.py +++ b/tests/test_sftp_big.py @@ -30,7 +30,7 @@ import time from paramiko.common import o660 -from .util import slow +from ._util import slow @slow diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py index a8175ccb..27976a8d 100644 --- a/tests/test_ssh_gss.py +++ b/tests/test_ssh_gss.py @@ -28,7 +28,7 @@ import threading import paramiko -from .util import _support, needs_gssapi, KerberosTestCase, update_env +from ._util import _support, needs_gssapi, KerberosTestCase, update_env from .test_client import FINGERPRINTS @@ -89,7 +89,7 @@ class GSSAuthTest(KerberosTestCase): def _run(self): self.socks, addr = self.sockl.accept() self.ts = paramiko.Transport(self.socks) - host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") + host_key = paramiko.RSAKey.from_private_key_file("tests/rsa.key") self.ts.add_server_key(host_key) server = NullServer() self.ts.start_server(self.event, server) @@ -100,7 +100,7 @@ class GSSAuthTest(KerberosTestCase): The exception is ... no exception yet """ - host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") + host_key = paramiko.RSAKey.from_private_key_file("tests/rsa.key") public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = paramiko.SSHClient() @@ -154,7 +154,7 @@ class GSSAuthTest(KerberosTestCase): "this_host_does_not_exists_and_causes_a_GSSAPI-exception" ) self._test_connection( - key_filename=[_support("test_rsa.key")], + key_filename=[_support("rsa.key")], allow_agent=False, look_for_keys=False, ) diff --git a/tests/test_transport.py b/tests/test_transport.py index a6b15ee1..d8ac8a4b 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -60,8 +60,8 @@ from paramiko.common import ( ) from paramiko.message import Message -from .util import needs_builtin, _support, requires_sha1_signing, slow -from .loop import LoopSocket +from ._util import needs_builtin, _support, requires_sha1_signing, slow +from ._loop import LoopSocket LONG_BANNER = """\ @@ -168,7 +168,7 @@ class TransportTest(unittest.TestCase): def setup_test_server( self, client_options=None, server_options=None, connect_kwargs=None ): - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) @@ -234,7 +234,7 @@ class TransportTest(unittest.TestCase): loopback sockets. this is hardly "simple" but it's simpler than the later tests. :) """ - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) event = threading.Event() @@ -260,7 +260,7 @@ class TransportTest(unittest.TestCase): """ verify that a long banner doesn't mess up the handshake. """ - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) event = threading.Event() @@ -910,7 +910,7 @@ class TransportTest(unittest.TestCase): # be fine. Even tho it's a bit squicky. self.tc.packetizer = SlowPacketizer(self.tc.sock) # Continue with regular test red tape. - host_key = RSAKey.from_private_key_file(_support("test_rsa.key")) + host_key = RSAKey.from_private_key_file(_support("rsa.key")) public_host_key = RSAKey(data=host_key.asbytes()) self.ts.add_server_key(host_key) event = threading.Event() @@ -1204,7 +1204,7 @@ def server( :param hostkey: Host key to use for the server; if None, loads - ``test_rsa.key``. + ``rsa.key``. :param init: Default `Transport` constructor kwargs to use for both sides. :param server_init: @@ -1234,7 +1234,7 @@ def server( ts = Transport(socks, **dict(init, **server_init)) if hostkey is None: - hostkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + hostkey = RSAKey.from_private_key_file(_support("rsa.key")) ts.add_server_key(hostkey) event = threading.Event() server = NullServer(allowed_keys=pubkeys) @@ -1344,7 +1344,7 @@ class TestSHA2SignatureKeyExchange(unittest.TestCase): # (This is a regression test vs previous implementation which overwrote # the entire preferred-hostkeys structure when given an explicit key as # a client.) - hostkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + hostkey = RSAKey.from_private_key_file(_support("rsa.key")) with server(hostkey=hostkey, connect=dict(hostkey=hostkey)) as (tc, _): assert tc.host_key_type == "rsa-sha2-512" @@ -1359,7 +1359,7 @@ class TestExtInfo(unittest.TestCase): } def test_client_uses_server_sig_algs_for_pubkey_auth(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + privkey = RSAKey.from_private_key_file(_support("rsa.key")) with server( pubkeys=[privkey], connect=dict(pkey=privkey), @@ -1376,7 +1376,7 @@ class TestExtInfo(unittest.TestCase): # with this module anyways... class TestSHA2SignaturePubkeys(unittest.TestCase): def test_pubkey_auth_honors_disabled_algorithms(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + privkey = RSAKey.from_private_key_file(_support("rsa.key")) with server( pubkeys=[privkey], connect=dict(pkey=privkey), @@ -1391,7 +1391,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase): assert "no RSA pubkey algorithms" in str(err) def test_client_sha2_disabled_server_sha1_disabled_no_match(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + privkey = RSAKey.from_private_key_file(_support("rsa.key")) with server( pubkeys=[privkey], connect=dict(pkey=privkey), @@ -1402,7 +1402,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase): assert isinstance(err, AuthenticationException) def test_client_sha1_disabled_server_sha2_disabled_no_match(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + privkey = RSAKey.from_private_key_file(_support("rsa.key")) with server( pubkeys=[privkey], connect=dict(pkey=privkey), @@ -1414,7 +1414,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase): @requires_sha1_signing def test_ssh_rsa_still_used_when_sha2_disabled(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + privkey = RSAKey.from_private_key_file(_support("rsa.key")) # NOTE: this works because key obj comparison uses public bytes # TODO: would be nice for PKey to grow a legit "give me another obj of # same class but just the public bits" using asbytes() @@ -1424,7 +1424,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase): assert tc.is_authenticated() def test_sha2_512(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + privkey = RSAKey.from_private_key_file(_support("rsa.key")) with server( pubkeys=[privkey], connect=dict(pkey=privkey), @@ -1436,7 +1436,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase): assert tc._agreed_pubkey_algorithm == "rsa-sha2-512" def test_sha2_256(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + privkey = RSAKey.from_private_key_file(_support("rsa.key")) with server( pubkeys=[privkey], connect=dict(pkey=privkey), @@ -1448,7 +1448,7 @@ class TestSHA2SignaturePubkeys(unittest.TestCase): assert tc._agreed_pubkey_algorithm == "rsa-sha2-256" def test_sha2_256_when_client_only_enables_256(self): - privkey = RSAKey.from_private_key_file(_support("test_rsa.key")) + privkey = RSAKey.from_private_key_file(_support("rsa.key")) with server( pubkeys=[privkey], connect=dict(pkey=privkey), diff --git a/tests/util.py b/tests/util.py deleted file mode 100644 index 2f1c5ac2..00000000 --- a/tests/util.py +++ /dev/null @@ -1,178 +0,0 @@ -from os.path import dirname, realpath, join -import builtins -import os -from pathlib import Path -import struct -import sys -import unittest - -import pytest - -from paramiko.ssh_gss import GSS_AUTH_AVAILABLE - -from cryptography.exceptions import UnsupportedAlgorithm, _Reasons -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.asymmetric import padding, rsa - -tests_dir = dirname(realpath(__file__)) - - -def _support(filename): - base = Path(tests_dir) - top = base / filename - deeper = base / "_support" / filename - return str(deeper if deeper.exists() else top) - - -def _config(name): - return join(tests_dir, "configs", name) - - -needs_gssapi = pytest.mark.skipif( - not GSS_AUTH_AVAILABLE, reason="No GSSAPI to test" -) - - -def needs_builtin(name): - """ - Skip decorated test if builtin name does not exist. - """ - reason = "Test requires a builtin '{}'".format(name) - return pytest.mark.skipif(not hasattr(builtins, name), reason=reason) - - -slow = pytest.mark.slow - -# GSSAPI / Kerberos related tests need a working Kerberos environment. -# The class `KerberosTestCase` provides such an environment or skips all tests. -# There are 3 distinct cases: -# -# - A Kerberos environment has already been created and the environment -# contains the required information. -# -# - We can use the package 'k5test' to setup an working kerberos environment on -# the fly. -# -# - We skip all tests. -# -# ToDo: add a Windows specific implementation? - -if ( - os.environ.get("K5TEST_USER_PRINC", None) - and os.environ.get("K5TEST_HOSTNAME", None) - and os.environ.get("KRB5_KTNAME", None) -): # add other vars as needed - - # The environment provides the required information - class DummyK5Realm: - def __init__(self): - for k in os.environ: - if not k.startswith("K5TEST_"): - continue - setattr(self, k[7:].lower(), os.environ[k]) - self.env = {} - - class KerberosTestCase(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.realm = DummyK5Realm() - - @classmethod - def tearDownClass(cls): - del cls.realm - -else: - try: - # Try to setup a kerberos environment - from k5test import KerberosTestCase - except Exception: - # Use a dummy, that skips all tests - class KerberosTestCase(unittest.TestCase): - @classmethod - def setUpClass(cls): - raise unittest.SkipTest( - "Missing extension package k5test. " - 'Please run "pip install k5test" ' - "to install it." - ) - - -def update_env(testcase, mapping, env=os.environ): - """Modify os.environ during a test case and restore during cleanup.""" - saved_env = env.copy() - - def replace(target, source): - target.update(source) - for k in list(target): - if k not in source: - target.pop(k, None) - - testcase.addCleanup(replace, env, saved_env) - env.update(mapping) - return testcase - - -def k5shell(args=None): - """Create a shell with an kerberos environment - - This can be used to debug paramiko or to test the old GSSAPI. - To test a different GSSAPI, simply activate a suitable venv - within the shell. - """ - import k5test - import atexit - import subprocess - - k5 = k5test.K5Realm() - atexit.register(k5.stop) - os.environ.update(k5.env) - for n in ("realm", "user_princ", "hostname"): - os.environ["K5TEST_" + n.upper()] = getattr(k5, n) - - if not args: - args = sys.argv[1:] - if not args: - args = [os.environ.get("SHELL", "bash")] - sys.exit(subprocess.call(args)) - - -def is_low_entropy(): - """ - Attempts to detect whether running interpreter is low-entropy. - - "low-entropy" is defined as being in 32-bit mode and with the hash seed set - to zero. - """ - is_32bit = struct.calcsize("P") == 32 / 8 - # I don't see a way to tell internally if the hash seed was set this - # way, but env should be plenty sufficient, this is only for testing. - return is_32bit and os.environ.get("PYTHONHASHSEED", None) == "0" - - -def sha1_signing_unsupported(): - """ - This is used to skip tests in environments where SHA-1 signing is - not supported by the backend. - """ - private_key = rsa.generate_private_key( - public_exponent=65537, key_size=2048, backend=default_backend() - ) - message = b"Some dummy text" - try: - private_key.sign( - message, - padding.PSS( - mgf=padding.MGF1(hashes.SHA1()), - salt_length=padding.PSS.MAX_LENGTH, - ), - hashes.SHA1(), - ) - return False - except UnsupportedAlgorithm as e: - return e._reason is _Reasons.UNSUPPORTED_HASH - - -requires_sha1_signing = unittest.skipIf( - sha1_signing_unsupported(), "SHA-1 signing not supported" -) -- cgit v1.2.3