diff options
author | Jeff Forcier <jeff@bitprophet.org> | 2019-12-03 14:35:04 -0500 |
---|---|---|
committer | Jeff Forcier <jeff@bitprophet.org> | 2019-12-03 14:35:04 -0500 |
commit | 9e6fc80397582838de8124344efcde6b17472b73 (patch) | |
tree | 50fd8121c695ad5f322ebb377b9815711098e531 /tests | |
parent | 25de1a7a02a2189614787718739efbde86d5bb8e (diff) | |
parent | 84fa355a253d30d6c39adaea8bb095ced0c3b751 (diff) |
Merge branch 'master' into 1343-int
Diffstat (limited to 'tests')
70 files changed, 2296 insertions, 851 deletions
diff --git a/tests/configs/basic b/tests/configs/basic new file mode 100644 index 00000000..93fe3beb --- /dev/null +++ b/tests/configs/basic @@ -0,0 +1,4 @@ +CanonicalDomains paramiko.org + +Host www.paramiko.org + User rando diff --git a/tests/configs/canon b/tests/configs/canon new file mode 100644 index 00000000..7b979408 --- /dev/null +++ b/tests/configs/canon @@ -0,0 +1,8 @@ +CanonicalizeHostname yes +CanonicalDomains paramiko.org + +IdentityFile base.key + +Host www.paramiko.org + User rando + IdentityFile canonicalized.key diff --git a/tests/configs/canon-always b/tests/configs/canon-always new file mode 100644 index 00000000..f3f56b70 --- /dev/null +++ b/tests/configs/canon-always @@ -0,0 +1,5 @@ +CanonicalDomains paramiko.org +CanonicalizeHostname always + +Host www.paramiko.org + User rando diff --git a/tests/configs/canon-ipv4 b/tests/configs/canon-ipv4 new file mode 100644 index 00000000..92c3875f --- /dev/null +++ b/tests/configs/canon-ipv4 @@ -0,0 +1,6 @@ +CanonicalDomains paramiko.org +CanonicalizeHostname yes +AddressFamily inet + +Host www.paramiko.org + User rando diff --git a/tests/configs/canon-local b/tests/configs/canon-local new file mode 100644 index 00000000..dde9f77b --- /dev/null +++ b/tests/configs/canon-local @@ -0,0 +1,6 @@ +Host www.paramiko.org + User rando + +Host www + CanonicalDomains paramiko.org + CanonicalizeHostname yes diff --git a/tests/configs/canon-local-always b/tests/configs/canon-local-always new file mode 100644 index 00000000..0ad0535a --- /dev/null +++ b/tests/configs/canon-local-always @@ -0,0 +1,6 @@ +Host www.paramiko.org + User rando + +Host www + CanonicalDomains paramiko.org + CanonicalizeHostname always diff --git a/tests/configs/deep-canon b/tests/configs/deep-canon new file mode 100644 index 00000000..483823d5 --- /dev/null +++ b/tests/configs/deep-canon @@ -0,0 +1,11 @@ +CanonicalizeHostname yes +CanonicalDomains paramiko.org + +Host www.paramiko.org + User rando + +Host sub.www.paramiko.org + User deep + +Host subber.sub.www.paramiko.org + User deeper diff --git a/tests/configs/deep-canon-maxdots b/tests/configs/deep-canon-maxdots new file mode 100644 index 00000000..7785f660 --- /dev/null +++ b/tests/configs/deep-canon-maxdots @@ -0,0 +1,12 @@ +CanonicalizeHostname yes +CanonicalDomains paramiko.org +CanonicalizeMaxDots 2 + +Host www.paramiko.org + User rando + +Host sub.www.paramiko.org + User deep + +Host subber.sub.www.paramiko.org + User deeper diff --git a/tests/configs/empty-canon b/tests/configs/empty-canon new file mode 100644 index 00000000..19743ad6 --- /dev/null +++ b/tests/configs/empty-canon @@ -0,0 +1,6 @@ +CanonicalizeHostname yes +CanonicalDomains +AddressFamily inet + +Host www.paramiko.org + User rando diff --git a/tests/configs/fallback-no b/tests/configs/fallback-no new file mode 100644 index 00000000..ec8d13ee --- /dev/null +++ b/tests/configs/fallback-no @@ -0,0 +1,6 @@ +CanonicalizeHostname yes +CanonicalDomains paramiko.org +CanonicalizeFallbackLocal no + +Host www.paramiko.org + User rando diff --git a/tests/configs/fallback-yes b/tests/configs/fallback-yes new file mode 100644 index 00000000..bc4f4eee --- /dev/null +++ b/tests/configs/fallback-yes @@ -0,0 +1,6 @@ +CanonicalizeHostname yes +CanonicalDomains paramiko.org +CanonicalizeFallbackLocal yes + +Host www.paramiko.org + User rando diff --git a/tests/configs/hostname-exec-tokenized b/tests/configs/hostname-exec-tokenized new file mode 100644 index 00000000..1cae2c03 --- /dev/null +++ b/tests/configs/hostname-exec-tokenized @@ -0,0 +1,2 @@ +Match exec "ping %h" + HostName pingable.%h diff --git a/tests/configs/hostname-tokenized b/tests/configs/hostname-tokenized new file mode 100644 index 00000000..1905c0cc --- /dev/null +++ b/tests/configs/hostname-tokenized @@ -0,0 +1 @@ +HostName prefix.%h diff --git a/tests/configs/invalid b/tests/configs/invalid new file mode 100644 index 00000000..81332fe8 --- /dev/null +++ b/tests/configs/invalid @@ -0,0 +1 @@ +lolwut diff --git a/tests/configs/match-all b/tests/configs/match-all new file mode 100644 index 00000000..7673e0a0 --- /dev/null +++ b/tests/configs/match-all @@ -0,0 +1,2 @@ +Match all + User awesome diff --git a/tests/configs/match-all-after-canonical b/tests/configs/match-all-after-canonical new file mode 100644 index 00000000..531112cb --- /dev/null +++ b/tests/configs/match-all-after-canonical @@ -0,0 +1,5 @@ +CanonicalizeHostname yes +CanonicalDomains paramiko.org + +Match canonical all + User awesome diff --git a/tests/configs/match-all-and-more b/tests/configs/match-all-and-more new file mode 100644 index 00000000..bb50696e --- /dev/null +++ b/tests/configs/match-all-and-more @@ -0,0 +1,2 @@ +Match all exec "lol nope" + HostName whatever diff --git a/tests/configs/match-all-and-more-before b/tests/configs/match-all-and-more-before new file mode 100644 index 00000000..4d5b2e34 --- /dev/null +++ b/tests/configs/match-all-and-more-before @@ -0,0 +1,2 @@ +Match exec "lol nope" all + HostName whatever diff --git a/tests/configs/match-all-before-canonical b/tests/configs/match-all-before-canonical new file mode 100644 index 00000000..35e3b0e2 --- /dev/null +++ b/tests/configs/match-all-before-canonical @@ -0,0 +1,5 @@ +CanonicalizeHostname yes +CanonicalDomains paramiko.org + +Match all canonical + User oops diff --git a/tests/configs/match-canonical-no b/tests/configs/match-canonical-no new file mode 100644 index 00000000..e528dc64 --- /dev/null +++ b/tests/configs/match-canonical-no @@ -0,0 +1,7 @@ +CanonicalizeHostname no + +Match canonical all + User awesome + +Match !canonical host specific + User overload diff --git a/tests/configs/match-canonical-yes b/tests/configs/match-canonical-yes new file mode 100644 index 00000000..d6c20928 --- /dev/null +++ b/tests/configs/match-canonical-yes @@ -0,0 +1,5 @@ +CanonicalizeHostname yes +CanonicalDomains paramiko.org + +Match !canonical host www* + User hidden diff --git a/tests/configs/match-complex b/tests/configs/match-complex new file mode 100644 index 00000000..63634039 --- /dev/null +++ b/tests/configs/match-complex @@ -0,0 +1,17 @@ +HostName bogus + +Match originalhost target host bogus + User rand + +Match originalhost remote localuser rando + User calrissian + +# Just to set user for subsequent match +Match originalhost www + User calrissian + +Match !canonical originalhost www host bogus localuser rando user calrissian + Port 7777 + +Match !canonical !originalhost www host bogus localuser rando !user calrissian + Port 1234 diff --git a/tests/configs/match-exec b/tests/configs/match-exec new file mode 100644 index 00000000..763346ea --- /dev/null +++ b/tests/configs/match-exec @@ -0,0 +1,16 @@ +Match exec "quoted" + User benjamin + +Match exec unquoted + User rando + +Match exec "quoted spaced" + User neil + +# Just to prepopulate values for tokenizing subsequent exec +Host target + User intermediate + HostName configured + +Match exec "%d %h %L %l %n %p %r %u" + Port 1337 diff --git a/tests/configs/match-exec-canonical b/tests/configs/match-exec-canonical new file mode 100644 index 00000000..794ee9d5 --- /dev/null +++ b/tests/configs/match-exec-canonical @@ -0,0 +1,10 @@ +CanonicalDomains paramiko.org +CanonicalizeHostname always + +# This will match in the first, uncanonicalized pass +Match !canonical exec uncanonicalized + User defenseless + +# And this will match the second time +Match canonical exec canonicalized + Port 8007 diff --git a/tests/configs/match-exec-negation b/tests/configs/match-exec-negation new file mode 100644 index 00000000..937c910e --- /dev/null +++ b/tests/configs/match-exec-negation @@ -0,0 +1,5 @@ +Match !exec "this succeeds" + User nope + +Match !exec "this fails" + User yup diff --git a/tests/configs/match-exec-no-arg b/tests/configs/match-exec-no-arg new file mode 100644 index 00000000..20c16d16 --- /dev/null +++ b/tests/configs/match-exec-no-arg @@ -0,0 +1,2 @@ +Match exec + User uh-oh diff --git a/tests/configs/match-host b/tests/configs/match-host new file mode 100644 index 00000000..86cbff5d --- /dev/null +++ b/tests/configs/match-host @@ -0,0 +1,2 @@ +Match host target + User rand diff --git a/tests/configs/match-host-canonicalized b/tests/configs/match-host-canonicalized new file mode 100644 index 00000000..52dadeae --- /dev/null +++ b/tests/configs/match-host-canonicalized @@ -0,0 +1,8 @@ +CanonicalizeHostname yes +CanonicalDomains paramiko.org + +Match host www.paramiko.org + User rand + +Match canonical host docs.paramiko.org + User eric diff --git a/tests/configs/match-host-from-match b/tests/configs/match-host-from-match new file mode 100644 index 00000000..172ee116 --- /dev/null +++ b/tests/configs/match-host-from-match @@ -0,0 +1,5 @@ +Match host original-host + HostName substituted-host + +Match host substituted-host + User inner diff --git a/tests/configs/match-host-glob b/tests/configs/match-host-glob new file mode 100644 index 00000000..3d53cf48 --- /dev/null +++ b/tests/configs/match-host-glob @@ -0,0 +1,2 @@ +Match host *ever + User matrim diff --git a/tests/configs/match-host-glob-list b/tests/configs/match-host-glob-list new file mode 100644 index 00000000..3617d136 --- /dev/null +++ b/tests/configs/match-host-glob-list @@ -0,0 +1,8 @@ +Match host *ever + User matrim + +Match host somehost,someotherhost + User thom + +Match host goo*,!goof + User perrin diff --git a/tests/configs/match-host-name b/tests/configs/match-host-name new file mode 100644 index 00000000..783d939e --- /dev/null +++ b/tests/configs/match-host-name @@ -0,0 +1,4 @@ +HostName default-host + +Match host default-host + User silly diff --git a/tests/configs/match-host-negated b/tests/configs/match-host-negated new file mode 100644 index 00000000..7c5d3f3e --- /dev/null +++ b/tests/configs/match-host-negated @@ -0,0 +1,2 @@ +Match !host www + User jeff diff --git a/tests/configs/match-host-no-arg b/tests/configs/match-host-no-arg new file mode 100644 index 00000000..191cebb5 --- /dev/null +++ b/tests/configs/match-host-no-arg @@ -0,0 +1,2 @@ +Match host + User oops diff --git a/tests/configs/match-localuser b/tests/configs/match-localuser new file mode 100644 index 00000000..fe4a276c --- /dev/null +++ b/tests/configs/match-localuser @@ -0,0 +1,14 @@ +Match localuser gandalf + HostName gondor + +Match localuser b* + HostName shire + +Match localuser aragorn,frodo + HostName moria + +Match localuser gimli,!legolas + Port 7373 + +Match !localuser sauron + HostName mordor diff --git a/tests/configs/match-localuser-no-arg b/tests/configs/match-localuser-no-arg new file mode 100644 index 00000000..6623553a --- /dev/null +++ b/tests/configs/match-localuser-no-arg @@ -0,0 +1,2 @@ +Match localuser + User oops diff --git a/tests/configs/match-orighost b/tests/configs/match-orighost new file mode 100644 index 00000000..10541993 --- /dev/null +++ b/tests/configs/match-orighost @@ -0,0 +1,16 @@ +HostName bogus + +Match originalhost target + User tuon + +Match originalhost what* + User matrim + +Match originalhost comma,sep* + User chameleon + +Match originalhost yep,!nope + User skipped + +Match !originalhost www !originalhost nope + User thom diff --git a/tests/configs/match-orighost-canonical b/tests/configs/match-orighost-canonical new file mode 100644 index 00000000..737345e8 --- /dev/null +++ b/tests/configs/match-orighost-canonical @@ -0,0 +1,5 @@ +CanonicalizeHostname yes +CanonicalDomains paramiko.org + +Match originalhost www + User tuon diff --git a/tests/configs/match-orighost-no-arg b/tests/configs/match-orighost-no-arg new file mode 100644 index 00000000..427382ba --- /dev/null +++ b/tests/configs/match-orighost-no-arg @@ -0,0 +1,2 @@ +Match originalhost + User oops diff --git a/tests/configs/match-user b/tests/configs/match-user new file mode 100644 index 00000000..14d6ac12 --- /dev/null +++ b/tests/configs/match-user @@ -0,0 +1,14 @@ +Match user gandalf + HostName gondor + +Match user b* + HostName shire + +Match user aragorn,frodo + HostName moria + +Match user gimli,!legolas + Port 7373 + +Match !user sauron + HostName mordor diff --git a/tests/configs/match-user-explicit b/tests/configs/match-user-explicit new file mode 100644 index 00000000..9a2b1d82 --- /dev/null +++ b/tests/configs/match-user-explicit @@ -0,0 +1,4 @@ +User explicit + +Match user explicit + HostName dumb diff --git a/tests/configs/match-user-no-arg b/tests/configs/match-user-no-arg new file mode 100644 index 00000000..65a11ab4 --- /dev/null +++ b/tests/configs/match-user-no-arg @@ -0,0 +1,2 @@ +Match user + User oops diff --git a/tests/configs/multi-canon-domains b/tests/configs/multi-canon-domains new file mode 100644 index 00000000..5674b442 --- /dev/null +++ b/tests/configs/multi-canon-domains @@ -0,0 +1,5 @@ +CanonicalizeHostname yes +CanonicalDomains not-a-real-tld paramiko.org + +Host www.paramiko.org + User rando diff --git a/tests/configs/no-canon b/tests/configs/no-canon new file mode 100644 index 00000000..033f8c53 --- /dev/null +++ b/tests/configs/no-canon @@ -0,0 +1,5 @@ +CanonicalizeHostname no +CanonicalDomains paramiko.org + +Host www.paramiko.org + User rando diff --git a/tests/configs/robey b/tests/configs/robey new file mode 100644 index 00000000..b2026224 --- /dev/null +++ b/tests/configs/robey @@ -0,0 +1,17 @@ +# A timeless classic? +# NOTE: some lines in here have 'extra' whitespace (incl trailing, and mixed +# tabs/spaces!) on purpose. + +Host * + User robey + IdentityFile =~/.ssh/id_rsa + +# comment +Host *.example.com + User bjork +Port=3333 +Host * + Crazy something dumb +Host spoo.example.com +Crazy something else + diff --git a/tests/configs/zero-maxdots b/tests/configs/zero-maxdots new file mode 100644 index 00000000..dc00054c --- /dev/null +++ b/tests/configs/zero-maxdots @@ -0,0 +1,9 @@ +CanonicalizeHostname yes +CanonicalDomains paramiko.org +CanonicalizeMaxDots 0 + +Host www.paramiko.org + User rando + +Host sub.www.paramiko.org + User deep diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py index 100076d6..1528a0b8 100644 --- a/tests/stub_sftp.py +++ b/tests/stub_sftp.py @@ -21,18 +21,17 @@ A stub SFTP server for loopback SFTP testing. """ import os -import sys from paramiko import ( - ServerInterface, - SFTPServerInterface, - SFTPServer, + AUTH_SUCCESSFUL, + OPEN_SUCCEEDED, SFTPAttributes, SFTPHandle, - SFTP_OK, + SFTPServer, + SFTPServerInterface, SFTP_FAILURE, - AUTH_SUCCESSFUL, - OPEN_SUCCEEDED, + SFTP_OK, + ServerInterface, ) from paramiko.common import o666 @@ -65,7 +64,8 @@ class StubSFTPHandle(SFTPHandle): class StubSFTPServer(SFTPServerInterface): # assume current folder is a fine root - # (the tests always create and eventually delete a subfolder, so there shouldn't be any mess) + # (the tests always create and eventually delete a subfolder, so there + # shouldn't be any mess) ROOT = os.getcwd() def _realpath(self, path): @@ -206,7 +206,8 @@ class StubSFTPServer(SFTPServerInterface): # compute relative to path abspath = os.path.join(os.path.dirname(path), target_path) if abspath[: len(self.ROOT)] != self.ROOT: - # this symlink isn't going to work anyway -- just break it immediately + # this symlink isn't going to work anyway -- just break it + # immediately target_path = "<error>" try: os.symlink(target_path, path) diff --git a/tests/test_auth.py b/tests/test_auth.py index d98a00c4..01fbac5b 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -250,7 +250,7 @@ class AuthTest(unittest.TestCase): self.start_server() self.tc.connect(hostkey=self.public_host_key) try: - remain = self.tc.auth_password("bad-server", "hello") + self.tc.auth_password("bad-server", "hello") except: etype, evalue, etb = sys.exc_info() self.assertTrue(issubclass(etype, AuthenticationException)) @@ -265,7 +265,7 @@ class AuthTest(unittest.TestCase): self.start_server() self.tc.connect() try: - remain = self.tc.auth_password("unresponsive-server", "hello") + self.tc.auth_password("unresponsive-server", "hello") except: etype, evalue, etb = sys.exc_info() self.assertTrue(issubclass(etype, AuthenticationException)) diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py index 28d6e4a2..61c99cc0 100644 --- a/tests/test_buffered_pipe.py +++ b/tests/test_buffered_pipe.py @@ -26,7 +26,6 @@ import unittest from paramiko.buffered_pipe import BufferedPipe, PipeTimeout from paramiko import pipe -from paramiko.py3compat import b def delay_thread(p): @@ -42,7 +41,7 @@ def close_thread(p): class BufferedPipeTest(unittest.TestCase): - def test_1_buffered_pipe(self): + def test_buffered_pipe(self): p = BufferedPipe() self.assertTrue(not p.read_ready()) p.feed("hello.") @@ -59,7 +58,7 @@ class BufferedPipeTest(unittest.TestCase): self.assertTrue(not p.read_ready()) self.assertEqual(b"", p.read(1)) - def test_2_delay(self): + def test_delay(self): p = BufferedPipe() self.assertTrue(not p.read_ready()) threading.Thread(target=delay_thread, args=(p,)).start() @@ -72,13 +71,13 @@ class BufferedPipeTest(unittest.TestCase): self.assertEqual(b"b", p.read(1, 1.0)) self.assertEqual(b"", p.read(1)) - def test_3_close_while_reading(self): + def test_close_while_reading(self): p = BufferedPipe() threading.Thread(target=close_thread, args=(p,)).start() data = p.read(1, 1.0) self.assertEqual(b"", data) - def test_4_or_pipe(self): + def test_or_pipe(self): p = pipe.make_pipe() p1, p2 = pipe.make_or_pipe(p) self.assertFalse(p._set) diff --git a/tests/test_channelfile.py b/tests/test_channelfile.py new file mode 100644 index 00000000..4448fdfb --- /dev/null +++ b/tests/test_channelfile.py @@ -0,0 +1,60 @@ +from mock import patch, MagicMock + +from paramiko import Channel, ChannelFile, ChannelStderrFile, ChannelStdinFile + + +class ChannelFileBase(object): + @patch("paramiko.channel.ChannelFile._set_mode") + def test_defaults_to_unbuffered_reading(self, setmode): + self.klass(Channel(None)) + setmode.assert_called_once_with("r", -1) + + @patch("paramiko.channel.ChannelFile._set_mode") + def test_can_override_mode_and_bufsize(self, setmode): + self.klass(Channel(None), mode="w", bufsize=25) + setmode.assert_called_once_with("w", 25) + + def test_read_recvs_from_channel(self): + chan = MagicMock() + cf = self.klass(chan) + cf.read(100) + chan.recv.assert_called_once_with(100) + + def test_write_calls_channel_sendall(self): + chan = MagicMock() + cf = self.klass(chan, mode="w") + cf.write("ohai") + chan.sendall.assert_called_once_with(b"ohai") + + +class TestChannelFile(ChannelFileBase): + klass = ChannelFile + + +class TestChannelStderrFile(object): + def test_read_calls_channel_recv_stderr(self): + chan = MagicMock() + cf = ChannelStderrFile(chan) + cf.read(100) + chan.recv_stderr.assert_called_once_with(100) + + def test_write_calls_channel_sendall(self): + chan = MagicMock() + cf = ChannelStderrFile(chan, mode="w") + cf.write("ohai") + chan.sendall_stderr.assert_called_once_with(b"ohai") + + +class TestChannelStdinFile(ChannelFileBase): + klass = ChannelStdinFile + + def test_close_calls_channel_shutdown_write(self): + chan = MagicMock() + cf = ChannelStdinFile(chan, mode="wb") + cf.flush = MagicMock() + cf.close() + # Sanity check that we still call BufferedFile.close() + cf.flush.assert_called_once_with() + assert cf._closed is True + # Actual point of test + chan.shutdown_write.assert_called_once_with() diff --git a/tests/test_client.py b/tests/test_client.py index 80b28adf..60ad310c 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -34,10 +34,11 @@ import weakref from tempfile import mkstemp from pytest_relaxed import raises +from mock import patch, Mock import paramiko +from paramiko import SSHClient from paramiko.pkey import PublicBlob -from paramiko.common import PY2 from paramiko.ssh_exception import SSHException, AuthenticationException from .util import _support, slow @@ -48,9 +49,9 @@ requires_gss_auth = unittest.skipUnless( ) FINGERPRINTS = { - "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c", - "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5", - "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60", + "ssh-dss": b"\x44\x78\xf0\xb9\xa2\x3c\xc5\x18\x20\x09\xff\x75\x5b\xc1\xd2\x6c", # noqa + "ssh-rsa": b"\x60\x73\x38\x44\xcb\x51\x86\x65\x7f\xde\xda\xa2\x2b\x5a\x57\xd5", # noqa + "ecdsa-sha2-nistp256": b"\x25\x19\xeb\x55\xe6\xa1\x47\xff\x4f\x38\xd2\x75\x6f\xa5\xd5\x60", # noqa "ssh-ed25519": b'\xb3\xd5"\xaa\xf9u^\xe8\xcd\x0e\xea\x02\xb9)\xa2\x80', } @@ -192,7 +193,7 @@ class ClientTest(unittest.TestCase): public_host_key = paramiko.RSAKey(data=host_key.asbytes()) # Client setup - self.tc = paramiko.SSHClient() + self.tc = SSHClient() self.tc.get_host_keys().add( "[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key ) @@ -212,6 +213,12 @@ class ClientTest(unittest.TestCase): stdin, stdout, stderr = self.tc.exec_command("yes") schan = self.ts.accept(1.0) + # Nobody else tests the API of exec_command so let's do it here for + # now. :weary: + assert isinstance(stdin, paramiko.ChannelStdinFile) + assert isinstance(stdout, paramiko.ChannelFile) + assert isinstance(stderr, paramiko.ChannelStderrFile) + schan.send("Hello there.\n") schan.send_stderr("This is on stderr.\n") schan.close() @@ -228,13 +235,13 @@ class ClientTest(unittest.TestCase): class SSHClientTest(ClientTest): - def test_1_client(self): + def test_client(self): """ verify that the SSHClient stuff works too. """ self._test_connection(password="pygmalion") - def test_2_client_dsa(self): + def test_client_dsa(self): """ verify that SSHClient works with a DSA key. """ @@ -246,7 +253,7 @@ class SSHClientTest(ClientTest): """ self._test_connection(key_filename=_support("test_rsa.key")) - def test_2_5_client_ecdsa(self): + def test_client_ecdsa(self): """ verify that SSHClient works with an ECDSA key. """ @@ -255,7 +262,7 @@ class SSHClientTest(ClientTest): def test_client_ed25519(self): self._test_connection(key_filename=_support("test_ed25519.key")) - def test_3_multiple_key_files(self): + def test_multiple_key_files(self): """ verify that SSHClient accepts and tries multiple key files. """ @@ -335,7 +342,7 @@ class SSHClientTest(ClientTest): # code path (!) so we're punting too, sob. pass - def test_4_auto_add_policy(self): + def test_auto_add_policy(self): """ verify that SSHClient's AutoAddPolicy works. """ @@ -344,7 +351,7 @@ class SSHClientTest(ClientTest): key_file = _support("test_ecdsa_256.key") public_host_key = paramiko.ECDSAKey.from_private_key_file(key_file) - self.tc = paramiko.SSHClient() + self.tc = SSHClient() self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.assertEqual(0, len(self.tc.get_host_keys())) self.tc.connect(password="pygmalion", **self.connect_kwargs) @@ -358,7 +365,7 @@ class SSHClientTest(ClientTest): new_host_key = list(self.tc.get_host_keys()[hostname].values())[0] self.assertEqual(public_host_key, new_host_key) - def test_5_save_host_keys(self): + def test_save_host_keys(self): """ verify that SSHClient correctly saves a known_hosts file. """ @@ -371,16 +378,14 @@ class SSHClientTest(ClientTest): fd, localname = mkstemp() os.close(fd) - client = paramiko.SSHClient() - self.assertEquals(0, len(client.get_host_keys())) + client = SSHClient() + assert len(client.get_host_keys()) == 0 host_id = "[%s]:%d" % (self.addr, self.port) client.get_host_keys().add(host_id, "ssh-rsa", public_host_key) - self.assertEquals(1, len(client.get_host_keys())) - self.assertEquals( - public_host_key, client.get_host_keys()[host_id]["ssh-rsa"] - ) + assert len(client.get_host_keys()) == 1 + assert public_host_key == client.get_host_keys()[host_id]["ssh-rsa"] client.save_host_keys(localname) @@ -389,7 +394,7 @@ class SSHClientTest(ClientTest): os.unlink(localname) - def test_6_cleanup(self): + def test_cleanup(self): """ verify that when an SSHClient is collected, its transport (and the transport's packetizer) is closed. @@ -400,17 +405,17 @@ class SSHClientTest(ClientTest): threading.Thread(target=self._run).start() - self.tc = paramiko.SSHClient() + self.tc = SSHClient() self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - self.assertEqual(0, len(self.tc.get_host_keys())) + assert len(self.tc.get_host_keys()) == 0 self.tc.connect(**dict(self.connect_kwargs, password="pygmalion")) self.event.wait(1.0) - self.assertTrue(self.event.is_set()) - self.assertTrue(self.ts.is_active()) + assert self.event.is_set() + assert self.ts.is_active() p = weakref.ref(self.tc._transport.packetizer) - self.assertTrue(p() is not None) + assert p() is not None self.tc.close() del self.tc @@ -420,7 +425,7 @@ class SSHClientTest(ClientTest): gc.collect() gc.collect() - self.assertTrue(p() is None) + assert p() is None def test_client_can_be_used_as_context_manager(self): """ @@ -428,10 +433,10 @@ class SSHClientTest(ClientTest): """ threading.Thread(target=self._run).start() - with paramiko.SSHClient() as tc: + with SSHClient() as tc: self.tc = tc self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - self.assertEquals(0, len(self.tc.get_host_keys())) + assert len(self.tc.get_host_keys()) == 0 self.tc.connect(**dict(self.connect_kwargs, password="pygmalion")) self.event.wait(1.0) @@ -442,7 +447,7 @@ class SSHClientTest(ClientTest): self.assertTrue(self.tc._transport is None) - def test_7_banner_timeout(self): + def test_banner_timeout(self): """ verify that the SSHClient has a configurable banner timeout. """ @@ -453,7 +458,7 @@ class SSHClientTest(ClientTest): ) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) - self.tc = paramiko.SSHClient() + self.tc = SSHClient() self.tc.get_host_keys().add( "[%s]:%d" % (self.addr, self.port), "ssh-rsa", public_host_key ) @@ -461,7 +466,7 @@ class SSHClientTest(ClientTest): kwargs = dict(self.connect_kwargs, banner_timeout=0.5) self.assertRaises(paramiko.SSHException, self.tc.connect, **kwargs) - def test_8_auth_trickledown(self): + def test_auth_trickledown(self): """ Failed key auth doesn't prevent subsequent pw auth from succeeding """ @@ -482,7 +487,7 @@ class SSHClientTest(ClientTest): self._test_connection(**kwargs) @slow - def test_9_auth_timeout(self): + def test_auth_timeout(self): """ verify that the SSHClient has a configurable auth timeout """ @@ -495,28 +500,28 @@ class SSHClientTest(ClientTest): ) @requires_gss_auth - def test_10_auth_trickledown_gsskex(self): + def test_auth_trickledown_gsskex(self): """ - Failed gssapi-keyex auth doesn't prevent subsequent key auth from succeeding + Failed gssapi-keyex doesn't prevent subsequent key from succeeding """ kwargs = dict(gss_kex=True, key_filename=[_support("test_rsa.key")]) self._test_connection(**kwargs) @requires_gss_auth - def test_11_auth_trickledown_gssauth(self): + def test_auth_trickledown_gssauth(self): """ - Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding + Failed gssapi-with-mic doesn't prevent subsequent key from succeeding """ kwargs = dict(gss_auth=True, key_filename=[_support("test_rsa.key")]) self._test_connection(**kwargs) - def test_12_reject_policy(self): + def test_reject_policy(self): """ verify that SSHClient's RejectPolicy works. """ threading.Thread(target=self._run).start() - self.tc = paramiko.SSHClient() + self.tc = SSHClient() self.tc.set_missing_host_key_policy(paramiko.RejectPolicy()) self.assertEqual(0, len(self.tc.get_host_keys())) self.assertRaises( @@ -527,15 +532,16 @@ class SSHClientTest(ClientTest): ) @requires_gss_auth - def test_13_reject_policy_gsskex(self): + def test_reject_policy_gsskex(self): """ verify that SSHClient's RejectPolicy works, even if gssapi-keyex was enabled but not used. """ - # Test for a bug present in paramiko versions released before 2017-08-01 + # Test for a bug present in paramiko versions released before + # 2017-08-01 threading.Thread(target=self._run).start() - self.tc = paramiko.SSHClient() + self.tc = SSHClient() self.tc.set_missing_host_key_policy(paramiko.RejectPolicy()) self.assertEqual(0, len(self.tc.get_host_keys())) self.assertRaises( @@ -550,7 +556,7 @@ class SSHClientTest(ClientTest): threading.Thread(target=self._run).start() hostname = "[%s]:%d" % (self.addr, self.port) - self.tc = paramiko.SSHClient() + self.tc = SSHClient() self.tc.set_missing_host_key_policy(paramiko.WarningPolicy()) known_hosts = self.tc.get_host_keys() known_hosts.add(hostname, host_key.get_name(), host_key) @@ -566,7 +572,7 @@ class SSHClientTest(ClientTest): threading.Thread(target=self._run).start() hostname = "[%s]:%d" % (self.addr, self.port) - self.tc = paramiko.SSHClient() + self.tc = SSHClient() self.tc.set_missing_host_key_policy(paramiko.RejectPolicy()) host_key = ktype.from_private_key_file(_support(kfile)) known_hosts = self.tc.get_host_keys() @@ -595,7 +601,7 @@ class SSHClientTest(ClientTest): def _setup_for_env(self): threading.Thread(target=self._run).start() - self.tc = paramiko.SSHClient() + self.tc = SSHClient() self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.assertEqual(0, len(self.tc.get_host_keys())) self.tc.connect( @@ -639,7 +645,7 @@ class SSHClientTest(ClientTest): """ # AN ACTUAL UNIT TEST?! GOOD LORD # (But then we have to test a private API...meh.) - client = paramiko.SSHClient() + client = SSHClient() # Default assert isinstance(client._policy, paramiko.RejectPolicy) # Hand in an instance (classic behavior) @@ -649,6 +655,22 @@ class SSHClientTest(ClientTest): client.set_missing_host_key_policy(paramiko.AutoAddPolicy) assert isinstance(client._policy, paramiko.AutoAddPolicy) + @patch("paramiko.client.Transport") + def test_disabled_algorithms_defaults_to_None(self, Transport): + SSHClient().connect("host", sock=Mock(), password="no") + assert Transport.call_args[1]["disabled_algorithms"] is None + + @patch("paramiko.client.Transport") + def test_disabled_algorithms_passed_directly_if_given(self, Transport): + SSHClient().connect( + "host", + sock=Mock(), + password="no", + disabled_algorithms={"keys": ["ssh-dss"]}, + ) + call_arg = Transport.call_args[1]["disabled_algorithms"] + assert call_arg == {"keys": ["ssh-dss"]} + class PasswordPassphraseTests(ClientTest): # TODO: most of these could reasonably be set up to use mocks/assertions @@ -684,9 +706,9 @@ class PasswordPassphraseTests(ClientTest): ) @raises(AuthenticationException) # TODO: more granular - def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given( + def test_password_kwarg_not_used_for_passphrase_when_passphrase_kwarg_given( # noqa self - ): # noqa + ): # Sanity: if we're given both fields, the password field is NOT used as # a passphrase. self._test_connection( diff --git a/tests/test_config.py b/tests/test_config.py index cbd3f623..5e9aa059 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,74 +1,992 @@ # This file is part of Paramiko and subject to the license in /LICENSE in this # repository -import pytest +from os.path import expanduser +from socket import gaierror -from paramiko import config -from paramiko.util import parse_ssh_config -from paramiko.py3compat import StringIO +from paramiko.py3compat import string_types +from invoke import Result +from mock import patch +from pytest import raises, mark, fixture -def test_SSHConfigDict_construct_empty(): - assert not config.SSHConfigDict() +from paramiko import ( + SSHConfig, + SSHConfigDict, + CouldNotCanonicalize, + ConfigParseError, +) +from .util import _config -def test_SSHConfigDict_construct_from_list(): - assert config.SSHConfigDict([(1, 2)])[1] == 2 +@fixture +def socket(): + """ + Patch all of socket.* in our config module to prevent eg real DNS lookups. + + Also forces getaddrinfo (used in our addressfamily lookup stuff) to always + fail by default to mimic usual lack of AddressFamily related crap. -def test_SSHConfigDict_construct_from_dict(): - assert config.SSHConfigDict({1: 2})[1] == 2 + Callers who want to mock DNS lookups can then safely assume gethostbyname() + will be in use. + """ + with patch("paramiko.config.socket") as mocket: + # Reinstate gaierror as an actual exception and not a sub-mock. + # (Presumably this would work with any exception, but why not use the + # real one?) + mocket.gaierror = gaierror + # Patch out getaddrinfo, used to detect family-specific IP lookup - + # only useful for a few specific tests. + mocket.getaddrinfo.side_effect = mocket.gaierror + # Patch out getfqdn to return some real string for when it gets called; + # some code (eg tokenization) gets mad w/ MagicMocks + mocket.getfqdn.return_value = "some.fake.fqdn" + yield mocket -@pytest.mark.parametrize("true_ish", ("yes", "YES", "Yes", True)) -def test_SSHConfigDict_as_bool_true_ish(true_ish): - assert config.SSHConfigDict({"key": true_ish}).as_bool("key") is True +def load_config(name): + return SSHConfig.from_path(_config(name)) -@pytest.mark.parametrize("false_ish", ("no", "NO", "No", False)) -def test_SSHConfigDict_as_bool(false_ish): - assert config.SSHConfigDict({"key": false_ish}).as_bool("key") is False +class TestSSHConfig(object): + def setup(self): + self.config = load_config("robey") + def test_init(self): + # No args! + with raises(TypeError): + SSHConfig("uh oh!") + # No args. + assert not SSHConfig()._config -@pytest.mark.parametrize("int_val", ("42", 42)) -def test_SSHConfigDict_as_int(int_val): - assert config.SSHConfigDict({"key": int_val}).as_int("key") == 42 + def test_from_text(self): + config = SSHConfig.from_text("User foo") + assert config.lookup("foo.example.com")["user"] == "foo" + def test_from_file(self): + with open(_config("robey")) as flo: + config = SSHConfig.from_file(flo) + assert config.lookup("whatever")["user"] == "robey" -@pytest.mark.parametrize("non_int", ("not an int", None, object())) -def test_SSHConfigDict_as_int_failures(non_int): - conf = config.SSHConfigDict({"key": non_int}) + def test_from_path(self): + # NOTE: DO NOT replace with use of load_config() :D + config = SSHConfig.from_path(_config("robey")) + assert config.lookup("meh.example.com")["port"] == "3333" - try: - int(non_int) - except Exception as e: - exception_type = type(e) + def test_parse_config(self): + expected = [ + {"host": ["*"], "config": {}}, + { + "host": ["*"], + "config": {"identityfile": ["~/.ssh/id_rsa"], "user": "robey"}, + }, + { + "host": ["*.example.com"], + "config": {"user": "bjork", "port": "3333"}, + }, + {"host": ["*"], "config": {"crazy": "something dumb"}}, + { + "host": ["spoo.example.com"], + "config": {"crazy": "something else"}, + }, + ] + assert self.config._config == expected - with pytest.raises(exception_type): - conf.as_int("key") + @mark.parametrize( + "host,values", + ( + ( + "irc.danger.com", + { + "crazy": "something dumb", + "hostname": "irc.danger.com", + "user": "robey", + }, + ), + ( + "irc.example.com", + { + "crazy": "something dumb", + "hostname": "irc.example.com", + "user": "robey", + "port": "3333", + }, + ), + ( + "spoo.example.com", + { + "crazy": "something dumb", + "hostname": "spoo.example.com", + "user": "robey", + "port": "3333", + }, + ), + ), + ) + def test_host_config(self, host, values): + expected = dict( + values, hostname=host, identityfile=[expanduser("~/.ssh/id_rsa")] + ) + assert self.config.lookup(host) == expected + def test_fabric_issue_33(self): + config = SSHConfig.from_text( + """ +Host www13.* + Port 22 -def test_SSHConfig_host_dicts_are_SSHConfigDict_instances(): - test_config_file = """ Host *.example.com Port 2222 Host * Port 3333 - """ - f = StringIO(test_config_file) - config = parse_ssh_config(f) - assert config.lookup("foo.example.com").as_int("port") == 2222 +""" + ) + host = "www13.example.com" + expected = {"hostname": host, "port": "22"} + assert config.lookup(host) == expected + + def test_proxycommand_config_equals_parsing(self): + """ + ProxyCommand should not split on equals signs within the value. + """ + config = SSHConfig.from_text( + """ +Host space-delimited + ProxyCommand foo bar=biz baz + +Host equals-delimited + ProxyCommand=foo bar=biz baz +""" + ) + for host in ("space-delimited", "equals-delimited"): + value = config.lookup(host)["proxycommand"] + assert value == "foo bar=biz baz" + + def test_proxycommand_interpolation(self): + """ + ProxyCommand should perform interpolation on the value + """ + config = SSHConfig.from_text( + """ +Host specific + Port 37 + ProxyCommand host %h port %p lol + +Host portonly + Port 155 + +Host * + Port 25 + ProxyCommand host %h port %p +""" + ) + for host, val in ( + ("foo.com", "host foo.com port 25"), + ("specific", "host specific port 37 lol"), + ("portonly", "host portonly port 155"), + ): + assert config.lookup(host)["proxycommand"] == val + + def test_proxycommand_tilde_expansion(self): + """ + Tilde (~) should be expanded inside ProxyCommand + """ + config = SSHConfig.from_text( + """ +Host test + ProxyCommand ssh -F ~/.ssh/test_config bastion nc %h %p +""" + ) + expected = "ssh -F {}/.ssh/test_config bastion nc test 22".format( + expanduser("~") + ) + got = config.lookup("test")["proxycommand"] + assert got == expected + + @patch("paramiko.config.getpass") + def test_controlpath_token_expansion(self, getpass): + getpass.getuser.return_value = "gandalf" + config = SSHConfig.from_text( + """ +Host explicit_user + User root + ControlPath user %u remoteuser %r + +Host explicit_host + HostName ohai + ControlPath remoteuser %r host %h orighost %n + """ + ) + result = config.lookup("explicit_user")["controlpath"] + # Remote user is User val, local user is User val + assert result == "user gandalf remoteuser root" + result = config.lookup("explicit_host")["controlpath"] + # Remote user falls back to local user; host and orighost may differ + assert result == "remoteuser gandalf host ohai orighost explicit_host" + + def test_negation(self): + config = SSHConfig.from_text( + """ +Host www13.* !*.example.com + Port 22 + +Host *.example.com !www13.* + Port 2222 + +Host www13.* + Port 8080 + +Host * + Port 3333 +""" + ) + host = "www13.example.com" + expected = {"hostname": host, "port": "8080"} + assert config.lookup(host) == expected + + def test_proxycommand(self): + config = SSHConfig.from_text( + """ +Host proxy-with-equal-divisor-and-space +ProxyCommand = foo=bar + +Host proxy-with-equal-divisor-and-no-space +ProxyCommand=foo=bar + +Host proxy-without-equal-divisor +ProxyCommand foo=bar:%h-%p +""" + ) + for host, values in { + "proxy-with-equal-divisor-and-space": { + "hostname": "proxy-with-equal-divisor-and-space", + "proxycommand": "foo=bar", + }, + "proxy-with-equal-divisor-and-no-space": { + "hostname": "proxy-with-equal-divisor-and-no-space", + "proxycommand": "foo=bar", + }, + "proxy-without-equal-divisor": { + "hostname": "proxy-without-equal-divisor", + "proxycommand": "foo=bar:proxy-without-equal-divisor-22", + }, + }.items(): + + assert config.lookup(host) == values + + def test_identityfile(self): + config = SSHConfig.from_text( + """ + +IdentityFile id_dsa0 + +Host * +IdentityFile id_dsa1 + +Host dsa2 +IdentityFile id_dsa2 + +Host dsa2* +IdentityFile id_dsa22 +""" + ) + for host, values in { + "foo": {"hostname": "foo", "identityfile": ["id_dsa0", "id_dsa1"]}, + "dsa2": { + "hostname": "dsa2", + "identityfile": ["id_dsa0", "id_dsa1", "id_dsa2", "id_dsa22"], + }, + "dsa22": { + "hostname": "dsa22", + "identityfile": ["id_dsa0", "id_dsa1", "id_dsa22"], + }, + }.items(): + + assert config.lookup(host) == values + + def test_config_addressfamily_and_lazy_fqdn(self): + """ + Ensure the code path honoring non-'all' AddressFamily doesn't asplode + """ + config = SSHConfig.from_text( + """ +AddressFamily inet +IdentityFile something_%l_using_fqdn +""" + ) + assert config.lookup( + "meh" + ) # will die during lookup() if bug regresses + + def test_config_dos_crlf_succeeds(self): + config = SSHConfig.from_text( + """ +Host abcqwerty\r\nHostName 127.0.0.1\r\n +""" + ) + assert config.lookup("abcqwerty")["hostname"] == "127.0.0.1" + + def test_get_hostnames(self): + expected = {"*", "*.example.com", "spoo.example.com"} + assert self.config.get_hostnames() == expected + + def test_quoted_host_names(self): + config = SSHConfig.from_text( + """ +Host "param pam" param "pam" + Port 1111 + +Host "param2" + Port 2222 + +Host param3 parara + Port 3333 +Host param4 "p a r" "p" "par" para + Port 4444 +""" + ) + res = { + "param pam": {"hostname": "param pam", "port": "1111"}, + "param": {"hostname": "param", "port": "1111"}, + "pam": {"hostname": "pam", "port": "1111"}, + "param2": {"hostname": "param2", "port": "2222"}, + "param3": {"hostname": "param3", "port": "3333"}, + "parara": {"hostname": "parara", "port": "3333"}, + "param4": {"hostname": "param4", "port": "4444"}, + "p a r": {"hostname": "p a r", "port": "4444"}, + "p": {"hostname": "p", "port": "4444"}, + "par": {"hostname": "par", "port": "4444"}, + "para": {"hostname": "para", "port": "4444"}, + } + for host, values in res.items(): + assert config.lookup(host) == values -def test_SSHConfig_wildcard_host_dicts_are_SSHConfigDict_instances(): - test_config_file = """\ + def test_quoted_params_in_config(self): + config = SSHConfig.from_text( + """ +Host "param pam" param "pam" + IdentityFile id_rsa + +Host "param2" + IdentityFile "test rsa key" + +Host param3 parara + IdentityFile id_rsa + IdentityFile "test rsa key" +""" + ) + res = { + "param pam": {"hostname": "param pam", "identityfile": ["id_rsa"]}, + "param": {"hostname": "param", "identityfile": ["id_rsa"]}, + "pam": {"hostname": "pam", "identityfile": ["id_rsa"]}, + "param2": {"hostname": "param2", "identityfile": ["test rsa key"]}, + "param3": { + "hostname": "param3", + "identityfile": ["id_rsa", "test rsa key"], + }, + "parara": { + "hostname": "parara", + "identityfile": ["id_rsa", "test rsa key"], + }, + } + for host, values in res.items(): + assert config.lookup(host) == values + + def test_quoted_host_in_config(self): + conf = SSHConfig() + correct_data = { + "param": ["param"], + '"param"': ["param"], + "param pam": ["param", "pam"], + '"param" "pam"': ["param", "pam"], + '"param" pam': ["param", "pam"], + 'param "pam"': ["param", "pam"], + 'param "pam" p': ["param", "pam", "p"], + '"param" pam "p"': ["param", "pam", "p"], + '"pa ram"': ["pa ram"], + '"pa ram" pam': ["pa ram", "pam"], + 'param "p a m"': ["param", "p a m"], + } + incorrect_data = ['param"', '"param', 'param "pam', 'param "pam" "p a'] + for host, values in correct_data.items(): + assert conf._get_hosts(host) == values + for host in incorrect_data: + with raises(ConfigParseError): + conf._get_hosts(host) + + def test_invalid_line_format_excepts(self): + with raises(ConfigParseError): + load_config("invalid") + + def test_proxycommand_none_issue_418(self): + config = SSHConfig.from_text( + """ +Host proxycommand-standard-none + ProxyCommand None + +Host proxycommand-with-equals-none + ProxyCommand=None +""" + ) + for host, values in { + "proxycommand-standard-none": { + "hostname": "proxycommand-standard-none" + }, + "proxycommand-with-equals-none": { + "hostname": "proxycommand-with-equals-none" + }, + }.items(): + + assert config.lookup(host) == values + + def test_proxycommand_none_masking(self): + # Re: https://github.com/paramiko/paramiko/issues/670 + config = SSHConfig.from_text( + """ +Host specific-host + ProxyCommand none + +Host other-host + ProxyCommand other-proxy + +Host * + ProxyCommand default-proxy +""" + ) + # When bug is present, the full stripping-out of specific-host's + # ProxyCommand means it actually appears to pick up the default + # ProxyCommand value instead, due to cascading. It should (for + # backwards compatibility reasons in 1.x/2.x) appear completely blank, + # as if the host had no ProxyCommand whatsoever. + # Threw another unrelated host in there just for sanity reasons. + assert "proxycommand" not in config.lookup("specific-host") + assert config.lookup("other-host")["proxycommand"] == "other-proxy" + cmd = config.lookup("some-random-host")["proxycommand"] + assert cmd == "default-proxy" + + def test_hostname_tokenization(self): + result = load_config("hostname-tokenized").lookup("whatever") + assert result["hostname"] == "prefix.whatever" + + +class TestSSHConfigDict(object): + def test_SSHConfigDict_construct_empty(self): + assert not SSHConfigDict() + + def test_SSHConfigDict_construct_from_list(self): + assert SSHConfigDict([(1, 2)])[1] == 2 + + def test_SSHConfigDict_construct_from_dict(self): + assert SSHConfigDict({1: 2})[1] == 2 + + @mark.parametrize("true_ish", ("yes", "YES", "Yes", True)) + def test_SSHConfigDict_as_bool_true_ish(self, true_ish): + assert SSHConfigDict({"key": true_ish}).as_bool("key") is True + + @mark.parametrize("false_ish", ("no", "NO", "No", False)) + def test_SSHConfigDict_as_bool(self, false_ish): + assert SSHConfigDict({"key": false_ish}).as_bool("key") is False + + @mark.parametrize("int_val", ("42", 42)) + def test_SSHConfigDict_as_int(self, int_val): + assert SSHConfigDict({"key": int_val}).as_int("key") == 42 + + @mark.parametrize("non_int", ("not an int", None, object())) + def test_SSHConfigDict_as_int_failures(self, non_int): + conf = SSHConfigDict({"key": non_int}) + + try: + int(non_int) + except Exception as e: + exception_type = type(e) + + with raises(exception_type): + conf.as_int("key") + + def test_SSHConfig_host_dicts_are_SSHConfigDict_instances(self): + config = SSHConfig.from_text( + """ +Host *.example.com + Port 2222 + +Host * + Port 3333 +""" + ) + assert config.lookup("foo.example.com").as_int("port") == 2222 + + def test_SSHConfig_wildcard_host_dicts_are_SSHConfigDict_instances(self): + config = SSHConfig.from_text( + """ Host *.example.com Port 2222 Host * Port 3333 +""" + ) + assert config.lookup("anything-else").as_int("port") == 3333 + + +class TestHostnameCanonicalization(object): + # NOTE: this class uses on-disk configs, and ones with real (at time of + # writing) DNS names, so that one can easily test OpenSSH's behavior using + # "ssh -F path/to/file.config -G <target>". + + def test_off_by_default(self, socket): + result = load_config("basic").lookup("www") + assert result["hostname"] == "www" + assert "user" not in result + assert not socket.gethostbyname.called + + def test_explicit_no_same_as_default(self, socket): + result = load_config("no-canon").lookup("www") + assert result["hostname"] == "www" + assert "user" not in result + assert not socket.gethostbyname.called + + @mark.parametrize( + "config_name", + ("canon", "canon-always", "canon-local", "canon-local-always"), + ) + def test_canonicalization_base_cases(self, socket, config_name): + result = load_config(config_name).lookup("www") + assert result["hostname"] == "www.paramiko.org" + assert result["user"] == "rando" + socket.gethostbyname.assert_called_once_with("www.paramiko.org") + + def test_uses_getaddrinfo_when_AddressFamily_given(self, socket): + # Undo default 'always fails' mock + socket.getaddrinfo.side_effect = None + socket.getaddrinfo.return_value = [True] # just need 1st value truthy + result = load_config("canon-ipv4").lookup("www") + assert result["hostname"] == "www.paramiko.org" + assert result["user"] == "rando" + assert not socket.gethostbyname.called + gai_args = socket.getaddrinfo.call_args[0] + assert gai_args[0] == "www.paramiko.org" + assert gai_args[2] is socket.AF_INET # Mocked, but, still useful + + @mark.skip + def test_empty_CanonicalDomains_canonicalizes_despite_noop(self, socket): + # Confirmed this is how OpenSSH behaves as well. Bit silly, but. + # TODO: this requires modifying SETTINGS_REGEX, which is a mite scary + # (honestly I'd prefer to move to a real parser lib anyhow) and since + # this is a very dumb corner case, it's marked skip for now. + result = load_config("empty-canon").lookup("www") + assert result["hostname"] == "www" # no paramiko.org + assert "user" not in result # did not discover canonicalized block + + def test_CanonicalDomains_may_be_set_to_space_separated_list(self, socket): + # Test config has a bogus domain, followed by paramiko.org + socket.gethostbyname.side_effect = [socket.gaierror, True] + result = load_config("multi-canon-domains").lookup("www") + assert result["hostname"] == "www.paramiko.org" + assert result["user"] == "rando" + assert [x[0][0] for x in socket.gethostbyname.call_args_list] == [ + "www.not-a-real-tld", + "www.paramiko.org", + ] + + def test_canonicalization_applies_to_single_dot_by_default(self, socket): + result = load_config("deep-canon").lookup("sub.www") + assert result["hostname"] == "sub.www.paramiko.org" + assert result["user"] == "deep" + + def test_canonicalization_not_applied_to_two_dots_by_default(self, socket): + result = load_config("deep-canon").lookup("subber.sub.www") + assert result["hostname"] == "subber.sub.www" + assert "user" not in result + + def test_hostname_depth_controllable_with_max_dots_directive(self, socket): + # This config sets MaxDots of 2, so now canonicalization occurs + result = load_config("deep-canon-maxdots").lookup("subber.sub.www") + assert result["hostname"] == "subber.sub.www.paramiko.org" + assert result["user"] == "deeper" + + def test_max_dots_may_be_zero(self, socket): + result = load_config("zero-maxdots").lookup("sub.www") + assert result["hostname"] == "sub.www" + assert "user" not in result + + def test_fallback_yes_does_not_canonicalize_or_error(self, socket): + socket.gethostbyname.side_effect = socket.gaierror + result = load_config("fallback-yes").lookup("www") + assert result["hostname"] == "www" + assert "user" not in result + + def test_fallback_no_causes_errors_for_unresolvable_names(self, socket): + socket.gethostbyname.side_effect = socket.gaierror + with raises(CouldNotCanonicalize) as info: + load_config("fallback-no").lookup("doesnotexist") + assert str(info.value) == "doesnotexist" + + def test_identityfile_continues_being_appended_to(self, socket): + result = load_config("canon").lookup("www") + assert result["identityfile"] == ["base.key", "canonicalized.key"] + + +@mark.skip +class TestCanonicalizationOfCNAMEs(object): + def test_permitted_cnames_may_be_one_to_one_mapping(self): + # CanonicalizePermittedCNAMEs *.foo.com:*.bar.com + pass + + def test_permitted_cnames_may_be_one_to_many_mapping(self): + # CanonicalizePermittedCNAMEs *.foo.com:*.bar.com,*.biz.com + pass + + def test_permitted_cnames_may_be_many_to_one_mapping(self): + # CanonicalizePermittedCNAMEs *.foo.com,*.bar.com:*.biz.com + pass + + def test_permitted_cnames_may_be_many_to_many_mapping(self): + # CanonicalizePermittedCNAMEs *.foo.com,*.bar.com:*.biz.com,*.baz.com + pass + + def test_permitted_cnames_may_be_multiple_mappings(self): + # CanonicalizePermittedCNAMEs *.foo.com,*.bar.com *.biz.com:*.baz.com + pass + + def test_permitted_cnames_may_be_multiple_complex_mappings(self): + # Same as prev but with multiple patterns on both ends in both args + pass + + +class TestMatchAll(object): + def test_always_matches(self): + result = load_config("match-all").lookup("general") + assert result["user"] == "awesome" + + def test_may_not_mix_with_non_canonical_keywords(self): + for config in ("match-all-and-more", "match-all-and-more-before"): + with raises(ConfigParseError): + load_config(config).lookup("whatever") + + def test_may_come_after_canonical(self, socket): + result = load_config("match-all-after-canonical").lookup("www") + assert result["user"] == "awesome" + + def test_may_not_come_before_canonical(self, socket): + with raises(ConfigParseError): + load_config("match-all-before-canonical") + + def test_after_canonical_not_loaded_when_non_canonicalized(self, socket): + result = load_config("match-canonical-no").lookup("a-host") + assert "user" not in result + + +def _expect(success_on): + """ + Returns a side_effect-friendly Invoke success result for given command(s). + + Ensures that any other commands fail; this is useful for testing 'Match + exec' because it means all other such clauses under test act like no-ops. + + :param success_on: + Single string or list of strings, noting commands that should appear to + succeed. """ - f = StringIO(test_config_file) - config = parse_ssh_config(f) - assert config.lookup("anything-else").as_int("port") == 3333 + if isinstance(success_on, string_types): + success_on = [success_on] + + def inner(command, *args, **kwargs): + # Sanity checking - we always expect that invoke.run is called with + # these. + assert kwargs.get("hide", None) == "stdout" + assert kwargs.get("warn", None) is True + # Fake exit + exit = 0 if command in success_on else 1 + return Result(exited=exit) + + return inner + + +class TestMatchExec(object): + @patch("paramiko.config.invoke", new=None) + @patch("paramiko.config.invoke_import_error", new=ImportError("meh")) + def test_raises_invoke_ImportErrors_at_runtime(self): + # Not an ideal test, but I don't know of a non-bad way to fake out + # module-time ImportErrors. So we mock the symptoms. Meh! + with raises(ImportError) as info: + load_config("match-exec").lookup("oh-noes") + assert str(info.value) == "meh" + + @patch("paramiko.config.invoke.run") + @mark.parametrize( + "cmd,user", + [ + ("unquoted", "rando"), + ("quoted", "benjamin"), + ("quoted spaced", "neil"), + ], + ) + def test_accepts_single_possibly_quoted_argument(self, run, cmd, user): + run.side_effect = _expect(cmd) + result = load_config("match-exec").lookup("whatever") + assert result["user"] == user + + @patch("paramiko.config.invoke.run") + def test_does_not_match_nonzero_exit_codes(self, run): + # Nothing will succeed -> no User ever gets loaded + run.return_value = Result(exited=1) + result = load_config("match-exec").lookup("whatever") + assert "user" not in result + + @patch("paramiko.config.getpass") + @patch("paramiko.config.invoke.run") + def test_tokenizes_argument(self, run, getpass, socket): + socket.gethostname.return_value = "local.fqdn" + getpass.getuser.return_value = "gandalf" + # Actual exec value is "%d %h %L %l %n %p %r %u" + parts = ( + expanduser("~"), + "configured", + "local", + "some.fake.fqdn", + "target", + "22", + "intermediate", + "gandalf", + ) + run.side_effect = _expect(" ".join(parts)) + result = load_config("match-exec").lookup("target") + assert result["port"] == "1337" + + @patch("paramiko.config.invoke.run") + def test_works_with_canonical(self, run, socket): + # Ensure both stanzas' exec components appear to match + run.side_effect = _expect(["uncanonicalized", "canonicalized"]) + result = load_config("match-exec-canonical").lookup("who-cares") + # Prove both config values got loaded up, across the two passes + assert result["user"] == "defenseless" + assert result["port"] == "8007" + + @patch("paramiko.config.invoke.run") + def test_may_be_negated(self, run): + run.side_effect = _expect("this succeeds") + result = load_config("match-exec-negation").lookup("so-confusing") + # If negation did not work, the first of the two Match exec directives + # would have set User to 'nope' (and/or the second would have NOT set + # User to 'yup') + assert result["user"] == "yup" + + def test_requires_an_argument(self): + with raises(ConfigParseError): + load_config("match-exec-no-arg") + + @patch("paramiko.config.invoke.run") + def test_works_with_tokenized_hostname(self, run): + run.side_effect = _expect("ping target") + result = load_config("hostname-exec-tokenized").lookup("target") + assert result["hostname"] == "pingable.target" + + +class TestMatchHost(object): + def test_matches_target_name_when_no_hostname(self): + result = load_config("match-host").lookup("target") + assert result["user"] == "rand" + + def test_matches_hostname_from_global_setting(self): + # Also works for ones set in regular Host stanzas + result = load_config("match-host-name").lookup("anything") + assert result["user"] == "silly" + + def test_matches_hostname_from_earlier_match(self): + # Corner case: one Match matches original host, sets HostName, + # subsequent Match matches the latter. + result = load_config("match-host-from-match").lookup("original-host") + assert result["user"] == "inner" + + def test_may_be_globbed(self): + result = load_config("match-host-glob-list").lookup("whatever") + assert result["user"] == "matrim" + + def test_may_be_comma_separated_list(self): + for target in ("somehost", "someotherhost"): + result = load_config("match-host-glob-list").lookup(target) + assert result["user"] == "thom" + + def test_comma_separated_list_may_have_internal_negation(self): + conf = load_config("match-host-glob-list") + assert conf.lookup("good")["user"] == "perrin" + assert "user" not in conf.lookup("goof") + + def test_matches_canonicalized_name(self, socket): + # Without 'canonical' explicitly declared, mind. + result = load_config("match-host-canonicalized").lookup("www") + assert result["user"] == "rand" + + def test_works_with_canonical_keyword(self, socket): + # NOTE: distinct from 'happens to be canonicalized' above + result = load_config("match-host-canonicalized").lookup("docs") + assert result["user"] == "eric" + + def test_may_be_negated(self): + conf = load_config("match-host-negated") + assert conf.lookup("docs")["user"] == "jeff" + assert "user" not in conf.lookup("www") + + def test_requires_an_argument(self): + with raises(ConfigParseError): + load_config("match-host-no-arg") + + +class TestMatchOriginalHost(object): + def test_matches_target_host_not_hostname(self): + result = load_config("match-orighost").lookup("target") + assert result["hostname"] == "bogus" + assert result["user"] == "tuon" + + def test_matches_target_host_not_canonicalized_name(self, socket): + result = load_config("match-orighost-canonical").lookup("www") + assert result["hostname"] == "www.paramiko.org" + assert result["user"] == "tuon" + + def test_may_be_globbed(self): + result = load_config("match-orighost").lookup("whatever") + assert result["user"] == "matrim" + + def test_may_be_comma_separated_list(self): + for target in ("comma", "separated"): + result = load_config("match-orighost").lookup(target) + assert result["user"] == "chameleon" + + def test_comma_separated_list_may_have_internal_negation(self): + result = load_config("match-orighost").lookup("nope") + assert "user" not in result + + def test_may_be_negated(self): + result = load_config("match-orighost").lookup("docs") + assert result["user"] == "thom" + + def test_requires_an_argument(self): + with raises(ConfigParseError): + load_config("match-orighost-no-arg") + + +class TestMatchUser(object): + def test_matches_configured_username(self): + result = load_config("match-user-explicit").lookup("anything") + assert result["hostname"] == "dumb" + + @patch("paramiko.config.getpass.getuser") + def test_matches_local_username_by_default(self, getuser): + getuser.return_value = "gandalf" + result = load_config("match-user").lookup("anything") + assert result["hostname"] == "gondor" + + @patch("paramiko.config.getpass.getuser") + def test_may_be_globbed(self, getuser): + for user in ("bilbo", "bombadil"): + getuser.return_value = user + result = load_config("match-user").lookup("anything") + assert result["hostname"] == "shire" + + @patch("paramiko.config.getpass.getuser") + def test_may_be_comma_separated_list(self, getuser): + for user in ("aragorn", "frodo"): + getuser.return_value = user + result = load_config("match-user").lookup("anything") + assert result["hostname"] == "moria" + + @patch("paramiko.config.getpass.getuser") + def test_comma_separated_list_may_have_internal_negation(self, getuser): + getuser.return_value = "legolas" + result = load_config("match-user").lookup("anything") + assert "port" not in result + getuser.return_value = "gimli" + result = load_config("match-user").lookup("anything") + assert result["port"] == "7373" + + @patch("paramiko.config.getpass.getuser") + def test_may_be_negated(self, getuser): + getuser.return_value = "saruman" + result = load_config("match-user").lookup("anything") + assert result["hostname"] == "mordor" + + def test_requires_an_argument(self): + with raises(ConfigParseError): + load_config("match-user-no-arg") + + +# NOTE: highly derivative of previous suite due to the former's use of +# localuser fallback. Doesn't seem worth conflating/refactoring right now. +class TestMatchLocalUser(object): + @patch("paramiko.config.getpass.getuser") + def test_matches_local_username(self, getuser): + getuser.return_value = "gandalf" + result = load_config("match-localuser").lookup("anything") + assert result["hostname"] == "gondor" + + @patch("paramiko.config.getpass.getuser") + def test_may_be_globbed(self, getuser): + for user in ("bilbo", "bombadil"): + getuser.return_value = user + result = load_config("match-localuser").lookup("anything") + assert result["hostname"] == "shire" + + @patch("paramiko.config.getpass.getuser") + def test_may_be_comma_separated_list(self, getuser): + for user in ("aragorn", "frodo"): + getuser.return_value = user + result = load_config("match-localuser").lookup("anything") + assert result["hostname"] == "moria" + + @patch("paramiko.config.getpass.getuser") + def test_comma_separated_list_may_have_internal_negation(self, getuser): + getuser.return_value = "legolas" + result = load_config("match-localuser").lookup("anything") + assert "port" not in result + getuser.return_value = "gimli" + result = load_config("match-localuser").lookup("anything") + assert result["port"] == "7373" + + @patch("paramiko.config.getpass.getuser") + def test_may_be_negated(self, getuser): + getuser.return_value = "saruman" + result = load_config("match-localuser").lookup("anything") + assert result["hostname"] == "mordor" + + def test_requires_an_argument(self): + with raises(ConfigParseError): + load_config("match-localuser-no-arg") + + +class TestComplexMatching(object): + # NOTE: this is still a cherry-pick of a few levels of complexity, there's + # no point testing literally all possible combinations. + + def test_originalhost_host(self): + result = load_config("match-complex").lookup("target") + assert result["hostname"] == "bogus" + assert result["user"] == "rand" + + @patch("paramiko.config.getpass.getuser") + def test_originalhost_localuser(self, getuser): + getuser.return_value = "rando" + result = load_config("match-complex").lookup("remote") + assert result["user"] == "calrissian" + + @patch("paramiko.config.getpass.getuser") + def test_everything_but_all(self, getuser): + getuser.return_value = "rando" + result = load_config("match-complex").lookup("www") + assert result["port"] == "7777" + + @patch("paramiko.config.getpass.getuser") + def test_everything_but_all_with_some_negated(self, getuser): + getuser.return_value = "rando" + result = load_config("match-complex").lookup("docs") + assert result["port"] == "1234" + + def test_negated_canonical(self, socket): + # !canonical in a config that is not canonicalized - does match + result = load_config("match-canonical-no").lookup("specific") + assert result["user"] == "overload" + # !canonical in a config that is canonicalized - does NOT match + result = load_config("match-canonical-yes").lookup("www") + assert result["user"] == "hidden" diff --git a/tests/test_ed25519-funky-padding.key b/tests/test_ed25519-funky-padding.key new file mode 100644 index 00000000..f178ca45 --- /dev/null +++ b/tests/test_ed25519-funky-padding.key @@ -0,0 +1,7 @@ +-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW +QyNTUxOQAAACAHzPvYoDSkMVX52/CbA2M2aSBS7R0wt/9b2n5n+osNygAAAJAHZ1meB2dZ +ngAAAAtzc2gtZWQyNTUxOQAAACAHzPvYoDSkMVX52/CbA2M2aSBS7R0wt/9b2n5n+osNyg +AAAEAIyamvYUpzCovQuUtLhz+fwE4qYQo+rTuUVIX4fmTzMAfM+9igNKQxVfnb8JsDYzZp +IFLtHTC3/1vafmf6iw3KAAAADW15IGNvbW1lbnQgaXM= +-----END OPENSSH PRIVATE KEY----- diff --git a/tests/test_ed25519-funky-padding_password.key b/tests/test_ed25519-funky-padding_password.key new file mode 100644 index 00000000..1b135d69 --- /dev/null +++ b/tests/test_ed25519-funky-padding_password.key @@ -0,0 +1,8 @@ +-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAACmFlczI1Ni1jdHIAAAAGYmNyeXB0AAAAGAAAABDo3dGRlE +xKndv32nDnz2mHAAAAEAAAAAEAAAAzAAAAC3NzaC1lZDI1NTE5AAAAIDcAVH8yDxoiqj0O +rX3YTRMsnvJr+XdKJW16YQpxx8UvAAAAoI78IY+u8lYOzxAEO2N8qEVQH8b/m27yQhcSbK +q1RvvuHmql3NoQvjYQe9/om4oqE+uesNRnoQGNplBHCeroD3ZcksXhLGDhwTh577NR+NQ+ +GNYAK5Ex7Va3Xgao5HUYtBQXlXbtzY1Q+71hcOlRVNnLUDvwShdCa9o6ETIOGcZl04fbzv +Z3vC1C68G3+JMNFenAGYU+iQq0XENtpT6xAIU= +-----END OPENSSH PRIVATE KEY----- diff --git a/tests/test_file.py b/tests/test_file.py index fba14b1b..2a3da74b 100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -52,7 +52,7 @@ class LoopbackFile(BufferedFile): class BufferedFileTest(unittest.TestCase): - def test_1_simple(self): + def test_simple(self): f = LoopbackFile("r") try: f.write(b"hi") @@ -69,7 +69,7 @@ class BufferedFileTest(unittest.TestCase): pass f.close() - def test_2_readline(self): + def test_readline(self): f = LoopbackFile("r+U") f.write( b"First line.\nSecond line.\r\nThird line.\n" @@ -96,7 +96,7 @@ class BufferedFileTest(unittest.TestCase): self.assertTrue(crlf in f.newlines) self.assertTrue(cr_byte not in f.newlines) - def test_3_lf(self): + def test_lf(self): """ try to trick the linefeed detector. """ @@ -108,7 +108,7 @@ class BufferedFileTest(unittest.TestCase): f.close() self.assertEqual(f.newlines, crlf) - def test_4_write(self): + def test_write(self): """ verify that write buffering is on. """ @@ -120,7 +120,7 @@ class BufferedFileTest(unittest.TestCase): self.assertEqual(f.readline(), "Incomplete line...\n") f.close() - def test_5_flush(self): + def test_flush(self): """ verify that flush will force a write. """ @@ -134,7 +134,7 @@ class BufferedFileTest(unittest.TestCase): self.assertEqual(f.read(3), b"") f.close() - def test_6_buffering(self): + def test_buffering_flushes(self): """ verify that flushing happens automatically on buffer crossing. """ @@ -147,7 +147,7 @@ class BufferedFileTest(unittest.TestCase): self.assertEqual(f.read(20), b"Too small. Enough.") f.close() - def test_7_read_all(self): + def test_read_all(self): """ verify that read(-1) returns everything left in the file. """ @@ -162,30 +162,30 @@ class BufferedFileTest(unittest.TestCase): ) f.close() - def test_8_buffering(self): + def test_buffering_writes(self): """ verify that buffered objects can be written """ if sys.version_info[0] == 2: f = LoopbackFile("r+", 16) - f.write(buffer(b"Too small.")) + f.write(buffer(b"Too small.")) # noqa f.close() - def test_9_readable(self): + def test_readable(self): f = LoopbackFile("r") self.assertTrue(f.readable()) self.assertFalse(f.writable()) self.assertFalse(f.seekable()) f.close() - def test_A_writable(self): + def test_writable(self): f = LoopbackFile("w") self.assertTrue(f.writable()) self.assertFalse(f.readable()) self.assertFalse(f.seekable()) f.close() - def test_B_readinto(self): + def test_readinto(self): data = bytearray(5) f = LoopbackFile("r+") f._write(b"hello") @@ -215,7 +215,7 @@ class BufferedFileTest(unittest.TestCase): offsets = range(0, len(data), 8) with LoopbackFile("rb+") as f: for offset in offsets: - f.write(buffer(data, offset, 8)) + f.write(buffer(data, offset, 8)) # noqa self.assertEqual(f.read(), data) @needs_builtin("memoryview") diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py index 3e8c39e8..30ffb56d 100644 --- a/tests/test_gssapi.py +++ b/tests/test_gssapi.py @@ -22,22 +22,23 @@ Test the used APIs for GSS-API / SSPI authentication """ -import unittest import socket -from .util import needs_gssapi +from .util import needs_gssapi, KerberosTestCase, update_env @needs_gssapi -class GSSAPITest(unittest.TestCase): - def setup(): +class GSSAPITest(KerberosTestCase): + def setUp(self): + super(GSSAPITest, self).setUp() # TODO: these vars should all come from os.environ or whatever the # approved pytest method is for runtime-configuring test data. self.krb5_mech = "1.2.840.113554.1.2.2" - self.targ_name = "hostname" + self.targ_name = self.realm.hostname self.server_mode = False + update_env(self, self.realm.env) - def test_1_pyasn1(self): + def test_pyasn1(self): """ Test the used methods of pyasn1. """ @@ -48,13 +49,20 @@ class GSSAPITest(unittest.TestCase): mech, __ = decoder.decode(oid) self.assertEquals(self.krb5_mech, mech.__str__()) - def test_2_gssapi_sspi(self): + def _gssapi_sspi_test(self): """ Test the used methods of python-gssapi or sspi, sspicon from pywin32. """ - _API = "MIT" try: import gssapi + + if ( + hasattr(gssapi, "__title__") + and gssapi.__title__ == "python-gssapi" + ): + _API = "PYTHON-GSSAPI-OLD" + else: + _API = "PYTHON-GSSAPI-NEW" except ImportError: import sspicon import sspi @@ -65,7 +73,7 @@ class GSSAPITest(unittest.TestCase): gss_ctxt_status = False mic_msg = b"G'day Mate!" - if _API == "MIT": + if _API == "PYTHON-GSSAPI-OLD": if self.server_mode: gss_flags = ( gssapi.C_PROT_READY_FLAG, @@ -113,6 +121,56 @@ class GSSAPITest(unittest.TestCase): # Check MIC status = gss_srv_ctxt.verify_mic(mic_msg, mic_token) self.assertEquals(0, status) + elif _API == "PYTHON-GSSAPI-NEW": + if self.server_mode: + gss_flags = ( + gssapi.RequirementFlag.protection_ready, + gssapi.RequirementFlag.integrity, + gssapi.RequirementFlag.mutual_authentication, + gssapi.RequirementFlag.delegate_to_peer, + ) + else: + gss_flags = ( + gssapi.RequirementFlag.protection_ready, + gssapi.RequirementFlag.integrity, + gssapi.RequirementFlag.delegate_to_peer, + ) + # Initialize a GSS-API context. + krb5_oid = gssapi.MechType.kerberos + target_name = gssapi.Name( + "host@" + self.targ_name, + name_type=gssapi.NameType.hostbased_service, + ) + gss_ctxt = gssapi.SecurityContext( + name=target_name, + flags=gss_flags, + mech=krb5_oid, + usage="initiate", + ) + if self.server_mode: + c_token = gss_ctxt.step(c_token) + gss_ctxt_status = gss_ctxt.complete + self.assertEquals(False, gss_ctxt_status) + # Accept a GSS-API context. + gss_srv_ctxt = gssapi.SecurityContext(usage="accept") + s_token = gss_srv_ctxt.step(c_token) + gss_ctxt_status = gss_srv_ctxt.complete + self.assertNotEquals(None, s_token) + self.assertEquals(True, gss_ctxt_status) + # Establish the client context + c_token = gss_ctxt.step(s_token) + self.assertEquals(None, c_token) + else: + while not gss_ctxt.complete: + c_token = gss_ctxt.step(c_token) + self.assertNotEquals(None, c_token) + # Build MIC + mic_token = gss_ctxt.get_signature(mic_msg) + + if self.server_mode: + # Check MIC + status = gss_srv_ctxt.verify_signature(mic_msg, mic_token) + self.assertEquals(0, status) else: gss_flags = ( sspicon.ISC_REQ_INTEGRITY @@ -145,3 +203,16 @@ class GSSAPITest(unittest.TestCase): error, token = gss_ctxt.authorize(c_token) c_token = token[0].Buffer self.assertNotEquals(0, error) + + def test_2_gssapi_sspi_client(self): + """ + Test the used methods of python-gssapi or sspi, sspicon from pywin32. + """ + self._gssapi_sspi_test() + + def test_3_gssapi_sspi_server(self): + """ + Test the used methods of python-gssapi or sspi, sspicon from pywin32. + """ + self.server_mode = True + self._gssapi_sspi_test() diff --git a/tests/test_hostkeys.py b/tests/test_hostkeys.py index 295153dd..da47362c 100644 --- a/tests/test_hostkeys.py +++ b/tests/test_hostkeys.py @@ -62,7 +62,7 @@ class HostKeysTest(unittest.TestCase): def tearDown(self): os.unlink("hostfile.temp") - def test_1_load(self): + def test_load(self): hostdict = paramiko.HostKeys("hostfile.temp") self.assertEqual(2, len(hostdict)) self.assertEqual(1, len(list(hostdict.values())[0])) @@ -72,7 +72,7 @@ class HostKeysTest(unittest.TestCase): ).upper() self.assertEqual(b"E6684DB30E109B67B70FF1DC5C7F1363", fp) - def test_2_add(self): + def test_add(self): hostdict = paramiko.HostKeys("hostfile.temp") hh = "|1|BMsIC6cUIP2zBuXR3t2LRcJYjzM=|hpkJMysjTk/+zzUUzxQEa2ieq6c=" key = paramiko.RSAKey(data=decodebytes(keyblob)) @@ -83,7 +83,7 @@ class HostKeysTest(unittest.TestCase): self.assertEqual(b"7EC91BB336CB6D810B124B1353C32396", fp) self.assertTrue(hostdict.check("foo.example.com", key)) - def test_3_dict(self): + def test_dict(self): hostdict = paramiko.HostKeys("hostfile.temp") self.assertTrue("secure.example.com" in hostdict) self.assertTrue("not.example.com" not in hostdict) @@ -98,7 +98,7 @@ class HostKeysTest(unittest.TestCase): i += 1 self.assertEqual(2, i) - def test_4_dict_set(self): + def test_dict_set(self): hostdict = paramiko.HostKeys("hostfile.temp") key = paramiko.RSAKey(data=decodebytes(keyblob)) key_dss = paramiko.DSSKey(data=decodebytes(keyblob_dss)) @@ -122,10 +122,10 @@ class HostKeysTest(unittest.TestCase): def test_delitem(self): hostdict = paramiko.HostKeys("hostfile.temp") target = "happy.example.com" - entry = hostdict[target] # will KeyError if not present + hostdict[target] # will KeyError if not present del hostdict[target] try: - entry = hostdict[target] + hostdict[target] except KeyError: pass # Good else: diff --git a/tests/test_kex.py b/tests/test_kex.py index 62512beb..0244ae84 100644 --- a/tests/test_kex.py +++ b/tests/test_kex.py @@ -24,15 +24,26 @@ from binascii import hexlify, unhexlify import os import unittest +from mock import Mock, patch +import pytest + from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import ec +try: + from cryptography.hazmat.primitives.asymmetric import x25519 +except ImportError: + x25519 = None + import paramiko.util from paramiko.kex_group1 import KexGroup1 +from paramiko.kex_group14 import KexGroup14SHA256 from paramiko.kex_gex import KexGex, KexGexSHA256 from paramiko import Message from paramiko.common import byte_chr from paramiko.kex_ecdh_nist import KexNistp256 +from paramiko.kex_group16 import KexGroup16SHA512 +from paramiko.kex_curve25519 import KexCurve25519 def dummy_urandom(n): @@ -40,22 +51,22 @@ def dummy_urandom(n): def dummy_generate_key_pair(obj): - private_key_value = 94761803665136558137557783047955027733968423115106677159790289642479432803037 - public_key_numbers = "042bdab212fa8ba1b7c843301682a4db424d307246c7e1e6083c41d9ca7b098bf30b3d63e2ec6278488c135360456cc054b3444ecc45998c08894cbc1370f5f989" - public_key_numbers_obj = ec.EllipticCurvePublicNumbers.from_encoded_point( + private_key_value = 94761803665136558137557783047955027733968423115106677159790289642479432803037 # noqa + public_key_numbers = "042bdab212fa8ba1b7c843301682a4db424d307246c7e1e6083c41d9ca7b098bf30b3d63e2ec6278488c135360456cc054b3444ecc45998c08894cbc1370f5f989" # noqa + public_key_numbers_obj = ec.EllipticCurvePublicKey.from_encoded_point( ec.SECP256R1(), unhexlify(public_key_numbers) - ) + ).public_numbers() obj.P = ec.EllipticCurvePrivateNumbers( private_value=private_key_value, public_numbers=public_key_numbers_obj ).private_key(default_backend()) if obj.transport.server_mode: - obj.Q_S = ec.EllipticCurvePublicNumbers.from_encoded_point( + obj.Q_S = ec.EllipticCurvePublicKey.from_encoded_point( ec.SECP256R1(), unhexlify(public_key_numbers) - ).public_key(default_backend()) + ) return - obj.Q_C = ec.EllipticCurvePublicNumbers.from_encoded_point( + obj.Q_C = ec.EllipticCurvePublicKey.from_encoded_point( ec.SECP256R1(), unhexlify(public_key_numbers) - ).public_key(default_backend()) + ) class FakeKey(object): @@ -70,7 +81,7 @@ class FakeKey(object): class FakeModulusPack(object): - P = 0xFFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF + P = 0xFFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF # noqa G = 2 def get_modulus(self, min, ask, max): @@ -111,7 +122,7 @@ class FakeTransport(object): class KexTest(unittest.TestCase): - K = 14730343317708716439807310032871972459448364195094179797249681733965528989482751523943515690110179031004049109375612685505881911274101441415545039654102474376472240501616988799699744135291070488314748284283496055223852115360852283821334858541043710301057312858051901453919067023103730011648890038847384890504 + K = 14730343317708716439807310032871972459448364195094179797249681733965528989482751523943515690110179031004049109375612685505881911274101441415545039654102474376472240501616988799699744135291070488314748284283496055223852115360852283821334858541043710301057312858051901453919067023103730011648890038847384890504 # noqa def setUp(self): self._original_urandom = os.urandom @@ -119,16 +130,32 @@ class KexTest(unittest.TestCase): self._original_generate_key_pair = KexNistp256._generate_key_pair KexNistp256._generate_key_pair = dummy_generate_key_pair + if KexCurve25519.is_available(): + static_x25519_key = x25519.X25519PrivateKey.from_private_bytes( + unhexlify( + b"2184abc7eb3e656d2349d2470ee695b570c227340c2b2863b6c9ff427af1f040" # noqa + ) + ) + mock_x25519 = Mock() + mock_x25519.generate.return_value = static_x25519_key + patcher = patch( + "paramiko.kex_curve25519.X25519PrivateKey", mock_x25519 + ) + patcher.start() + self.x25519_patcher = patcher + def tearDown(self): os.urandom = self._original_urandom KexNistp256._generate_key_pair = self._original_generate_key_pair + if hasattr(self, "x25519_patcher"): + self.x25519_patcher.stop() - def test_1_group1_client(self): + def test_group1_client(self): transport = FakeTransport() transport.server_mode = False kex = KexGroup1(transport) kex.start_kex() - x = b"1E000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" + x = b"1E000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_group1._MSG_KEXDH_REPLY,), transport._expect @@ -147,7 +174,7 @@ class KexTest(unittest.TestCase): self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) self.assertTrue(transport._activated) - def test_2_group1_server(self): + def test_group1_server(self): transport = FakeTransport() transport.server_mode = True kex = KexGroup1(transport) @@ -161,13 +188,13 @@ class KexTest(unittest.TestCase): msg.rewind() kex.parse_next(paramiko.kex_group1._MSG_KEXDH_INIT, msg) H = b"B16BF34DD10945EDE84E9C1EF24A14BFDC843389" - x = b"1F0000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" + x = b"1F0000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa self.assertEqual(self.K, transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) - def test_3_gex_client(self): + def test_gex_client(self): transport = FakeTransport() transport.server_mode = False kex = KexGex(transport) @@ -183,7 +210,7 @@ class KexTest(unittest.TestCase): msg.add_mpint(FakeModulusPack.G) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg) - x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" + x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect @@ -201,7 +228,7 @@ class KexTest(unittest.TestCase): self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) self.assertTrue(transport._activated) - def test_4_gex_old_client(self): + def test_gex_old_client(self): transport = FakeTransport() transport.server_mode = False kex = KexGex(transport) @@ -217,7 +244,7 @@ class KexTest(unittest.TestCase): msg.add_mpint(FakeModulusPack.G) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg) - x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" + x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect @@ -235,7 +262,7 @@ class KexTest(unittest.TestCase): self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) self.assertTrue(transport._activated) - def test_5_gex_server(self): + def test_gex_server(self): transport = FakeTransport() transport.server_mode = True kex = KexGex(transport) @@ -254,7 +281,7 @@ class KexTest(unittest.TestCase): msg.add_int(4096) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg) - x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" + x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect @@ -264,15 +291,15 @@ class KexTest(unittest.TestCase): msg.add_mpint(12345) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg) - K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 + K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 # noqa H = b"CE754197C21BF3452863B4F44D0B3951F12516EF" - x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" + x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa self.assertEqual(K, transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) - def test_6_gex_server_with_old_client(self): + def test_gex_server_with_old_client(self): transport = FakeTransport() transport.server_mode = True kex = KexGex(transport) @@ -289,7 +316,7 @@ class KexTest(unittest.TestCase): msg.add_int(2048) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, msg) - x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" + x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect @@ -299,15 +326,15 @@ class KexTest(unittest.TestCase): msg.add_mpint(12345) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg) - K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 + K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 # noqa H = b"B41A06B2E59043CEFC1AE16EC31F1E2D12EC455B" - x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" + x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa self.assertEqual(K, transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) - def test_7_gex_sha256_client(self): + def test_gex_sha256_client(self): transport = FakeTransport() transport.server_mode = False kex = KexGexSHA256(transport) @@ -323,7 +350,7 @@ class KexTest(unittest.TestCase): msg.add_mpint(FakeModulusPack.G) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg) - x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" + x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect @@ -341,7 +368,7 @@ class KexTest(unittest.TestCase): self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) self.assertTrue(transport._activated) - def test_8_gex_sha256_old_client(self): + def test_gex_sha256_old_client(self): transport = FakeTransport() transport.server_mode = False kex = KexGexSHA256(transport) @@ -357,7 +384,7 @@ class KexTest(unittest.TestCase): msg.add_mpint(FakeModulusPack.G) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg) - x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" + x = b"20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect @@ -375,7 +402,7 @@ class KexTest(unittest.TestCase): self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) self.assertTrue(transport._activated) - def test_9_gex_sha256_server(self): + def test_gex_sha256_server(self): transport = FakeTransport() transport.server_mode = True kex = KexGexSHA256(transport) @@ -394,7 +421,7 @@ class KexTest(unittest.TestCase): msg.add_int(4096) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg) - x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" + x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect @@ -404,15 +431,15 @@ class KexTest(unittest.TestCase): msg.add_mpint(12345) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg) - K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 + K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 # noqa H = b"CCAC0497CF0ABA1DBF55E1A3995D17F4CC31824B0E8D95CDF8A06F169D050D80" - x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" + x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa self.assertEqual(K, transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) - def test_10_gex_sha256_server_with_old_client(self): + def test_gex_sha256_server_with_old_client(self): transport = FakeTransport() transport.server_mode = True kex = KexGexSHA256(transport) @@ -429,7 +456,7 @@ class KexTest(unittest.TestCase): msg.add_int(2048) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, msg) - x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" + x = b"1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102" # noqa self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertEqual( (paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect @@ -439,16 +466,16 @@ class KexTest(unittest.TestCase): msg.add_mpint(12345) msg.rewind() kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg) - K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 + K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581 # noqa H = b"3DDD2AD840AD095E397BA4D0573972DC60F6461FD38A187CACA6615A5BC8ADBB" - x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" + x = b"210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967" # noqa self.assertEqual(K, transport._K) self.assertEqual(H, hexlify(transport._H).upper()) self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) self.assertTrue(transport._activated) - def test_11_kex_nistp256_client(self): - K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 + def test_kex_nistp256_client(self): + K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 # noqa transport = FakeTransport() transport.server_mode = False kex = KexNistp256(transport) @@ -461,7 +488,7 @@ class KexTest(unittest.TestCase): msg = Message() msg.add_string("fake-host-key") Q_S = unhexlify( - "043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210" + "043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210" # noqa ) msg.add_string(Q_S) msg.add_string("fake-sig") @@ -473,8 +500,8 @@ class KexTest(unittest.TestCase): self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) self.assertTrue(transport._activated) - def test_12_kex_nistp256_server(self): - K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 + def test_kex_nistp256_server(self): + K = 91610929826364598472338906427792435253694642563583721654249504912114314269754 # noqa transport = FakeTransport() transport.server_mode = True kex = KexNistp256(transport) @@ -486,7 +513,7 @@ class KexTest(unittest.TestCase): # fake init msg = Message() Q_C = unhexlify( - "043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210" + "043ae159594ba062efa121480e9ef136203fa9ec6b6e1f8723a321c16e62b945f573f3b822258cbcd094b9fa1c125cbfe5f043280893e66863cc0cb4dccbe70210" # noqa ) H = b"2EF4957AFD530DD3F05DBEABF68D724FACC060974DA9704F2AEE4C3DE861E7CA" msg.add_string(Q_C) @@ -495,3 +522,104 @@ class KexTest(unittest.TestCase): self.assertEqual(K, transport._K) self.assertTrue(transport._activated) self.assertEqual(H, hexlify(transport._H).upper()) + + def test_kex_group14_sha256_client(self): + transport = FakeTransport() + transport.server_mode = False + kex = KexGroup14SHA256(transport) + kex.start_kex() + x = b"1E00000101009850B3A8DE3ECCD3F19644139137C93D9C11BC28ED8BE850908EE294E1D43B88B9295311EFAEF5B736A1B652EBE184CCF36CFB0681C1ED66430088FA448B83619F928E7B9592ED6160EC11D639D51C303603F930F743C646B1B67DA38A1D44598DCE6C3F3019422B898044141420E9A10C29B9C58668F7F20A40F154B2C4768FCF7A9AA7179FB6366A7167EE26DD58963E8B880A0572F641DE0A73DC74C930F7C3A0C9388553F3F8403E40CF8B95FEDB1D366596FCF3FDDEB21A0005ADA650EF1733628D807BE5ACB83925462765D9076570056E39994FB328E3108FE406275758D6BF5F32790EF15D8416BF5548164859E785DB45E7787BB0E727ADE08641ED" # noqa + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual( + (paramiko.kex_group1._MSG_KEXDH_REPLY,), transport._expect + ) + + # fake "reply" + msg = Message() + msg.add_string("fake-host-key") + msg.add_mpint(69) + msg.add_string("fake-sig") + msg.rewind() + kex.parse_next(paramiko.kex_group1._MSG_KEXDH_REPLY, msg) + K = 21526936926159575624241589599003964979640840086252478029709904308461709651400109485351462666820496096345766733042945918306284902585618061272525323382142547359684512114160415969631877620660064043178086464811345023251493620331559440565662862858765724251890489795332144543057725932216208403143759943169004775947331771556537814494448612329251887435553890674764339328444948425882382475260315505741818518926349729970262019325118040559191290279100613049085709127598666890434114956464502529053036826173452792849566280474995114751780998069614898221773345705289637708545219204637224261997310181473787577166103031529148842107599 # noqa + H = b"D007C23686BE8A7737F828DC9E899F8EB5AF423F495F138437BE2529C1B8455F" + self.assertEqual(K, transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) + self.assertTrue(transport._activated) + + def test_kex_group16_sha512_client(self): + transport = FakeTransport() + transport.server_mode = False + kex = KexGroup16SHA512(transport) + kex.start_kex() + x = b"1E0000020100859FF55A23E0F66463561DD8BFC4764C69C05F85665B06EC9E29EF5003A53A8FA890B6A6EB624DEB55A4FB279DE7010A53580A126817E3D235B05A1081662B1500961D0625F0AAD287F1B597CBA9DB9550D9CC26355C4C59F92E613B5C21AC191F152C09A5DB46DCBA5EA58E3CA6A8B0EB7183E27FAC10106022E8521FA91240FB389060F1E1E4A355049D29DCC82921CE6588791743E4B1DEEE0166F7CC5180C3C75F3773342DF95C8C10AAA5D12975257027936B99B3DED6E6E98CF27EADEAEAE04E7F0A28071F578646B985FCE28A59CEB36287CB65759BE0544D4C4018CDF03C9078FE9CA79ECA611CB6966899E6FD29BE0781491C659FE2380E0D99D50D9CFAAB94E61BE311779719C4C43C6D223AD3799C3915A9E55076A21152DBBF911D6594296D6ECDC1B6FA71997CD29DF987B80FCA7F36BB7F19863C72BBBF839746AFBF9A5B407D468C976AA3E36FA118D3EAAD2E08BF6AE219F81F2CE2BE946337F06CC09BBFABE938A4087E413921CBEC1965ED905999B83396ECA226110CDF6EFB80F815F6489AF87561DA3857F13A7705921306D94176231FBB336B17C3724BC17A28BECB910093AB040873D5D760E8C182B88ECCE3E38DDA68CE35BD152DF7550BD908791FCCEDD1FFDF5ED2A57FFAE79599E487A7726D8A3D950B1729A08FBB60EE462A6BBE8BF0F5F0E1358129A37840FE5B3EEB8BF26E99FA222EAE83" # noqa + self.assertEqual(x, hexlify(transport._message.asbytes()).upper()) + self.assertEqual( + (paramiko.kex_group1._MSG_KEXDH_REPLY,), transport._expect + ) + + # fake "reply" + msg = Message() + msg.add_string("fake-host-key") + msg.add_mpint(69) + msg.add_string("fake-sig") + msg.rewind() + kex.parse_next(paramiko.kex_group1._MSG_KEXDH_REPLY, msg) + K = 933242830095376162107925500057692534838883186615567574891154103836907630698358649443101764908667358576734565553213003142941996368306996312915844839972197961603283544950658467545799914435739152351344917376359963584614213874232577733869049670230112638724993540996854599166318001059065780674008011575015459772051180901213815080343343801745386220342919837913506966863570473712948197760657442974564354432738520446202131551650771882909329069340612274196233658123593466135642819578182367229641847749149740891990379052266213711500434128970973602206842980669193719602075489724202241641553472106310932258574377789863734311328542715212248147206865762697424822447603031087553480483833829498375309975229907460562402877655519980113688369262871485777790149373908739910846630414678346163764464587129010141922982925829457954376352735653834300282864445132624993186496129911208133529828461690634463092007726349795944930302881758403402084584307180896465875803621285362317770276493727205689466142632599776710824902573926951951209239626732358074877997756011804454926541386215567756538832824717436605031489511654178384081883801272314328403020205577714999460724519735573055540814037716770051316113795603990199374791348798218428912977728347485489266146775472 # noqa + H = b"F6E2BCC846B9B62591EFB86663D55D4769CA06B2EDABE469DF831639B2DDD5A271985011900A724CB2C87F19F347B3632A7C1536AF3D12EE463E6EA75281AF0C" # noqa + self.assertEqual(K, transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) + self.assertTrue(transport._activated) + + @pytest.mark.skipif("not KexCurve25519.is_available()") + def test_kex_c25519_client(self): + K = 71294722834835117201316639182051104803802881348227506835068888449366462300724 # noqa + transport = FakeTransport() + transport.server_mode = False + kex = KexCurve25519(transport) + kex.start_kex() + self.assertEqual( + (paramiko.kex_curve25519._MSG_KEXECDH_REPLY,), transport._expect + ) + + # fake reply + msg = Message() + msg.add_string("fake-host-key") + Q_S = unhexlify( + "8d13a119452382a1ada8eea4c979f3e63ad3f0c7366786d6c5b54b87219bae49" + ) + msg.add_string(Q_S) + msg.add_string("fake-sig") + msg.rewind() + kex.parse_next(paramiko.kex_curve25519._MSG_KEXECDH_REPLY, msg) + H = b"05B6F6437C0CF38D1A6C5A6F6E2558DEB54E7FC62447EBFB1E5D7407326A5475" + self.assertEqual(K, kex.transport._K) + self.assertEqual(H, hexlify(transport._H).upper()) + self.assertEqual((b"fake-host-key", b"fake-sig"), transport._verify) + self.assertTrue(transport._activated) + + @pytest.mark.skipif("not KexCurve25519.is_available()") + def test_kex_c25519_server(self): + K = 71294722834835117201316639182051104803802881348227506835068888449366462300724 # noqa + transport = FakeTransport() + transport.server_mode = True + kex = KexCurve25519(transport) + kex.start_kex() + self.assertEqual( + (paramiko.kex_curve25519._MSG_KEXECDH_INIT,), transport._expect + ) + + # fake init + msg = Message() + Q_C = unhexlify( + "8d13a119452382a1ada8eea4c979f3e63ad3f0c7366786d6c5b54b87219bae49" + ) + H = b"DF08FCFCF31560FEE639D9B6D56D760BC3455B5ADA148E4514181023E7A9B042" + msg.add_string(Q_C) + msg.rewind() + kex.parse_next(paramiko.kex_curve25519._MSG_KEXECDH_INIT, msg) + self.assertEqual(K, transport._K) + self.assertTrue(transport._activated) + self.assertEqual(H, hexlify(transport._H).upper()) diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py index c71ff91c..6f5625dc 100644 --- a/tests/test_kex_gss.py +++ b/tests/test_kex_gss.py @@ -31,7 +31,7 @@ import unittest import paramiko -from .util import needs_gssapi +from .util import needs_gssapi, KerberosTestCase, update_env class NullServer(paramiko.ServerInterface): @@ -53,27 +53,22 @@ class NullServer(paramiko.ServerInterface): return paramiko.OPEN_SUCCEEDED def check_channel_exec_request(self, channel, command): - if command != "yes": + if command != b"yes": return False return True @needs_gssapi -class GSSKexTest(unittest.TestCase): - @staticmethod - def init(username, hostname): - global krb5_principal, targ_name - krb5_principal = username - targ_name = hostname - +class GSSKexTest(KerberosTestCase): def setUp(self): - self.username = krb5_principal - self.hostname = socket.getfqdn(targ_name) + self.username = self.realm.user_princ + self.hostname = socket.getfqdn(self.realm.hostname) self.sockl = socket.socket() - self.sockl.bind((targ_name, 0)) + self.sockl.bind((self.realm.hostname, 0)) self.sockl.listen(1) self.addr, self.port = self.sockl.getsockname() self.event = threading.Event() + update_env(self, self.realm.env) thread = threading.Thread(target=self._run) thread.start() @@ -87,7 +82,7 @@ class GSSKexTest(unittest.TestCase): self.ts = paramiko.Transport(self.socks, gss_kex=True) host_key = paramiko.RSAKey.from_private_key_file("tests/test_rsa.key") self.ts.add_server_key(host_key) - self.ts.set_gss_host(targ_name) + self.ts.set_gss_host(self.realm.hostname) try: self.ts.load_server_moduli() except: @@ -142,7 +137,7 @@ class GSSKexTest(unittest.TestCase): stdout.close() stderr.close() - def test_1_gsskex_and_auth(self): + def test_gsskex_and_auth(self): """ Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated Diffie-Hellman Key Exchange and user authentication with the GSS-API @@ -150,7 +145,9 @@ class GSSKexTest(unittest.TestCase): """ self._test_gsskex_and_auth(gss_host=None) - def test_2_gsskex_and_auth_rekey(self): + # To be investigated, see https://github.com/paramiko/paramiko/issues/1312 + @unittest.expectedFailure + def test_gsskex_and_auth_rekey(self): """ Verify that Paramiko can rekey. """ diff --git a/tests/test_message.py b/tests/test_message.py index b843a705..57766d90 100644 --- a/tests/test_message.py +++ b/tests/test_message.py @@ -29,14 +29,14 @@ from paramiko.common import byte_chr, zero_byte class MessageTest(unittest.TestCase): __a = ( - b"\x00\x00\x00\x17\x07\x60\xe0\x90\x00\x00\x00\x01\x71\x00\x00\x00\x05\x68\x65\x6c\x6c\x6f\x00\x00\x03\xe8" + b"\x00\x00\x00\x17\x07\x60\xe0\x90\x00\x00\x00\x01\x71\x00\x00\x00\x05\x68\x65\x6c\x6c\x6f\x00\x00\x03\xe8" # noqa + b"x" * 1000 ) - __b = b"\x01\x00\xf3\x00\x3f\x00\x00\x00\x10\x68\x75\x65\x79\x2c\x64\x65\x77\x65\x79\x2c\x6c\x6f\x75\x69\x65" - __c = b"\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x01\x11\x00\x00\x00\x07\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x06\x9a\x1b\x2c\x3d\x4e\xf7" - __d = b"\x00\x00\x00\x05\xff\x00\x00\x00\x05\x11\x22\x33\x44\x55\xff\x00\x00\x00\x0a\x00\xf0\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x03\x63\x61\x74\x00\x00\x00\x03\x61\x2c\x62" + __b = b"\x01\x00\xf3\x00\x3f\x00\x00\x00\x10\x68\x75\x65\x79\x2c\x64\x65\x77\x65\x79\x2c\x6c\x6f\x75\x69\x65" # noqa + __c = b"\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x01\x11\x00\x00\x00\x07\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x06\x9a\x1b\x2c\x3d\x4e\xf7" # noqa + __d = b"\x00\x00\x00\x05\xff\x00\x00\x00\x05\x11\x22\x33\x44\x55\xff\x00\x00\x00\x0a\x00\xf0\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x03\x63\x61\x74\x00\x00\x00\x03\x61\x2c\x62" # noqa - def test_1_encode(self): + def test_encode(self): msg = Message() msg.add_int(23) msg.add_int(123789456) @@ -62,7 +62,7 @@ class MessageTest(unittest.TestCase): msg.add_mpint(-0x65e4d3c2b109) self.assertEqual(msg.asbytes(), self.__c) - def test_2_decode(self): + def test_decode(self): msg = Message(self.__a) self.assertEqual(msg.get_int(), 23) self.assertEqual(msg.get_int(), 123789456) @@ -84,7 +84,7 @@ class MessageTest(unittest.TestCase): self.assertEqual(msg.get_mpint(), 0xf5e4d3c2b109) self.assertEqual(msg.get_mpint(), -0x65e4d3c2b109) - def test_3_add(self): + def test_add(self): msg = Message() msg.add(5) msg.add(0x1122334455) @@ -94,7 +94,7 @@ class MessageTest(unittest.TestCase): msg.add(["a", "b"]) self.assertEqual(msg.asbytes(), self.__d) - def test_4_misc(self): + def test_misc(self): msg = Message(self.__d) self.assertEqual(msg.get_adaptive_int(), 5) self.assertEqual(msg.get_adaptive_int(), 0x1122334455) diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py index 6920f08e..de80770e 100644 --- a/tests/test_packetizer.py +++ b/tests/test_packetizer.py @@ -38,7 +38,7 @@ x1f = byte_chr(0x1f) class PacketizerTest(unittest.TestCase): - def test_1_write(self): + def test_write(self): rsock = LoopSocket() wsock = LoopSocket() rsock.link(wsock) @@ -64,11 +64,11 @@ class PacketizerTest(unittest.TestCase): # 32 + 12 bytes of MAC = 44 self.assertEqual(44, len(data)) self.assertEqual( - b"\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0", + b"\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0", # noqa data[:16], ) - def test_2_read(self): + def test_read(self): rsock = LoopSocket() wsock = LoopSocket() rsock.link(wsock) @@ -82,7 +82,7 @@ class PacketizerTest(unittest.TestCase): ).decryptor() p.set_inbound_cipher(decryptor, 16, sha1, 12, x1f * 20) wsock.send( - b"\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0\x90\xd2\x16\x56\x0d\x71\x73\x61\x38\x7c\x4c\x3d\xfb\x97\x7d\xe2\x6e\x03\xb1\xa0\xc2\x1c\xd6\x41\x41\x4c\xb4\x59" + b"\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0\x90\xd2\x16\x56\x0d\x71\x73\x61\x38\x7c\x4c\x3d\xfb\x97\x7d\xe2\x6e\x03\xb1\xa0\xc2\x1c\xd6\x41\x41\x4c\xb4\x59" # noqa ) cmd, m = p.read_message() self.assertEqual(100, cmd) @@ -90,7 +90,7 @@ class PacketizerTest(unittest.TestCase): self.assertEqual(1, m.get_int()) self.assertEqual(900, m.get_int()) - def test_3_closed(self): + def test_closed(self): if sys.platform.startswith("win"): # no SIGALRM on windows return rsock = LoopSocket() diff --git a/tests/test_pkey.py b/tests/test_pkey.py index 38c1fbef..a01acf6f 100644 --- a/tests/test_pkey.py +++ b/tests/test_pkey.py @@ -33,20 +33,20 @@ from .util import _support # from openssh's ssh-keygen -PUB_RSA = "ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4c=" -PUB_DSS = "ssh-dss AAAAB3NzaC1kc3MAAACBAOeBpgNnfRzr/twmAQRu2XwWAp3CFtrVnug6s6fgwj/oLjYbVtjAy6pl/h0EKCWx2rf1IetyNsTxWrniA9I6HeDj65X1FyDkg6g8tvCnaNB8Xp/UUhuzHuGsMIipRxBxw9LF608EqZcj1E3ytktoW5B5OcjrkEoz3xG7C+rpIjYvAAAAFQDwz4UnmsGiSNu5iqjn3uTzwUpshwAAAIEAkxfFeY8P2wZpDjX0MimZl5wkoFQDL25cPzGBuB4OnB8NoUk/yjAHIIpEShw8V+LzouMK5CTJQo5+Ngw3qIch/WgRmMHy4kBq1SsXMjQCte1So6HBMvBPIW5SiMTmjCfZZiw4AYHK+B/JaOwaG9yRg2Ejg4Ok10+XFDxlqZo8Y+wAAACARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lYukmnjO1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+NwacIBlXa8cMDL7Q/69o0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgE=" -PUB_ECDSA_256 = "ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBJSPZm3ZWkvk/Zx8WP+fZRZ5/NBBHnGQwR6uIC6XHGPDIHuWUzIjAwA0bzqkOUffEsbLe+uQgKl5kbc/L8KA/eo=" -PUB_ECDSA_384 = "ecdsa-sha2-nistp384 AAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAAIbmlzdHAzODQAAABhBBbGibQLW9AAZiGN2hEQxWYYoFaWKwN3PKSaDJSMqmIn1Z9sgRUuw8Y/w502OGvXL/wFk0i2z50l3pWZjD7gfMH7gX5TUiCzwrQkS+Hn1U2S9aF5WJp0NcIzYxXw2r4M2A==" -PUB_ECDSA_521 = "ecdsa-sha2-nistp521 AAAAE2VjZHNhLXNoYTItbmlzdHA1MjEAAAAIbmlzdHA1MjEAAACFBACaOaFLZGuxa5AW16qj6VLypFbLrEWrt9AZUloCMefxO8bNLjK/O5g0rAVasar1TnyHE9qj4NwzANZASWjQNbc4MAG8vzqezFwLIn/kNyNTsXNfqEko9OgHZknlj2Z79dwTJcRAL4QLcT5aND0EHZLB2fAUDXiWIb2j4rg1mwPlBMiBXA==" -PUB_RSA_2K_OPENSSH = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDF+Dpr54DX0WdeTDpNAMdkCWEkl3OXtNgf58qlN1gX572OLBqLf0zT4bHstUEpU3piazph/rSWcUMuBoD46tZ6jiH7H9b9Pem2eYQWaELDDkM+v9BMbEy5rMbFRLol5OtEvPFqneyEAanPOgvd8t3yyhSev9QVusakzJ8j8LGgrA8huYZ+Srnw0shEWLG70KUKCh3rG0QIvA8nfhtUOisr2Gp+F0YxMGb5gwBlQYAYE5l6u1SjZ7hNjyNosjK+wRBFgFFBYVpkZKJgWoK9w4ijFyzMZTucnZMqKOKAjIJvHfKBf2/cEfYxSq1EndqTqjYsd9T7/s2vcn1OH5a0wkER" -PUB_DSS_1K_OPENSSH = "ssh-dss AAAAB3NzaC1kc3MAAACBAL8XEx7F9xuwBNles+vWpNF+YcofrBhjX1r5QhpBe0eoYWLHRcroN6lxwCdGYRfgOoRjTncBiixQX/uUxAY96zDh3ir492s2BcJt4ihvNn/AY0I0OTuX/2IwGk9CGzafjaeZNVYxMa8lcVt0hSOTjkPQ7gVuk6bJzMInvie+VWKLAAAAFQDUgYdY+rhR0SkKbC09BS/SIHcB+wAAAIB44+4zpCNcd0CGvZlowH99zyPX8uxQtmTLQFuR2O8O0FgVVuCdDgD0D9W8CLOp32oatpM0jyyN89EdvSWzjHzZJ+L6H1FtZps7uhpDFWHdva1R25vyGecLMUuXjo5t/D7oCDih+HwHoSAxoi0QvsPd8/qqHQVznNJKtR6thUpXEwAAAIAG4DCBjbgTTgpBw0egRkJwBSz0oTt+1IcapNU2jA6N8urMSk9YXHEQHKN68BAF3YJ59q2Ujv3LOXmBqGd1T+kzwUszfMlgzq8MMu19Yfzse6AIK1Agn1Vj6F7YXLsXDN+T4KszX5+FJa7t/Zsp3nALWy6l0f4WKivEF5Y2QpEFcQ==" +PUB_RSA = "ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4c=" # noqa +PUB_DSS = "ssh-dss AAAAB3NzaC1kc3MAAACBAOeBpgNnfRzr/twmAQRu2XwWAp3CFtrVnug6s6fgwj/oLjYbVtjAy6pl/h0EKCWx2rf1IetyNsTxWrniA9I6HeDj65X1FyDkg6g8tvCnaNB8Xp/UUhuzHuGsMIipRxBxw9LF608EqZcj1E3ytktoW5B5OcjrkEoz3xG7C+rpIjYvAAAAFQDwz4UnmsGiSNu5iqjn3uTzwUpshwAAAIEAkxfFeY8P2wZpDjX0MimZl5wkoFQDL25cPzGBuB4OnB8NoUk/yjAHIIpEShw8V+LzouMK5CTJQo5+Ngw3qIch/WgRmMHy4kBq1SsXMjQCte1So6HBMvBPIW5SiMTmjCfZZiw4AYHK+B/JaOwaG9yRg2Ejg4Ok10+XFDxlqZo8Y+wAAACARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lYukmnjO1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+NwacIBlXa8cMDL7Q/69o0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgE=" # noqa +PUB_ECDSA_256 = "ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBJSPZm3ZWkvk/Zx8WP+fZRZ5/NBBHnGQwR6uIC6XHGPDIHuWUzIjAwA0bzqkOUffEsbLe+uQgKl5kbc/L8KA/eo=" # noqa +PUB_ECDSA_384 = "ecdsa-sha2-nistp384 AAAAE2VjZHNhLXNoYTItbmlzdHAzODQAAAAIbmlzdHAzODQAAABhBBbGibQLW9AAZiGN2hEQxWYYoFaWKwN3PKSaDJSMqmIn1Z9sgRUuw8Y/w502OGvXL/wFk0i2z50l3pWZjD7gfMH7gX5TUiCzwrQkS+Hn1U2S9aF5WJp0NcIzYxXw2r4M2A==" # noqa +PUB_ECDSA_521 = "ecdsa-sha2-nistp521 AAAAE2VjZHNhLXNoYTItbmlzdHA1MjEAAAAIbmlzdHA1MjEAAACFBACaOaFLZGuxa5AW16qj6VLypFbLrEWrt9AZUloCMefxO8bNLjK/O5g0rAVasar1TnyHE9qj4NwzANZASWjQNbc4MAG8vzqezFwLIn/kNyNTsXNfqEko9OgHZknlj2Z79dwTJcRAL4QLcT5aND0EHZLB2fAUDXiWIb2j4rg1mwPlBMiBXA==" # noqa +PUB_RSA_2K_OPENSSH = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDF+Dpr54DX0WdeTDpNAMdkCWEkl3OXtNgf58qlN1gX572OLBqLf0zT4bHstUEpU3piazph/rSWcUMuBoD46tZ6jiH7H9b9Pem2eYQWaELDDkM+v9BMbEy5rMbFRLol5OtEvPFqneyEAanPOgvd8t3yyhSev9QVusakzJ8j8LGgrA8huYZ+Srnw0shEWLG70KUKCh3rG0QIvA8nfhtUOisr2Gp+F0YxMGb5gwBlQYAYE5l6u1SjZ7hNjyNosjK+wRBFgFFBYVpkZKJgWoK9w4ijFyzMZTucnZMqKOKAjIJvHfKBf2/cEfYxSq1EndqTqjYsd9T7/s2vcn1OH5a0wkER" # noqa +PUB_DSS_1K_OPENSSH = "ssh-dss AAAAB3NzaC1kc3MAAACBAL8XEx7F9xuwBNles+vWpNF+YcofrBhjX1r5QhpBe0eoYWLHRcroN6lxwCdGYRfgOoRjTncBiixQX/uUxAY96zDh3ir492s2BcJt4ihvNn/AY0I0OTuX/2IwGk9CGzafjaeZNVYxMa8lcVt0hSOTjkPQ7gVuk6bJzMInvie+VWKLAAAAFQDUgYdY+rhR0SkKbC09BS/SIHcB+wAAAIB44+4zpCNcd0CGvZlowH99zyPX8uxQtmTLQFuR2O8O0FgVVuCdDgD0D9W8CLOp32oatpM0jyyN89EdvSWzjHzZJ+L6H1FtZps7uhpDFWHdva1R25vyGecLMUuXjo5t/D7oCDih+HwHoSAxoi0QvsPd8/qqHQVznNJKtR6thUpXEwAAAIAG4DCBjbgTTgpBw0egRkJwBSz0oTt+1IcapNU2jA6N8urMSk9YXHEQHKN68BAF3YJ59q2Ujv3LOXmBqGd1T+kzwUszfMlgzq8MMu19Yfzse6AIK1Agn1Vj6F7YXLsXDN+T4KszX5+FJa7t/Zsp3nALWy6l0f4WKivEF5Y2QpEFcQ==" # noqa FINGER_RSA = "1024 60:73:38:44:cb:51:86:65:7f:de:da:a2:2b:5a:57:d5" FINGER_DSS = "1024 44:78:f0:b9:a2:3c:c5:18:20:09:ff:75:5b:c1:d2:6c" FINGER_ECDSA_256 = "256 25:19:eb:55:e6:a1:47:ff:4f:38:d2:75:6f:a5:d5:60" FINGER_ECDSA_384 = "384 c1:8d:a0:59:09:47:41:8e:a8:a6:07:01:29:23:b4:65" FINGER_ECDSA_521 = "521 44:58:22:52:12:33:16:0e:ce:0e:be:2c:7c:7e:cc:1e" -SIGNED_RSA = "20:d7:8a:31:21:cb:f7:92:12:f2:a4:89:37:f5:78:af:e6:16:b6:25:b9:97:3d:a2:cd:5f:ca:20:21:73:4c:ad:34:73:8f:20:77:28:e2:94:15:08:d8:91:40:7a:85:83:bf:18:37:95:dc:54:1a:9b:88:29:6c:73:ca:38:b4:04:f1:56:b9:f2:42:9d:52:1b:29:29:b4:4f:fd:c9:2d:af:47:d2:40:76:30:f3:63:45:0c:d9:1d:43:86:0f:1c:70:e2:93:12:34:f3:ac:c5:0a:2f:14:50:66:59:f1:88:ee:c1:4a:e9:d1:9c:4e:46:f0:0e:47:6f:38:74:f1:44:a8" +SIGNED_RSA = "20:d7:8a:31:21:cb:f7:92:12:f2:a4:89:37:f5:78:af:e6:16:b6:25:b9:97:3d:a2:cd:5f:ca:20:21:73:4c:ad:34:73:8f:20:77:28:e2:94:15:08:d8:91:40:7a:85:83:bf:18:37:95:dc:54:1a:9b:88:29:6c:73:ca:38:b4:04:f1:56:b9:f2:42:9d:52:1b:29:29:b4:4f:fd:c9:2d:af:47:d2:40:76:30:f3:63:45:0c:d9:1d:43:86:0f:1c:70:e2:93:12:34:f3:ac:c5:0a:2f:14:50:66:59:f1:88:ee:c1:4a:e9:d1:9c:4e:46:f0:0e:47:6f:38:74:f1:44:a8" # noqa FINGER_RSA_2K_OPENSSH = "2048 68:d1:72:01:bf:c0:0c:66:97:78:df:ce:75:74:46:d6" FINGER_DSS_1K_OPENSSH = "1024 cf:1d:eb:d7:61:d3:12:94:c6:c0:c6:54:35:35:b0:82" @@ -112,8 +112,8 @@ L4QLcT5aND0EHZLB2fAUDXiWIb2j4rg1mwPlBMiBXA== x1234 = b"\x01\x02\x03\x04" -TEST_KEY_BYTESTR_2 = "\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x81\x00\xd3\x8fV\xea\x07\x85\xa6k%\x8d<\x1f\xbc\x8dT\x98\xa5\x96$\xf3E#\xbe>\xbc\xd2\x93\x93\x87f\xceD\x18\xdb \x0c\xb3\xa1a\x96\xf8e#\xcc\xacS\x8a#\xefVlE\x83\x1epv\xc1o\x17M\xef\xdf\x89DUXL\xa6\x8b\xaa<\x06\x10\xd7\x93w\xec\xaf\xe2\xaf\x95\xd8\xfb\xd9\xbfw\xcb\x9f0)#y{\x10\x90\xaa\x85l\tPru\x8c\t\x19\xce\xa0\xf1\xd2\xdc\x8e/\x8b\xa8f\x9c0\xdey\x84\xd2F\xf7\xcbmm\x1f\x87" -TEST_KEY_BYTESTR_3 = "\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x00ӏV\x07k%<\x1fT$E#>ғfD\x18 \x0cae#̬S#VlE\x1epvo\x17M߉DUXL<\x06\x10דw\u2bd5ٿw˟0)#y{\x10l\tPru\t\x19Π\u070e/f0yFmm\x1f" +TEST_KEY_BYTESTR_2 = "\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x81\x00\xd3\x8fV\xea\x07\x85\xa6k%\x8d<\x1f\xbc\x8dT\x98\xa5\x96$\xf3E#\xbe>\xbc\xd2\x93\x93\x87f\xceD\x18\xdb \x0c\xb3\xa1a\x96\xf8e#\xcc\xacS\x8a#\xefVlE\x83\x1epv\xc1o\x17M\xef\xdf\x89DUXL\xa6\x8b\xaa<\x06\x10\xd7\x93w\xec\xaf\xe2\xaf\x95\xd8\xfb\xd9\xbfw\xcb\x9f0)#y{\x10\x90\xaa\x85l\tPru\x8c\t\x19\xce\xa0\xf1\xd2\xdc\x8e/\x8b\xa8f\x9c0\xdey\x84\xd2F\xf7\xcbmm\x1f\x87" # noqa +TEST_KEY_BYTESTR_3 = "\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x01#\x00\x00\x00\x00ӏV\x07k%<\x1fT$E#>ғfD\x18 \x0cae#̬S#VlE\x1epvo\x17M߉DUXL<\x06\x10דw\u2bd5ٿw˟0)#y{\x10l\tPru\t\x19Π\u070e/f0yFmm\x1f" # noqa class KeyTest(unittest.TestCase): @@ -134,12 +134,12 @@ class KeyTest(unittest.TestCase): self.assertEqual(fh.readline()[:-1], "Proc-Type: 4,ENCRYPTED") self.assertEqual(fh.readline()[0:10], "DEK-Info: ") - def test_1_generate_key_bytes(self): + def test_generate_key_bytes(self): key = util.generate_key_bytes(md5, x1234, "happy birthday", 30) - exp = b"\x61\xE1\xF2\x72\xF4\xC1\xC4\x56\x15\x86\xBD\x32\x24\x98\xC0\xE9\x24\x67\x27\x80\xF4\x7B\xB3\x7D\xDA\x7D\x54\x01\x9E\x64" + exp = b"\x61\xE1\xF2\x72\xF4\xC1\xC4\x56\x15\x86\xBD\x32\x24\x98\xC0\xE9\x24\x67\x27\x80\xF4\x7B\xB3\x7D\xDA\x7D\x54\x01\x9E\x64" # noqa self.assertEqual(exp, key) - def test_2_load_rsa(self): + def test_load_rsa(self): key = RSAKey.from_private_key_file(_support("test_rsa.key")) self.assertEqual("ssh-rsa", key.get_name()) exp_rsa = b(FINGER_RSA.split()[1].replace(":", "")) @@ -155,7 +155,7 @@ class KeyTest(unittest.TestCase): key2 = RSAKey.from_private_key(s) self.assertEqual(key, key2) - def test_3_load_rsa_password(self): + def test_load_rsa_password(self): key = RSAKey.from_private_key_file( _support("test_rsa_password.key"), "television" ) @@ -166,7 +166,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(PUB_RSA.split()[1], key.get_base64()) self.assertEqual(1024, key.get_bits()) - def test_4_load_dss(self): + def test_load_dss(self): key = DSSKey.from_private_key_file(_support("test_dss.key")) self.assertEqual("ssh-dss", key.get_name()) exp_dss = b(FINGER_DSS.split()[1].replace(":", "")) @@ -182,7 +182,7 @@ class KeyTest(unittest.TestCase): key2 = DSSKey.from_private_key(s) self.assertEqual(key, key2) - def test_5_load_dss_password(self): + def test_load_dss_password(self): key = DSSKey.from_private_key_file( _support("test_dss_password.key"), "television" ) @@ -193,7 +193,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(PUB_DSS.split()[1], key.get_base64()) self.assertEqual(1024, key.get_bits()) - def test_6_compare_rsa(self): + def test_compare_rsa(self): # verify that the private & public keys compare equal key = RSAKey.from_private_key_file(_support("test_rsa.key")) self.assertEqual(key, key) @@ -202,7 +202,7 @@ class KeyTest(unittest.TestCase): self.assertTrue(not pub.can_sign()) self.assertEqual(key, pub) - def test_7_compare_dss(self): + def test_compare_dss(self): # verify that the private & public keys compare equal key = DSSKey.from_private_key_file(_support("test_dss.key")) self.assertEqual(key, key) @@ -211,7 +211,7 @@ class KeyTest(unittest.TestCase): self.assertTrue(not pub.can_sign()) self.assertEqual(key, pub) - def test_8_sign_rsa(self): + def test_sign_rsa(self): # verify that the rsa private key can sign and verify key = RSAKey.from_private_key_file(_support("test_rsa.key")) msg = key.sign_ssh_data(b"ice weasels") @@ -226,7 +226,7 @@ class KeyTest(unittest.TestCase): pub = RSAKey(data=key.asbytes()) self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg)) - def test_9_sign_dss(self): + def test_sign_dss(self): # verify that the dss private key can sign and verify key = DSSKey.from_private_key_file(_support("test_dss.key")) msg = key.sign_ssh_data(b"ice weasels") @@ -241,19 +241,19 @@ class KeyTest(unittest.TestCase): pub = DSSKey(data=key.asbytes()) self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg)) - def test_A_generate_rsa(self): + def test_generate_rsa(self): key = RSAKey.generate(1024) msg = key.sign_ssh_data(b"jerri blank") msg.rewind() self.assertTrue(key.verify_ssh_sig(b"jerri blank", msg)) - def test_B_generate_dss(self): + def test_generate_dss(self): key = DSSKey.generate(1024) msg = key.sign_ssh_data(b"jerri blank") msg.rewind() self.assertTrue(key.verify_ssh_sig(b"jerri blank", msg)) - def test_C_generate_ecdsa(self): + def test_generate_ecdsa(self): key = ECDSAKey.generate() msg = key.sign_ssh_data(b"jerri blank") msg.rewind() @@ -282,7 +282,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(key.get_bits(), 521) self.assertEqual(key.get_name(), "ecdsa-sha2-nistp521") - def test_10_load_ecdsa_256(self): + def test_load_ecdsa_256(self): key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key")) self.assertEqual("ecdsa-sha2-nistp256", key.get_name()) exp_ecdsa = b(FINGER_ECDSA_256.split()[1].replace(":", "")) @@ -298,7 +298,7 @@ class KeyTest(unittest.TestCase): key2 = ECDSAKey.from_private_key(s) self.assertEqual(key, key2) - def test_11_load_ecdsa_password_256(self): + def test_load_ecdsa_password_256(self): key = ECDSAKey.from_private_key_file( _support("test_ecdsa_password_256.key"), b"television" ) @@ -309,7 +309,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(PUB_ECDSA_256.split()[1], key.get_base64()) self.assertEqual(256, key.get_bits()) - def test_12_compare_ecdsa_256(self): + def test_compare_ecdsa_256(self): # verify that the private & public keys compare equal key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key")) self.assertEqual(key, key) @@ -318,7 +318,7 @@ class KeyTest(unittest.TestCase): self.assertTrue(not pub.can_sign()) self.assertEqual(key, pub) - def test_13_sign_ecdsa_256(self): + def test_sign_ecdsa_256(self): # verify that the rsa private key can sign and verify key = ECDSAKey.from_private_key_file(_support("test_ecdsa_256.key")) msg = key.sign_ssh_data(b"ice weasels") @@ -334,7 +334,7 @@ class KeyTest(unittest.TestCase): pub = ECDSAKey(data=key.asbytes()) self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg)) - def test_14_load_ecdsa_384(self): + def test_load_ecdsa_384(self): key = ECDSAKey.from_private_key_file(_support("test_ecdsa_384.key")) self.assertEqual("ecdsa-sha2-nistp384", key.get_name()) exp_ecdsa = b(FINGER_ECDSA_384.split()[1].replace(":", "")) @@ -350,7 +350,7 @@ class KeyTest(unittest.TestCase): key2 = ECDSAKey.from_private_key(s) self.assertEqual(key, key2) - def test_15_load_ecdsa_password_384(self): + def test_load_ecdsa_password_384(self): key = ECDSAKey.from_private_key_file( _support("test_ecdsa_password_384.key"), b"television" ) @@ -361,7 +361,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(PUB_ECDSA_384.split()[1], key.get_base64()) self.assertEqual(384, key.get_bits()) - def test_16_compare_ecdsa_384(self): + def test_compare_ecdsa_384(self): # verify that the private & public keys compare equal key = ECDSAKey.from_private_key_file(_support("test_ecdsa_384.key")) self.assertEqual(key, key) @@ -370,7 +370,7 @@ class KeyTest(unittest.TestCase): self.assertTrue(not pub.can_sign()) self.assertEqual(key, pub) - def test_17_sign_ecdsa_384(self): + def test_sign_ecdsa_384(self): # verify that the rsa private key can sign and verify key = ECDSAKey.from_private_key_file(_support("test_ecdsa_384.key")) msg = key.sign_ssh_data(b"ice weasels") @@ -386,7 +386,7 @@ class KeyTest(unittest.TestCase): pub = ECDSAKey(data=key.asbytes()) self.assertTrue(pub.verify_ssh_sig(b"ice weasels", msg)) - def test_18_load_ecdsa_521(self): + def test_load_ecdsa_521(self): key = ECDSAKey.from_private_key_file(_support("test_ecdsa_521.key")) self.assertEqual("ecdsa-sha2-nistp521", key.get_name()) exp_ecdsa = b(FINGER_ECDSA_521.split()[1].replace(":", "")) @@ -405,7 +405,7 @@ class KeyTest(unittest.TestCase): key2 = ECDSAKey.from_private_key(s) self.assertEqual(key, key2) - def test_19_load_ecdsa_password_521(self): + def test_load_ecdsa_password_521(self): key = ECDSAKey.from_private_key_file( _support("test_ecdsa_password_521.key"), b"television" ) @@ -416,7 +416,7 @@ class KeyTest(unittest.TestCase): self.assertEqual(PUB_ECDSA_521.split()[1], key.get_base64()) self.assertEqual(521, key.get_bits()) - def test_20_compare_ecdsa_521(self): + def test_compare_ecdsa_521(self): # verify that the private & public keys compare equal key = ECDSAKey.from_private_key_file(_support("test_ecdsa_521.key")) self.assertEqual(key, key) @@ -425,7 +425,7 @@ class KeyTest(unittest.TestCase): self.assertTrue(not pub.can_sign()) self.assertEqual(key, pub) - def test_21_sign_ecdsa_521(self): + def test_sign_ecdsa_521(self): # verify that the rsa private key can sign and verify key = ECDSAKey.from_private_key_file(_support("test_ecdsa_521.key")) msg = key.sign_ssh_data(b"ice weasels") @@ -493,6 +493,18 @@ class KeyTest(unittest.TestCase): ) self.assertNotEqual(key1.asbytes(), key2.asbytes()) + def test_ed25519_funky_padding(self): + # Proves #1306 by just not exploding with 'Invalid key'. + Ed25519Key.from_private_key_file( + _support("test_ed25519-funky-padding.key") + ) + + def test_ed25519_funky_padding_with_passphrase(self): + # Proves #1306 by just not exploding with 'Invalid key'. + Ed25519Key.from_private_key_file( + _support("test_ed25519-funky-padding_password.key"), b"asdf" + ) + def test_ed25519_compare(self): # verify that the private & public keys compare equal key = Ed25519Key.from_private_key_file(_support("test_ed25519.key")) @@ -504,7 +516,7 @@ class KeyTest(unittest.TestCase): def test_ed25519_nonbytes_password(self): # https://github.com/paramiko/paramiko/issues/1039 - key = Ed25519Key.from_private_key_file( + Ed25519Key.from_private_key_file( _support("test_ed25519_password.key"), # NOTE: not a bytes. Amusingly, the test above for same key DOES # explicitly cast to bytes...code smell! @@ -553,7 +565,7 @@ class KeyTest(unittest.TestCase): # Delve into blob contents, for test purposes msg = Message(key.public_blob.key_blob) self.assertEqual(msg.get_text(), "ssh-rsa-cert-v01@openssh.com") - nonce = msg.get_string() + msg.get_string() e = msg.get_mpint() n = msg.get_mpint() self.assertEqual(e, key.public_numbers.e) diff --git a/tests/test_proxy.py b/tests/test_proxy.py new file mode 100644 index 00000000..2e0b0e51 --- /dev/null +++ b/tests/test_proxy.py @@ -0,0 +1,144 @@ +import signal +import socket + +from mock import patch +from pytest import raises + +from paramiko import ProxyCommand, ProxyCommandFailure + + +class TestProxyCommand(object): + @patch("paramiko.proxy.subprocess") + def test_init_takes_command_string(self, subprocess): + ProxyCommand(command_line="do a thing") + subprocess.Popen.assert_called_once_with( + ["do", "a", "thing"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=0, + ) + + @patch("paramiko.proxy.subprocess.Popen") + def test_send_writes_to_process_stdin_returning_length(self, Popen): + proxy = ProxyCommand("hi") + written = proxy.send(b"data") + Popen.return_value.stdin.write.assert_called_once_with(b"data") + assert written == len(b"data") + + @patch("paramiko.proxy.subprocess.Popen") + def test_send_raises_ProxyCommandFailure_on_error(self, Popen): + Popen.return_value.stdin.write.side_effect = IOError(0, "whoops") + with raises(ProxyCommandFailure) as info: + ProxyCommand("hi").send("data") + assert info.value.command == "hi" + assert info.value.error == "whoops" + + @patch("paramiko.proxy.subprocess.Popen") + @patch("paramiko.proxy.os.read") + @patch("paramiko.proxy.select") + def test_recv_reads_from_process_stdout_returning_bytes( + self, select, os_read, Popen + ): + stdout = Popen.return_value.stdout + select.return_value = [stdout], None, None + fileno = stdout.fileno.return_value + # Intentionally returning <5 at a time sometimes + os_read.side_effect = [b"was", b"te", b"of ti", b"me"] + proxy = ProxyCommand("hi") + data = proxy.recv(5) + assert data == b"waste" + assert [x[0] for x in os_read.call_args_list] == [ + (fileno, 5), + (fileno, 2), + ] + + @patch("paramiko.proxy.subprocess.Popen") + @patch("paramiko.proxy.os.read") + @patch("paramiko.proxy.select") + def test_recv_returns_buffer_on_timeout_if_any_read( + self, select, os_read, Popen + ): + stdout = Popen.return_value.stdout + select.return_value = [stdout], None, None + fileno = stdout.fileno.return_value + os_read.side_effect = [b"was", socket.timeout] + proxy = ProxyCommand("hi") + data = proxy.recv(5) + assert data == b"was" # not b"waste" + assert os_read.call_args[0] == (fileno, 2) + + @patch("paramiko.proxy.subprocess.Popen") + @patch("paramiko.proxy.os.read") + @patch("paramiko.proxy.select") + def test_recv_raises_timeout_if_nothing_read(self, select, os_read, Popen): + stdout = Popen.return_value.stdout + select.return_value = [stdout], None, None + fileno = stdout.fileno.return_value + os_read.side_effect = socket.timeout + proxy = ProxyCommand("hi") + with raises(socket.timeout): + proxy.recv(5) + assert os_read.call_args[0] == (fileno, 5) + + @patch("paramiko.proxy.subprocess.Popen") + @patch("paramiko.proxy.os.read") + @patch("paramiko.proxy.select") + def test_recv_raises_ProxyCommandFailure_on_non_timeout_error( + self, select, os_read, Popen + ): + select.return_value = [Popen.return_value.stdout], None, None + os_read.side_effect = IOError(0, "whoops") + with raises(ProxyCommandFailure) as info: + ProxyCommand("hi").recv(5) + assert info.value.command == "hi" + assert info.value.error == "whoops" + + @patch("paramiko.proxy.subprocess.Popen") + @patch("paramiko.proxy.os.kill") + def test_close_kills_subprocess(self, os_kill, Popen): + proxy = ProxyCommand("hi") + proxy.close() + os_kill.assert_called_once_with(Popen.return_value.pid, signal.SIGTERM) + + @patch("paramiko.proxy.subprocess.Popen") + def test_closed_exposes_whether_subprocess_has_exited(self, Popen): + proxy = ProxyCommand("hi") + Popen.return_value.returncode = None + assert proxy.closed is False + assert proxy._closed is False + Popen.return_value.returncode = 0 + assert proxy.closed is True + assert proxy._closed is True + + @patch("paramiko.proxy.time.time") + @patch("paramiko.proxy.subprocess.Popen") + @patch("paramiko.proxy.os.read") + @patch("paramiko.proxy.select") + def test_timeout_affects_whether_timeout_is_raised( + self, select, os_read, Popen, time + ): + stdout = Popen.return_value.stdout + select.return_value = [stdout], None, None + # Base case: None timeout means no timing out + os_read.return_value = b"meh" + proxy = ProxyCommand("yello") + assert proxy.timeout is None + # Implicit 'no raise' check + assert proxy.recv(3) == b"meh" + # Use settimeout to set timeout, and it is honored + time.side_effect = [0, 10] # elapsed > 7 + proxy = ProxyCommand("ohnoz") + proxy.settimeout(7) + assert proxy.timeout == 7 + with raises(socket.timeout): + proxy.recv(3) + + @patch("paramiko.proxy.subprocess", new=None) + @patch("paramiko.proxy.subprocess_import_error", new=ImportError("meh")) + def test_raises_subprocess_ImportErrors_at_runtime(self): + # Not an ideal test, but I don't know of a non-bad way to fake out + # module-time ImportErrors. So we mock the symptoms. Meh! + with raises(ImportError) as info: + ProxyCommand("hi!!!") + assert str(info.value) == "meh" diff --git a/tests/test_sftp.py b/tests/test_sftp.py index fdb7c9bc..e4e18e5a 100644 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -26,23 +26,18 @@ do test file operations in (so no existing files will be harmed). import os import socket import sys -import threading -import unittest import warnings from binascii import hexlify from tempfile import mkstemp import pytest -import paramiko -import paramiko.util from paramiko.py3compat import PY2, b, u, StringIO from paramiko.common import o777, o600, o666, o644 from paramiko.sftp_attr import SFTPAttributes from .util import needs_builtin -from .stub_sftp import StubServer, StubSFTPServer -from .util import _support, slow +from .util import slow ARTICLE = """ @@ -74,14 +69,21 @@ decreased compared with chicken. # Here is how unicode characters are encoded over 1 to 6 bytes in utf-8 -# U-00000000 - U-0000007F: 0xxxxxxx -# U-00000080 - U-000007FF: 110xxxxx 10xxxxxx -# U-00000800 - U-0000FFFF: 1110xxxx 10xxxxxx 10xxxxxx -# U-00010000 - U-001FFFFF: 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx -# U-00200000 - U-03FFFFFF: 111110xx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx -# U-04000000 - U-7FFFFFFF: 1111110x 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx +# U-00000000 - U-0000007F: +# 0xxxxxxx +# U-00000080 - U-000007FF: +# 110xxxxx 10xxxxxx +# U-00000800 - U-0000FFFF: +# 1110xxxx 10xxxxxx 10xxxxxx +# U-00010000 - U-001FFFFF: +# 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx +# U-00200000 - U-03FFFFFF: +# 111110xx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx +# U-04000000 - U-7FFFFFFF: +# 1111110x 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx 10xxxxxx # Note that: hex(int('11000011',2)) == '0xc3' -# Thus, the following 2-bytes sequence is not valid utf8: "invalid continuation byte" +# Thus, the following 2-bytes sequence is not valid utf8: "invalid continuation +# byte" NON_UTF8_DATA = b"\xC3\xC3" unicode_folder = u"\u00fcnic\u00f8de" if PY2 else "\u00fcnic\u00f8de" @@ -90,7 +92,7 @@ utf8_folder = b"/\xc3\xbcnic\xc3\xb8\x64\x65" @slow class TestSFTP(object): - def test_1_file(self, sftp): + def test_file(self, sftp): """ verify that we can create a file. """ @@ -101,7 +103,7 @@ class TestSFTP(object): f.close() sftp.remove(sftp.FOLDER + "/test") - def test_2_close(self, sftp): + def test_close(self, sftp): """ Verify that SFTP session close() causes a socket error on next action. """ @@ -109,7 +111,7 @@ class TestSFTP(object): with pytest.raises(socket.error, match="Socket is closed"): sftp.open(sftp.FOLDER + "/test2", "w") - def test_2_sftp_can_be_used_as_context_manager(self, sftp): + def test_sftp_can_be_used_as_context_manager(self, sftp): """ verify that the sftp session is closed when exiting the context manager """ @@ -118,7 +120,7 @@ class TestSFTP(object): with pytest.raises(socket.error, match="Socket is closed"): sftp.open(sftp.FOLDER + "/test2", "w") - def test_3_write(self, sftp): + def test_write(self, sftp): """ verify that a file can be created and written, and the size is correct. """ @@ -129,7 +131,7 @@ class TestSFTP(object): finally: sftp.remove(sftp.FOLDER + "/duck.txt") - def test_3_sftp_file_can_be_used_as_context_manager(self, sftp): + def test_sftp_file_can_be_used_as_context_manager(self, sftp): """ verify that an opened file can be used as a context manager """ @@ -140,7 +142,7 @@ class TestSFTP(object): finally: sftp.remove(sftp.FOLDER + "/duck.txt") - def test_4_append(self, sftp): + def test_append(self, sftp): """ verify that a file can be opened for append, and tell() still works. """ @@ -158,7 +160,7 @@ class TestSFTP(object): finally: sftp.remove(sftp.FOLDER + "/append.txt") - def test_5_rename(self, sftp): + def test_rename(self, sftp): """ verify that renaming a file works. """ @@ -185,7 +187,7 @@ class TestSFTP(object): except: pass - def test_5a_posix_rename(self, sftp): + def testa_posix_rename(self, sftp): """Test posix-rename@openssh.com protocol extension.""" try: # first check that the normal rename works as specified @@ -214,7 +216,7 @@ class TestSFTP(object): except: pass - def test_6_folder(self, sftp): + def test_folder(self, sftp): """ create a temporary folder, verify that we can create a file in it, then remove the folder and verify that we can't create a file in it anymore. @@ -227,7 +229,7 @@ class TestSFTP(object): with pytest.raises(IOError, match="No such file"): sftp.open(sftp.FOLDER + "/subfolder/test") - def test_7_listdir(self, sftp): + def test_listdir(self, sftp): """ verify that a folder can be created, a bunch of files can be placed in it, and those files show up in sftp.listdir. @@ -248,7 +250,7 @@ class TestSFTP(object): sftp.remove(sftp.FOLDER + "/fish.txt") sftp.remove(sftp.FOLDER + "/tertiary.py") - def test_7_5_listdir_iter(self, sftp): + def test_listdir_iter(self, sftp): """ listdir_iter version of above test """ @@ -268,7 +270,7 @@ class TestSFTP(object): sftp.remove(sftp.FOLDER + "/fish.txt") sftp.remove(sftp.FOLDER + "/tertiary.py") - def test_8_setstat(self, sftp): + def test_setstat(self, sftp): """ verify that the setstat functions (chown, chmod, utime, truncate) work. """ @@ -305,7 +307,7 @@ class TestSFTP(object): finally: sftp.remove(sftp.FOLDER + "/special") - def test_9_fsetstat(self, sftp): + def test_fsetstat(self, sftp): """ verify that the fsetstat functions (chown, chmod, utime, truncate) work on open files. @@ -345,12 +347,12 @@ class TestSFTP(object): finally: sftp.remove(sftp.FOLDER + "/special") - def test_A_readline_seek(self, sftp): + def test_readline_seek(self, sftp): """ - create a text file and write a bunch of text into it. then count the lines - in the file, and seek around to retrieve particular lines. this should - verify that read buffering and 'tell' work well together, and that read - buffering is reset on 'seek'. + create a text file and write a bunch of text into it. then count the + lines in the file, and seek around to retrieve particular lines. this + should verify that read buffering and 'tell' work well together, and + that read buffering is reset on 'seek'. """ try: with sftp.open(sftp.FOLDER + "/duck.txt", "w") as f: @@ -370,17 +372,14 @@ class TestSFTP(object): f.seek(pos_list[17], f.SEEK_SET) assert f.readline()[:4] == "duck" f.seek(pos_list[10], f.SEEK_SET) - assert ( - f.readline() - == "duck types were equally resistant to exogenous insulin compared with chicken.\n" - ) + expected = "duck types were equally resistant to exogenous insulin compared with chicken.\n" # noqa + assert f.readline() == expected finally: sftp.remove(sftp.FOLDER + "/duck.txt") - def test_B_write_seek(self, sftp): + def test_write_seek(self, sftp): """ - create a text file, seek back and change part of it, and verify that the - changes worked. + Create a text file, seek back, change it, and verify. """ try: with sftp.open(sftp.FOLDER + "/testing.txt", "w") as f: @@ -395,7 +394,7 @@ class TestSFTP(object): finally: sftp.remove(sftp.FOLDER + "/testing.txt") - def test_C_symlink(self, sftp): + def test_symlink(self, sftp): """ create a symlink and then check that lstat doesn't follow it. """ @@ -442,7 +441,7 @@ class TestSFTP(object): except: pass - def test_D_flush_seek(self, sftp): + def test_flush_seek(self, sftp): """ verify that buffered writes are automatically flushed on seek. """ @@ -462,7 +461,7 @@ class TestSFTP(object): except: pass - def test_E_realpath(self, sftp): + def test_realpath(self, sftp): """ test that realpath is returning something non-empty and not an error. @@ -473,7 +472,7 @@ class TestSFTP(object): assert len(f) > 0 assert os.path.join(pwd, sftp.FOLDER) == f - def test_F_mkdir(self, sftp): + def test_mkdir(self, sftp): """ verify that mkdir/rmdir work. """ @@ -484,7 +483,7 @@ class TestSFTP(object): with pytest.raises(IOError, match="No such file"): sftp.rmdir(sftp.FOLDER + "/subfolder") - def test_G_chdir(self, sftp): + def test_chdir(self, sftp): """ verify that chdir/getcwd work. """ @@ -520,7 +519,7 @@ class TestSFTP(object): except: pass - def test_H_get_put(self, sftp): + def test_get_put(self, sftp): """ verify that get/put work. """ @@ -555,7 +554,7 @@ class TestSFTP(object): os.unlink(localname) sftp.unlink(sftp.FOLDER + "/bunny.txt") - def test_I_check(self, sftp): + def test_check(self, sftp): """ verify that file.check() works against our own server. (it's an sftp extension that we support, and may be the only ones who @@ -577,14 +576,12 @@ class TestSFTP(object): == u(hexlify(sum)).upper() ) sum = f.check("md5", 0, 0, 510) - assert ( - u(hexlify(sum)).upper() - == "EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6" - ) # noqa + expected = "EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6" # noqa + assert u(hexlify(sum)).upper() == expected finally: sftp.unlink(sftp.FOLDER + "/kitty.txt") - def test_J_x_flag(self, sftp): + def test_x_flag(self, sftp): """ verify that the 'x' flag works when opening a file. """ @@ -599,7 +596,7 @@ class TestSFTP(object): finally: sftp.unlink(sftp.FOLDER + "/unusual.txt") - def test_K_utf8(self, sftp): + def test_utf8(self, sftp): """ verify that unicode strings are encoded into utf8 correctly. """ @@ -615,7 +612,7 @@ class TestSFTP(object): self.fail("exception " + str(e)) sftp.unlink(b(sftp.FOLDER) + utf8_folder) - def test_L_utf8_chdir(self, sftp): + def test_utf8_chdir(self, sftp): sftp.mkdir(sftp.FOLDER + "/" + unicode_folder) try: sftp.chdir(sftp.FOLDER + "/" + unicode_folder) @@ -626,7 +623,7 @@ class TestSFTP(object): sftp.chdir() sftp.rmdir(sftp.FOLDER + "/" + unicode_folder) - def test_M_bad_readv(self, sftp): + def test_bad_readv(self, sftp): """ verify that readv at the end of the file doesn't essplode. """ @@ -642,7 +639,7 @@ class TestSFTP(object): finally: sftp.unlink(sftp.FOLDER + "/zero") - def test_N_put_without_confirm(self, sftp): + def test_put_without_confirm(self, sftp): """ verify that get/put work without confirmation. """ @@ -671,11 +668,11 @@ class TestSFTP(object): os.unlink(localname) sftp.unlink(sftp.FOLDER + "/bunny.txt") - def test_O_getcwd(self, sftp): + def test_getcwd(self, sftp): """ verify that chdir/getcwd work. """ - assert sftp.getcwd() == None + assert sftp.getcwd() is None root = sftp.normalize(".") if root[-1] != "/": root += "/" @@ -690,7 +687,7 @@ class TestSFTP(object): except: pass - def XXX_test_M_seek_append(self, sftp): + def test_seek_append(self, sftp): """ verify that seek does't affect writes during append. @@ -728,7 +725,7 @@ class TestSFTP(object): # appear but they're clearly emitted from subthreads that have no error # handling. No point running it until that is fixed somehow. @pytest.mark.skip("Doesn't prove anything right now") - def test_N_file_with_percent(self, sftp): + def test_file_with_percent(self, sftp): """ verify that we can create a file with a '%' in the filename. ( it needs to be properly escaped by _log() ) @@ -740,7 +737,7 @@ class TestSFTP(object): f.close() sftp.remove(sftp.FOLDER + "/test%file") - def test_O_non_utf8_data(self, sftp): + def test_non_utf8_data(self, sftp): """Test write() and read() of non utf8 data""" try: with sftp.open("%s/nonutf8data" % sftp.FOLDER, "w") as f: @@ -770,7 +767,7 @@ class TestSFTP(object): try: with sftp.open("%s/write_buffer" % sftp.FOLDER, "wb") as f: for offset in range(0, len(data), 8): - f.write(buffer(data, offset, 8)) + f.write(buffer(data, offset, 8)) # noqa with sftp.open("%s/write_buffer" % sftp.FOLDER, "rb") as f: assert f.read() == data diff --git a/tests/test_sftp_big.py b/tests/test_sftp_big.py index 9df566e8..fc556faf 100644 --- a/tests/test_sftp_big.py +++ b/tests/test_sftp_big.py @@ -23,12 +23,10 @@ a real actual sftp server is contacted, and a new folder is created there to do test file operations in (so no existing files will be harmed). """ -import os import random import struct import sys import time -import unittest from paramiko.common import o660 @@ -37,7 +35,7 @@ from .util import slow @slow class TestBigSFTP(object): - def test_1_lots_of_files(self, sftp): + def test_lots_of_files(self, sftp): """ create a bunch of files over the same session. """ @@ -65,7 +63,7 @@ class TestBigSFTP(object): except: pass - def test_2_big_file(self, sftp): + def test_big_file(self, sftp): """ write a 1MB file with no buffering. """ @@ -96,7 +94,7 @@ class TestBigSFTP(object): finally: sftp.remove("%s/hongry.txt" % sftp.FOLDER) - def test_3_big_file_pipelined(self, sftp): + def test_big_file_pipelined(self, sftp): """ write a 1MB file, with no linefeeds, using pipelining. """ @@ -122,7 +120,8 @@ class TestBigSFTP(object): file_size = f.stat().st_size f.prefetch(file_size) - # read on odd boundaries to make sure the bytes aren't getting scrambled + # read on odd boundaries to make sure the bytes aren't getting + # scrambled n = 0 k2blob = kblob + kblob chunk = 629 @@ -140,7 +139,7 @@ class TestBigSFTP(object): finally: sftp.remove("%s/hongry.txt" % sftp.FOLDER) - def test_4_prefetch_seek(self, sftp): + def test_prefetch_seek(self, sftp): kblob = bytes().join([struct.pack(">H", n) for n in range(512)]) try: with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f: @@ -180,7 +179,7 @@ class TestBigSFTP(object): finally: sftp.remove("%s/hongry.txt" % sftp.FOLDER) - def test_5_readv_seek(self, sftp): + def test_readv_seek(self, sftp): kblob = bytes().join([struct.pack(">H", n) for n in range(512)]) try: with sftp.open("%s/hongry.txt" % sftp.FOLDER, "wb") as f: @@ -220,7 +219,7 @@ class TestBigSFTP(object): finally: sftp.remove("%s/hongry.txt" % sftp.FOLDER) - def test_6_lots_of_prefetching(self, sftp): + def test_lots_of_prefetching(self, sftp): """ prefetch a 1MB file a bunch of times, discarding the file object without using it, to verify that paramiko doesn't get confused. @@ -255,7 +254,7 @@ class TestBigSFTP(object): finally: sftp.remove("%s/hongry.txt" % sftp.FOLDER) - def test_7_prefetch_readv(self, sftp): + def test_prefetch_readv(self, sftp): """ verify that prefetch and readv don't conflict with each other. """ @@ -296,7 +295,7 @@ class TestBigSFTP(object): finally: sftp.remove("%s/hongry.txt" % sftp.FOLDER) - def test_8_large_readv(self, sftp): + def test_large_readv(self, sftp): """ verify that a very large readv is broken up correctly and still returned as a single blob. @@ -325,7 +324,7 @@ class TestBigSFTP(object): finally: sftp.remove("%s/hongry.txt" % sftp.FOLDER) - def test_9_big_file_big_buffer(self, sftp): + def test_big_file_big_buffer(self, sftp): """ write a 1MB file, with no linefeeds, and a big buffer. """ @@ -342,7 +341,7 @@ class TestBigSFTP(object): finally: sftp.remove("%s/hongry.txt" % sftp.FOLDER) - def test_A_big_file_renegotiate(self, sftp): + def test_big_file_renegotiate(self, sftp): """ write a 1MB file, forcing key renegotiation in the middle. """ diff --git a/tests/test_ssh_exception.py b/tests/test_ssh_exception.py index d9e0bd22..1628986a 100644 --- a/tests/test_ssh_exception.py +++ b/tests/test_ssh_exception.py @@ -1,7 +1,15 @@ import pickle import unittest -from paramiko.ssh_exception import NoValidConnectionsError +from paramiko import RSAKey +from paramiko.ssh_exception import ( + NoValidConnectionsError, + BadAuthenticationType, + PartialAuthentication, + ChannelException, + BadHostKeyException, + ProxyCommandFailure, +) class NoValidConnectionsErrorTest(unittest.TestCase): @@ -33,3 +41,35 @@ class NoValidConnectionsErrorTest(unittest.TestCase): ) exp = "Unable to connect to port 22 on 10.0.0.42, 127.0.0.1 or ::1" assert exp in str(exc) + + +class ExceptionStringDisplayTest(unittest.TestCase): + def test_BadAuthenticationType(self): + exc = BadAuthenticationType( + "Bad authentication type", ["ok", "also-ok"] + ) + expected = "Bad authentication type; allowed types: ['ok', 'also-ok']" + assert str(exc) == expected + + def test_PartialAuthentication(self): + exc = PartialAuthentication(["ok", "also-ok"]) + expected = "Partial authentication; allowed types: ['ok', 'also-ok']" + assert str(exc) == expected + + def test_BadHostKeyException(self): + got_key = RSAKey.generate(2048) + wanted_key = RSAKey.generate(2048) + exc = BadHostKeyException("myhost", got_key, wanted_key) + expected = "Host key for server 'myhost' does not match: got '{}', expected '{}'" # noqa + assert str(exc) == expected.format( + got_key.get_base64(), wanted_key.get_base64() + ) + + def test_ProxyCommandFailure(self): + exc = ProxyCommandFailure("man squid", 7) + expected = 'ProxyCommand("man squid") returned nonzero exit status: 7' + assert str(exc) == expected + + def test_ChannelException(self): + exc = ChannelException(17, "whatever") + assert str(exc) == "ChannelException(17, 'whatever')" diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py index b6b50152..92801c20 100644 --- a/tests/test_ssh_gss.py +++ b/tests/test_ssh_gss.py @@ -25,11 +25,10 @@ Unit Tests for the GSS-API / SSPI SSHv2 Authentication (gssapi-with-mic) import socket import threading -import unittest import paramiko -from .util import _support, needs_gssapi +from .util import _support, needs_gssapi, KerberosTestCase, update_env from .test_client import FINGERPRINTS @@ -61,23 +60,24 @@ class NullServer(paramiko.ServerInterface): return paramiko.OPEN_SUCCEEDED def check_channel_exec_request(self, channel, command): - if command != "yes": + if command != b"yes": return False return True @needs_gssapi -class GSSAuthTest(unittest.TestCase): +class GSSAuthTest(KerberosTestCase): def setUp(self): # TODO: username and targ_name should come from os.environ or whatever # the approved pytest method is for runtime-configuring test data. - self.username = "krb5_principal" - self.hostname = socket.getfqdn("targ_name") + self.username = self.realm.user_princ + self.hostname = socket.getfqdn(self.realm.hostname) self.sockl = socket.socket() - self.sockl.bind(("targ_name", 0)) + self.sockl.bind((self.realm.hostname, 0)) self.sockl.listen(1) self.addr, self.port = self.sockl.getsockname() self.event = threading.Event() + update_env(self, self.realm.env) thread = threading.Thread(target=self._run) thread.start() @@ -139,16 +139,16 @@ class GSSAuthTest(unittest.TestCase): stdout.close() stderr.close() - def test_1_gss_auth(self): + def test_gss_auth(self): """ Verify that Paramiko can handle SSHv2 GSS-API / SSPI authentication (gssapi-with-mic) in client and server mode. """ self._test_connection(allow_agent=False, look_for_keys=False) - def test_2_auth_trickledown(self): + def test_auth_trickledown(self): """ - Failed gssapi-with-mic auth doesn't prevent subsequent key auth from succeeding + Failed gssapi-with-mic doesn't prevent subsequent key from succeeding """ self.hostname = ( "this_host_does_not_exists_and_causes_a_GSSAPI-exception" diff --git a/tests/test_transport.py b/tests/test_transport.py index 2b8ee3bc..e2174896 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -28,36 +28,32 @@ import socket import time import threading import random -from hashlib import sha1 import unittest from mock import Mock from paramiko import ( - Transport, - SecurityOptions, - ServerInterface, - RSAKey, - DSSKey, - SSHException, + AuthHandler, ChannelException, + DSSKey, Packetizer, - Channel, - AuthHandler, + RSAKey, + SSHException, + SecurityOptions, + ServerInterface, + Transport, ) from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED from paramiko.common import ( - MSG_KEXINIT, - cMSG_CHANNEL_WINDOW_ADJUST, - cMSG_UNIMPLEMENTED, + DEFAULT_MAX_PACKET_SIZE, + DEFAULT_WINDOW_SIZE, + MAX_WINDOW_SIZE, MIN_PACKET_SIZE, MIN_WINDOW_SIZE, - MAX_WINDOW_SIZE, - DEFAULT_WINDOW_SIZE, - DEFAULT_MAX_PACKET_SIZE, - MSG_NAMES, - MSG_UNIMPLEMENTED, + MSG_KEXINIT, MSG_USERAUTH_SUCCESS, + cMSG_CHANNEL_WINDOW_ADJUST, + cMSG_UNIMPLEMENTED, ) from paramiko.py3compat import bytes, byte_chr from paramiko.message import Message @@ -185,7 +181,7 @@ class TransportTest(unittest.TestCase): self.assertTrue(event.is_set()) self.assertTrue(self.ts.is_active()) - def test_1_security_options(self): + def test_security_options(self): o = self.tc.get_security_options() self.assertEqual(type(o), SecurityOptions) self.assertTrue(("aes256-cbc", "blowfish-cbc") != o.ciphers) @@ -202,7 +198,7 @@ class TransportTest(unittest.TestCase): except TypeError: pass - def test_1b_security_options_reset(self): + def testb_security_options_reset(self): o = self.tc.get_security_options() # should not throw any exceptions o.ciphers = o.ciphers @@ -211,17 +207,17 @@ class TransportTest(unittest.TestCase): o.kex = o.kex o.compression = o.compression - def test_2_compute_key(self): - self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929 - self.tc.H = b"\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3" + def test_compute_key(self): + self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929 # noqa + self.tc.H = b"\x0C\x83\x07\xCD\xE6\x85\x6F\xF3\x0B\xA9\x36\x84\xEB\x0F\x04\xC2\x52\x0E\x9E\xD3" # noqa self.tc.session_id = self.tc.H key = self.tc._compute_key("C", 32) self.assertEqual( - b"207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995", + b"207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995", # noqa hexlify(key).upper(), ) - def test_3_simple(self): + def test_simple(self): """ verify that we can establish an ssh link with ourselves across the loopback sockets. this is hardly "simple" but it's simpler than the @@ -249,7 +245,7 @@ class TransportTest(unittest.TestCase): self.assertEqual(True, self.tc.is_authenticated()) self.assertEqual(True, self.ts.is_authenticated()) - def test_3a_long_banner(self): + def testa_long_banner(self): """ verify that a long banner doesn't mess up the handshake. """ @@ -268,7 +264,7 @@ class TransportTest(unittest.TestCase): self.assertTrue(event.is_set()) self.assertTrue(self.ts.is_active()) - def test_4_special(self): + def test_special(self): """ verify that the client can demand odd handshake settings, and can renegotiate keys in mid-stream. @@ -289,7 +285,7 @@ class TransportTest(unittest.TestCase): self.ts.send_ignore(1024) @slow - def test_5_keepalive(self): + def test_keepalive(self): """ verify that the keepalive will be sent. """ @@ -299,7 +295,7 @@ class TransportTest(unittest.TestCase): time.sleep(2) self.assertEqual("keepalive@lag.net", self.server._global_request) - def test_6_exec_command(self): + def test_exec_command(self): """ verify that exec_command() does something reasonable. """ @@ -343,7 +339,7 @@ class TransportTest(unittest.TestCase): self.assertEqual("This is on stderr.\n", f.readline()) self.assertEqual("", f.readline()) - def test_6a_channel_can_be_used_as_context_manager(self): + def testa_channel_can_be_used_as_context_manager(self): """ verify that exec_command() does something reasonable. """ @@ -359,7 +355,7 @@ class TransportTest(unittest.TestCase): self.assertEqual("Hello there.\n", f.readline()) self.assertEqual("", f.readline()) - def test_7_invoke_shell(self): + def test_invoke_shell(self): """ verify that invoke_shell() does something reasonable. """ @@ -373,18 +369,18 @@ class TransportTest(unittest.TestCase): chan.close() self.assertEqual("", f.readline()) - def test_8_channel_exception(self): + def test_channel_exception(self): """ verify that ChannelException is thrown for a bad open-channel request. """ self.setup_test_server() try: - chan = self.tc.open_channel("bogus") + self.tc.open_channel("bogus") self.fail("expected exception") except ChannelException as e: self.assertTrue(e.code == OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED) - def test_9_exit_status(self): + def test_exit_status(self): """ verify that get_exit_status() works. """ @@ -413,7 +409,7 @@ class TransportTest(unittest.TestCase): self.assertEqual(23, chan.recv_exit_status()) chan.close() - def test_A_select(self): + def test_select(self): """ verify that select() on a channel works. """ @@ -468,7 +464,7 @@ class TransportTest(unittest.TestCase): # ...and now is closed. self.assertEqual(True, p._closed) - def test_B_renegotiate(self): + def test_renegotiate(self): """ verify that a transport can correctly renegotiate mid-stream. """ @@ -492,7 +488,7 @@ class TransportTest(unittest.TestCase): schan.close() - def test_C_compression(self): + def test_compression(self): """ verify that zlib compression is basically working. """ @@ -510,14 +506,15 @@ class TransportTest(unittest.TestCase): bytes2 = self.tc.packetizer._Packetizer__sent_bytes block_size = self.tc._cipher_info[self.tc.local_cipher]["block-size"] mac_size = self.tc._mac_info[self.tc.local_mac]["size"] - # tests show this is actually compressed to *52 bytes*! including packet overhead! nice!! :) + # tests show this is actually compressed to *52 bytes*! including + # packet overhead! nice!! :) self.assertTrue(bytes2 - bytes < 1024) self.assertEqual(16 + block_size + mac_size, bytes2 - bytes) chan.close() schan.close() - def test_D_x11(self): + def test_x11(self): """ verify that an x11 port can be requested and opened. """ @@ -555,7 +552,7 @@ class TransportTest(unittest.TestCase): chan.close() schan.close() - def test_E_reverse_port_forwarding(self): + def test_reverse_port_forwarding(self): """ verify that a client can ask the server to open a reverse port for forwarding. @@ -563,7 +560,7 @@ class TransportTest(unittest.TestCase): self.setup_test_server() chan = self.tc.open_session() chan.exec_command("yes") - schan = self.ts.accept(1.0) + self.ts.accept(1.0) requested = [] @@ -594,7 +591,7 @@ class TransportTest(unittest.TestCase): self.tc.cancel_port_forward("127.0.0.1", port) self.assertTrue(self.server._listen is None) - def test_F_port_forwarding(self): + def test_port_forwarding(self): """ verify that a client can forward new connections from a locally- forwarded port. @@ -602,7 +599,7 @@ class TransportTest(unittest.TestCase): self.setup_test_server() chan = self.tc.open_session() chan.exec_command("yes") - schan = self.ts.accept(1.0) + self.ts.accept(1.0) # open a port on the "server" that the client will ask to forward to. greeting_server = socket.socket() @@ -626,7 +623,7 @@ class TransportTest(unittest.TestCase): self.assertEqual(b"Hello!\n", cs.recv(7)) cs.close() - def test_G_stderr_select(self): + def test_stderr_select(self): """ verify that select() on a channel works even if only stderr is receiving data. @@ -665,7 +662,7 @@ class TransportTest(unittest.TestCase): schan.close() chan.close() - def test_H_send_ready(self): + def test_send_ready(self): """ verify that send_ready() indicates when a send would not block. """ @@ -689,9 +686,10 @@ class TransportTest(unittest.TestCase): chan.close() self.assertEqual(chan.send_ready(), True) - def test_I_rekey_deadlock(self): + def test_rekey_deadlock(self): """ - Regression test for deadlock when in-transit messages are received after MSG_KEXINIT is sent + Regression test for deadlock when in-transit messages are received + after MSG_KEXINIT is sent Note: When this test fails, it may leak threads. """ @@ -714,12 +712,15 @@ class TransportTest(unittest.TestCase): # MSG_KEXINIT to the remote host. # # On the remote host (using any SSH implementation): - # 5. The MSG_CHANNEL_DATA is received, and MSG_CHANNEL_WINDOW_ADJUST is sent. - # 6. The MSG_KEXINIT is received, and a corresponding MSG_KEXINIT is sent. + # 5. The MSG_CHANNEL_DATA is received, and MSG_CHANNEL_WINDOW_ADJUST + # is sent. + # 6. The MSG_KEXINIT is received, and a corresponding MSG_KEXINIT is + # sent. # # In the main thread: # 7. The user's program calls Channel.send(). - # 8. Channel.send acquires Channel.lock, then calls Transport._send_user_message(). + # 8. Channel.send acquires Channel.lock, then calls + # Transport._send_user_message(). # 9. Transport._send_user_message waits for Transport.clear_to_send # to be set (i.e., it waits for re-keying to complete). # Channel.lock is still held. @@ -854,7 +855,7 @@ class TransportTest(unittest.TestCase): schan.close() chan.close() - def test_J_sanitze_packet_size(self): + def test_sanitze_packet_size(self): """ verify that we conform to the rfc of packet and window sizes. """ @@ -865,7 +866,7 @@ class TransportTest(unittest.TestCase): ]: self.assertEqual(self.tc._sanitize_packet_size(val), correct) - def test_K_sanitze_window_size(self): + def test_sanitze_window_size(self): """ verify that we conform to the rfc of packet and window sizes. """ @@ -877,7 +878,7 @@ class TransportTest(unittest.TestCase): self.assertEqual(self.tc._sanitize_window_size(val), correct) @slow - def test_L_handshake_timeout(self): + def test_handshake_timeout(self): """ verify that we can get a hanshake timeout. """ @@ -914,7 +915,7 @@ class TransportTest(unittest.TestCase): password="pygmalion", ) - def test_M_select_after_close(self): + def test_select_after_close(self): """ verify that select works when a channel is already closed. """ @@ -969,11 +970,11 @@ class TransportTest(unittest.TestCase): # send() accepts buffer instances sent = 0 while sent < len(data): - sent += chan.send(buffer(data, sent, 8)) + sent += chan.send(buffer(data, sent, 8)) # noqa self.assertEqual(sfile.read(len(data)), data) # sendall() accepts a buffer instance - chan.sendall(buffer(data)) + chan.sendall(buffer(data)) # noqa self.assertEqual(sfile.read(len(data)), data) @needs_builtin("memoryview") @@ -1101,3 +1102,70 @@ class TransportTest(unittest.TestCase): assert not self.ts.auth_handler.authenticated # Real fix's behavior self._expect_unimplemented() + + +class AlgorithmDisablingTests(unittest.TestCase): + def test_preferred_lists_default_to_private_attribute_contents(self): + t = Transport(sock=Mock()) + assert t.preferred_ciphers == t._preferred_ciphers + assert t.preferred_macs == t._preferred_macs + assert t.preferred_keys == t._preferred_keys + assert t.preferred_kex == t._preferred_kex + + def test_preferred_lists_filter_disabled_algorithms(self): + t = Transport( + sock=Mock(), + disabled_algorithms={ + "ciphers": ["aes128-cbc"], + "macs": ["hmac-md5"], + "keys": ["ssh-dss"], + "kex": ["diffie-hellman-group14-sha256"], + }, + ) + assert "aes128-cbc" in t._preferred_ciphers + assert "aes128-cbc" not in t.preferred_ciphers + assert "hmac-md5" in t._preferred_macs + assert "hmac-md5" not in t.preferred_macs + assert "ssh-dss" in t._preferred_keys + assert "ssh-dss" not in t.preferred_keys + assert "diffie-hellman-group14-sha256" in t._preferred_kex + assert "diffie-hellman-group14-sha256" not in t.preferred_kex + + def test_implementation_refers_to_public_algo_lists(self): + t = Transport( + sock=Mock(), + disabled_algorithms={ + "ciphers": ["aes128-cbc"], + "macs": ["hmac-md5"], + "keys": ["ssh-dss"], + "kex": ["diffie-hellman-group14-sha256"], + "compression": ["zlib"], + }, + ) + # Enable compression cuz otherwise disabling one option for it makes no + # sense... + t.use_compression(True) + # Effectively a random spot check, but kex init touches most/all of the + # algorithm lists so it's a good spot. + t._send_message = Mock() + t._send_kex_init() + # Cribbed from Transport._parse_kex_init, which didn't feel worth + # refactoring given all the vars involved :( + m = t._send_message.call_args[0][0] + m.rewind() + m.get_byte() # the msg type + m.get_bytes(16) # cookie, discarded + kexen = m.get_list() + server_keys = m.get_list() + ciphers = m.get_list() + m.get_list() + macs = m.get_list() + m.get_list() + compressions = m.get_list() + # OK, now we can actually check that our disabled algos were not + # included (as this message includes the full lists) + assert "aes128-cbc" not in ciphers + assert "hmac-md5" not in macs + assert "ssh-dss" not in server_keys + assert "diffie-hellman-group14-sha256" not in kexen + assert "zlib" not in compressions diff --git a/tests/test_util.py b/tests/test_util.py index 705baa14..8ce260d1 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -26,32 +26,12 @@ import os from hashlib import sha1 import unittest +import paramiko import paramiko.util -from paramiko.util import lookup_ssh_host_config as host_config, safe_string -from paramiko.py3compat import StringIO, byte_ord, b +from paramiko.util import safe_string +from paramiko.py3compat import byte_ord -# Note some lines in this configuration have trailing spaces on purpose -test_config_file = """\ -Host * - User robey - IdentityFile =~/.ssh/id_rsa - -# comment -Host *.example.com - \tUser bjork -Port=3333 -Host * -""" - -dont_strip_whitespace_please = "\t \t Crazy something dumb " - -test_config_file += dont_strip_whitespace_please -test_config_file += """ -Host spoo.example.com -Crazy something else -""" - test_hosts_file = """\ secure.example.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA1PD6U2/TVxET6lkpKhOk5r\ 9q/kAYG6sP9f5zuUYP8i7FOFp/6ncCEbbtg/lB+A3iidyxoSWl+9jtoyyDOOVX4UIDV9G11Ml8om3\ @@ -62,155 +42,75 @@ BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\ """ -# for test 1: -from paramiko import * - - class UtilTest(unittest.TestCase): - def test_import(self): + def test_imports(self): """ verify that all the classes can be imported from paramiko. """ - symbols = list(globals().keys()) - self.assertTrue("Transport" in symbols) - self.assertTrue("SSHClient" in symbols) - self.assertTrue("MissingHostKeyPolicy" in symbols) - self.assertTrue("AutoAddPolicy" in symbols) - self.assertTrue("RejectPolicy" in symbols) - self.assertTrue("WarningPolicy" in symbols) - self.assertTrue("SecurityOptions" in symbols) - self.assertTrue("SubsystemHandler" in symbols) - self.assertTrue("Channel" in symbols) - self.assertTrue("RSAKey" in symbols) - self.assertTrue("DSSKey" in symbols) - self.assertTrue("Message" in symbols) - self.assertTrue("SSHException" in symbols) - self.assertTrue("AuthenticationException" in symbols) - self.assertTrue("PasswordRequiredException" in symbols) - self.assertTrue("BadAuthenticationType" in symbols) - self.assertTrue("ChannelException" in symbols) - self.assertTrue("SFTP" in symbols) - self.assertTrue("SFTPFile" in symbols) - self.assertTrue("SFTPHandle" in symbols) - self.assertTrue("SFTPClient" in symbols) - self.assertTrue("SFTPServer" in symbols) - self.assertTrue("SFTPError" in symbols) - self.assertTrue("SFTPAttributes" in symbols) - self.assertTrue("SFTPServerInterface" in symbols) - self.assertTrue("ServerInterface" in symbols) - self.assertTrue("BufferedFile" in symbols) - self.assertTrue("Agent" in symbols) - self.assertTrue("AgentKey" in symbols) - self.assertTrue("HostKeys" in symbols) - self.assertTrue("SSHConfig" in symbols) - self.assertTrue("util" in symbols) - - def test_parse_config(self): - global test_config_file - f = StringIO(test_config_file) - config = paramiko.util.parse_ssh_config(f) - self.assertEqual( - config._config, - [ - {"host": ["*"], "config": {}}, - { - "host": ["*"], - "config": { - "identityfile": ["~/.ssh/id_rsa"], - "user": "robey", - }, - }, - { - "host": ["*.example.com"], - "config": {"user": "bjork", "port": "3333"}, - }, - {"host": ["*"], "config": {"crazy": "something dumb"}}, - { - "host": ["spoo.example.com"], - "config": {"crazy": "something else"}, - }, - ], - ) - - def test_host_config(self): - global test_config_file - f = StringIO(test_config_file) - config = paramiko.util.parse_ssh_config(f) - - for host, values in { - "irc.danger.com": { - "crazy": "something dumb", - "hostname": "irc.danger.com", - "user": "robey", - }, - "irc.example.com": { - "crazy": "something dumb", - "hostname": "irc.example.com", - "user": "robey", - "port": "3333", - }, - "spoo.example.com": { - "crazy": "something dumb", - "hostname": "spoo.example.com", - "user": "robey", - "port": "3333", - }, - }.items(): - values = dict( - values, - hostname=host, - identityfile=[os.path.expanduser("~/.ssh/id_rsa")], - ) - self.assertEqual( - paramiko.util.lookup_ssh_host_config(host, config), values - ) + for name in ( + "Agent", + "AgentKey", + "AuthenticationException", + "AutoAddPolicy", + "BadAuthenticationType", + "BufferedFile", + "Channel", + "ChannelException", + "ConfigParseError", + "CouldNotCanonicalize", + "DSSKey", + "HostKeys", + "Message", + "MissingHostKeyPolicy", + "PasswordRequiredException", + "RSAKey", + "RejectPolicy", + "SFTP", + "SFTPAttributes", + "SFTPClient", + "SFTPError", + "SFTPFile", + "SFTPHandle", + "SFTPServer", + "SFTPServerInterface", + "SSHClient", + "SSHConfig", + "SSHConfigDict", + "SSHException", + "SecurityOptions", + "ServerInterface", + "SubsystemHandler", + "Transport", + "WarningPolicy", + "util", + ): + assert name in paramiko.__all__ def test_generate_key_bytes(self): x = paramiko.util.generate_key_bytes( sha1, b"ABCDEFGH", "This is my secret passphrase.", 64 ) hex = "".join(["%02x" % byte_ord(c) for c in x]) - self.assertEqual( - hex, - "9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b", - ) + hexpected = "9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b" # noqa + assert hex == hexpected def test_host_keys(self): with open("hostfile.temp", "w") as f: f.write(test_hosts_file) try: hostdict = paramiko.util.load_host_keys("hostfile.temp") - self.assertEqual(2, len(hostdict)) - self.assertEqual(1, len(list(hostdict.values())[0])) - self.assertEqual(1, len(list(hostdict.values())[1])) + assert 2 == len(hostdict) + assert 1 == len(list(hostdict.values())[0]) + assert 1 == len(list(hostdict.values())[1]) fp = hexlify( hostdict["secure.example.com"]["ssh-rsa"].get_fingerprint() ).upper() - self.assertEqual(b"E6684DB30E109B67B70FF1DC5C7F1363", fp) + assert b"E6684DB30E109B67B70FF1DC5C7F1363" == fp finally: os.unlink("hostfile.temp") - def test_host_config_expose_issue_33(self): - test_config_file = """ -Host www13.* - Port 22 - -Host *.example.com - Port 2222 - -Host * - Port 3333 - """ - f = StringIO(test_config_file) - config = paramiko.util.parse_ssh_config(f) - host = "www13.example.com" - self.assertEqual( - paramiko.util.lookup_ssh_host_config(host, config), - {"hostname": host, "port": "22"}, - ) - def test_eintr_retry(self): - self.assertEqual("foo", paramiko.util.retry_on_signal(lambda: "foo")) + assert "foo" == paramiko.util.retry_on_signal(lambda: "foo") # Variables that are set by raises_intr intr_errors_remaining = [3] @@ -223,8 +123,8 @@ Host * raise IOError(errno.EINTR, "file", "interrupted system call") self.assertTrue(paramiko.util.retry_on_signal(raises_intr) is None) - self.assertEqual(0, intr_errors_remaining[0]) - self.assertEqual(4, call_count[0]) + assert 0 == intr_errors_remaining[0] + assert 4 == call_count[0] def raises_ioerror_not_eintr(): raise IOError(errno.ENOENT, "file", "file not found") @@ -242,273 +142,10 @@ Host * lambda: paramiko.util.retry_on_signal(raises_other_exception), ) - def test_proxycommand_config_equals_parsing(self): - """ - ProxyCommand should not split on equals signs within the value. - """ - conf = """ -Host space-delimited - ProxyCommand foo bar=biz baz - -Host equals-delimited - ProxyCommand=foo bar=biz baz -""" - f = StringIO(conf) - config = paramiko.util.parse_ssh_config(f) - for host in ("space-delimited", "equals-delimited"): - self.assertEqual( - host_config(host, config)["proxycommand"], "foo bar=biz baz" - ) - - def test_proxycommand_interpolation(self): - """ - ProxyCommand should perform interpolation on the value - """ - config = paramiko.util.parse_ssh_config( - StringIO( - """ -Host specific - Port 37 - ProxyCommand host %h port %p lol - -Host portonly - Port 155 - -Host * - Port 25 - ProxyCommand host %h port %p -""" - ) - ) - for host, val in ( - ("foo.com", "host foo.com port 25"), - ("specific", "host specific port 37 lol"), - ("portonly", "host portonly port 155"), - ): - self.assertEqual(host_config(host, config)["proxycommand"], val) - - def test_proxycommand_tilde_expansion(self): - """ - Tilde (~) should be expanded inside ProxyCommand - """ - config = paramiko.util.parse_ssh_config( - StringIO( - """ -Host test - ProxyCommand ssh -F ~/.ssh/test_config bastion nc %h %p -""" - ) - ) - self.assertEqual( - "ssh -F %s/.ssh/test_config bastion nc test 22" - % os.path.expanduser("~"), - host_config("test", config)["proxycommand"], - ) - - def test_host_config_test_negation(self): - test_config_file = """ -Host www13.* !*.example.com - Port 22 - -Host *.example.com !www13.* - Port 2222 - -Host www13.* - Port 8080 - -Host * - Port 3333 - """ - f = StringIO(test_config_file) - config = paramiko.util.parse_ssh_config(f) - host = "www13.example.com" - self.assertEqual( - paramiko.util.lookup_ssh_host_config(host, config), - {"hostname": host, "port": "8080"}, - ) - - def test_host_config_test_proxycommand(self): - test_config_file = """ -Host proxy-with-equal-divisor-and-space -ProxyCommand = foo=bar - -Host proxy-with-equal-divisor-and-no-space -ProxyCommand=foo=bar - -Host proxy-without-equal-divisor -ProxyCommand foo=bar:%h-%p - """ - for host, values in { - "proxy-with-equal-divisor-and-space": { - "hostname": "proxy-with-equal-divisor-and-space", - "proxycommand": "foo=bar", - }, - "proxy-with-equal-divisor-and-no-space": { - "hostname": "proxy-with-equal-divisor-and-no-space", - "proxycommand": "foo=bar", - }, - "proxy-without-equal-divisor": { - "hostname": "proxy-without-equal-divisor", - "proxycommand": "foo=bar:proxy-without-equal-divisor-22", - }, - }.items(): - - f = StringIO(test_config_file) - config = paramiko.util.parse_ssh_config(f) - self.assertEqual( - paramiko.util.lookup_ssh_host_config(host, config), values - ) - - def test_host_config_test_identityfile(self): - test_config_file = """ - -IdentityFile id_dsa0 - -Host * -IdentityFile id_dsa1 - -Host dsa2 -IdentityFile id_dsa2 - -Host dsa2* -IdentityFile id_dsa22 - """ - for host, values in { - "foo": {"hostname": "foo", "identityfile": ["id_dsa0", "id_dsa1"]}, - "dsa2": { - "hostname": "dsa2", - "identityfile": ["id_dsa0", "id_dsa1", "id_dsa2", "id_dsa22"], - }, - "dsa22": { - "hostname": "dsa22", - "identityfile": ["id_dsa0", "id_dsa1", "id_dsa22"], - }, - }.items(): - - f = StringIO(test_config_file) - config = paramiko.util.parse_ssh_config(f) - self.assertEqual( - paramiko.util.lookup_ssh_host_config(host, config), values - ) - - def test_config_addressfamily_and_lazy_fqdn(self): - """ - Ensure the code path honoring non-'all' AddressFamily doesn't asplode - """ - test_config = """ -AddressFamily inet -IdentityFile something_%l_using_fqdn -""" - config = paramiko.util.parse_ssh_config(StringIO(test_config)) - assert config.lookup( - "meh" - ) # will die during lookup() if bug regresses - def test_clamp_value(self): - self.assertEqual(32768, paramiko.util.clamp_value(32767, 32768, 32769)) - self.assertEqual(32767, paramiko.util.clamp_value(32767, 32765, 32769)) - self.assertEqual(32769, paramiko.util.clamp_value(32767, 32770, 32769)) - - def test_config_dos_crlf_succeeds(self): - config_file = StringIO("host abcqwerty\r\nHostName 127.0.0.1\r\n") - config = paramiko.SSHConfig() - config.parse(config_file) - self.assertEqual(config.lookup("abcqwerty")["hostname"], "127.0.0.1") - - def test_get_hostnames(self): - f = StringIO(test_config_file) - config = paramiko.util.parse_ssh_config(f) - self.assertEqual( - config.get_hostnames(), {"*", "*.example.com", "spoo.example.com"} - ) - - def test_quoted_host_names(self): - test_config_file = """\ -Host "param pam" param "pam" - Port 1111 - -Host "param2" - Port 2222 - -Host param3 parara - Port 3333 - -Host param4 "p a r" "p" "par" para - Port 4444 -""" - res = { - "param pam": {"hostname": "param pam", "port": "1111"}, - "param": {"hostname": "param", "port": "1111"}, - "pam": {"hostname": "pam", "port": "1111"}, - "param2": {"hostname": "param2", "port": "2222"}, - "param3": {"hostname": "param3", "port": "3333"}, - "parara": {"hostname": "parara", "port": "3333"}, - "param4": {"hostname": "param4", "port": "4444"}, - "p a r": {"hostname": "p a r", "port": "4444"}, - "p": {"hostname": "p", "port": "4444"}, - "par": {"hostname": "par", "port": "4444"}, - "para": {"hostname": "para", "port": "4444"}, - } - f = StringIO(test_config_file) - config = paramiko.util.parse_ssh_config(f) - for host, values in res.items(): - self.assertEquals( - paramiko.util.lookup_ssh_host_config(host, config), values - ) - - def test_quoted_params_in_config(self): - test_config_file = """\ -Host "param pam" param "pam" - IdentityFile id_rsa - -Host "param2" - IdentityFile "test rsa key" - -Host param3 parara - IdentityFile id_rsa - IdentityFile "test rsa key" -""" - res = { - "param pam": {"hostname": "param pam", "identityfile": ["id_rsa"]}, - "param": {"hostname": "param", "identityfile": ["id_rsa"]}, - "pam": {"hostname": "pam", "identityfile": ["id_rsa"]}, - "param2": {"hostname": "param2", "identityfile": ["test rsa key"]}, - "param3": { - "hostname": "param3", - "identityfile": ["id_rsa", "test rsa key"], - }, - "parara": { - "hostname": "parara", - "identityfile": ["id_rsa", "test rsa key"], - }, - } - f = StringIO(test_config_file) - config = paramiko.util.parse_ssh_config(f) - for host, values in res.items(): - self.assertEquals( - paramiko.util.lookup_ssh_host_config(host, config), values - ) - - def test_quoted_host_in_config(self): - conf = SSHConfig() - correct_data = { - "param": ["param"], - '"param"': ["param"], - "param pam": ["param", "pam"], - '"param" "pam"': ["param", "pam"], - '"param" pam': ["param", "pam"], - 'param "pam"': ["param", "pam"], - 'param "pam" p': ["param", "pam", "p"], - '"param" pam "p"': ["param", "pam", "p"], - '"pa ram"': ["pa ram"], - '"pa ram" pam': ["pa ram", "pam"], - 'param "p a m"': ["param", "p a m"], - } - incorrect_data = ['param"', '"param', 'param "pam', 'param "pam" "p a'] - for host, values in correct_data.items(): - self.assertEquals(conf._get_hosts(host), values) - for host in incorrect_data: - self.assertRaises(Exception, conf._get_hosts, host) + assert 32768 == paramiko.util.clamp_value(32767, 32768, 32769) + assert 32767 == paramiko.util.clamp_value(32767, 32765, 32769) + assert 32769 == paramiko.util.clamp_value(32767, 32770, 32769) def test_safe_string(self): vanilla = b"vanilla" @@ -521,54 +158,3 @@ Host param3 parara assert safe_vanilla == vanilla, msg msg = err.format(safe_has_bytes, expected_bytes) assert safe_has_bytes == expected_bytes, msg - - def test_proxycommand_none_issue_418(self): - test_config_file = """ -Host proxycommand-standard-none - ProxyCommand None - -Host proxycommand-with-equals-none - ProxyCommand=None - """ - for host, values in { - "proxycommand-standard-none": { - "hostname": "proxycommand-standard-none" - }, - "proxycommand-with-equals-none": { - "hostname": "proxycommand-with-equals-none" - }, - }.items(): - - f = StringIO(test_config_file) - config = paramiko.util.parse_ssh_config(f) - self.assertEqual( - paramiko.util.lookup_ssh_host_config(host, config), values - ) - - def test_proxycommand_none_masking(self): - # Re: https://github.com/paramiko/paramiko/issues/670 - source_config = """ -Host specific-host - ProxyCommand none - -Host other-host - ProxyCommand other-proxy - -Host * - ProxyCommand default-proxy -""" - config = paramiko.SSHConfig() - config.parse(StringIO(source_config)) - # When bug is present, the full stripping-out of specific-host's - # ProxyCommand means it actually appears to pick up the default - # ProxyCommand value instead, due to cascading. It should (for - # backwards compatibility reasons in 1.x/2.x) appear completely blank, - # as if the host had no ProxyCommand whatsoever. - # Threw another unrelated host in there just for sanity reasons. - self.assertFalse("proxycommand" in config.lookup("specific-host")) - self.assertEqual( - config.lookup("other-host")["proxycommand"], "other-proxy" - ) - self.assertEqual( - config.lookup("some-random-host")["proxycommand"], "default-proxy" - ) diff --git a/tests/util.py b/tests/util.py index 4ca02374..9057f516 100644 --- a/tests/util.py +++ b/tests/util.py @@ -1,19 +1,28 @@ from os.path import dirname, realpath, join +import os +import sys +import unittest import pytest from paramiko.py3compat import builtins +from paramiko.ssh_gss import GSS_AUTH_AVAILABLE + + +tests_dir = dirname(realpath(__file__)) def _support(filename): - return join(dirname(realpath(__file__)), filename) + return join(tests_dir, filename) + +def _config(name): + return join(tests_dir, "configs", name) -# TODO: consider using pytest.importorskip('gssapi') instead? We presumably -# still need CLI configurability for the Kerberos parameters, though, so can't -# JUST key off presence of GSSAPI optional dependency... -# TODO: anyway, s/True/os.environ.get('RUN_GSSAPI', False)/ or something. -needs_gssapi = pytest.mark.skipif(True, reason="No GSSAPI to test") + +needs_gssapi = pytest.mark.skipif( + not GSS_AUTH_AVAILABLE, reason="No GSSAPI to test" +) def needs_builtin(name): @@ -25,3 +34,96 @@ def needs_builtin(name): slow = pytest.mark.slow + +# GSSAPI / Kerberos related tests need a working Kerberos environment. +# The class `KerberosTestCase` provides such an environment or skips all tests. +# There are 3 distinct cases: +# +# - A Kerberos environment has already been created and the environment +# contains the required information. +# +# - We can use the package 'k5test' to setup an working kerberos environment on +# the fly. +# +# - We skip all tests. +# +# ToDo: add a Windows specific implementation? + +if ( + os.environ.get("K5TEST_USER_PRINC", None) + and os.environ.get("K5TEST_HOSTNAME", None) + and os.environ.get("KRB5_KTNAME", None) +): # add other vars as needed + + # The environment provides the required information + class DummyK5Realm(object): + def __init__(self): + for k in os.environ: + if not k.startswith("K5TEST_"): + continue + setattr(self, k[7:].lower(), os.environ[k]) + self.env = {} + + class KerberosTestCase(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.realm = DummyK5Realm() + + @classmethod + def tearDownClass(cls): + del cls.realm + + +else: + try: + # Try to setup a kerberos environment + from k5test import KerberosTestCase + except Exception: + # Use a dummy, that skips all tests + class KerberosTestCase(unittest.TestCase): + @classmethod + def setUpClass(cls): + raise unittest.SkipTest( + "Missing extension package k5test. " + 'Please run "pip install k5test" ' + "to install it." + ) + + +def update_env(testcase, mapping, env=os.environ): + """Modify os.environ during a test case and restore during cleanup.""" + saved_env = env.copy() + + def replace(target, source): + target.update(source) + for k in list(target): + if k not in source: + target.pop(k, None) + + testcase.addCleanup(replace, env, saved_env) + env.update(mapping) + return testcase + + +def k5shell(args=None): + """Create a shell with an kerberos environment + + This can be used to debug paramiko or to test the old GSSAPI. + To test a different GSSAPI, simply activate a suitable venv + within the shell. + """ + import k5test + import atexit + import subprocess + + k5 = k5test.K5Realm() + atexit.register(k5.stop) + os.environ.update(k5.env) + for n in ("realm", "user_princ", "hostname"): + os.environ["K5TEST_" + n.upper()] = getattr(k5, n) + + if not args: + args = sys.argv[1:] + if not args: + args = [os.environ.get("SHELL", "bash")] + sys.exit(subprocess.call(args)) |