summaryrefslogtreecommitdiffhomepage
path: root/demos
diff options
context:
space:
mode:
authorJeff Forcier <jeff@bitprophet.org>2018-05-29 15:18:24 -0700
committerJeff Forcier <jeff@bitprophet.org>2018-05-29 15:18:34 -0700
commitc091e756084ce017d8d872ffeaf95422f79140f1 (patch)
tree4c8c82399a6a21217f78ff36a90c3a355f6a0c23 /demos
parent0bf3fa458deffe1306f226a7257ceda927ca9e8e (diff)
Blacken under black 18.5b0
Diffstat (limited to 'demos')
-rwxr-xr-xdemos/demo.py85
-rwxr-xr-xdemos/demo_keygen.py130
-rw-r--r--demos/demo_server.py88
-rw-r--r--demos/demo_sftp.py76
-rw-r--r--demos/demo_simple.py50
-rw-r--r--demos/forward.py176
-rw-r--r--demos/interactive.py19
-rwxr-xr-xdemos/rforward.py147
8 files changed, 494 insertions, 277 deletions
diff --git a/demos/demo.py b/demos/demo.py
index fff61784..c9b0a5f5 100755
--- a/demos/demo.py
+++ b/demos/demo.py
@@ -31,6 +31,7 @@ import traceback
from paramiko.py3compat import input
import paramiko
+
try:
import interactive
except ImportError:
@@ -42,71 +43,73 @@ def agent_auth(transport, username):
Attempt to authenticate to the given transport using any of the private
keys available from an SSH agent.
"""
-
+
agent = paramiko.Agent()
agent_keys = agent.get_keys()
if len(agent_keys) == 0:
return
-
+
for key in agent_keys:
- print('Trying ssh-agent key %s' % hexlify(key.get_fingerprint()))
+ print("Trying ssh-agent key %s" % hexlify(key.get_fingerprint()))
try:
transport.auth_publickey(username, key)
- print('... success!')
+ print("... success!")
return
except paramiko.SSHException:
- print('... nope.')
+ print("... nope.")
def manual_auth(username, hostname):
- default_auth = 'p'
- auth = input('Auth by (p)assword, (r)sa key, or (d)ss key? [%s] ' % default_auth)
+ default_auth = "p"
+ auth = input(
+ "Auth by (p)assword, (r)sa key, or (d)ss key? [%s] " % default_auth
+ )
if len(auth) == 0:
auth = default_auth
- if auth == 'r':
- default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_rsa')
- path = input('RSA key [%s]: ' % default_path)
+ if auth == "r":
+ default_path = os.path.join(os.environ["HOME"], ".ssh", "id_rsa")
+ path = input("RSA key [%s]: " % default_path)
if len(path) == 0:
path = default_path
try:
key = paramiko.RSAKey.from_private_key_file(path)
except paramiko.PasswordRequiredException:
- password = getpass.getpass('RSA key password: ')
+ password = getpass.getpass("RSA key password: ")
key = paramiko.RSAKey.from_private_key_file(path, password)
t.auth_publickey(username, key)
- elif auth == 'd':
- default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_dsa')
- path = input('DSS key [%s]: ' % default_path)
+ elif auth == "d":
+ default_path = os.path.join(os.environ["HOME"], ".ssh", "id_dsa")
+ path = input("DSS key [%s]: " % default_path)
if len(path) == 0:
path = default_path
try:
key = paramiko.DSSKey.from_private_key_file(path)
except paramiko.PasswordRequiredException:
- password = getpass.getpass('DSS key password: ')
+ password = getpass.getpass("DSS key password: ")
key = paramiko.DSSKey.from_private_key_file(path, password)
t.auth_publickey(username, key)
else:
- pw = getpass.getpass('Password for %s@%s: ' % (username, hostname))
+ pw = getpass.getpass("Password for %s@%s: " % (username, hostname))
t.auth_password(username, pw)
# setup logging
-paramiko.util.log_to_file('demo.log')
+paramiko.util.log_to_file("demo.log")
-username = ''
+username = ""
if len(sys.argv) > 1:
hostname = sys.argv[1]
- if hostname.find('@') >= 0:
- username, hostname = hostname.split('@')
+ if hostname.find("@") >= 0:
+ username, hostname = hostname.split("@")
else:
- hostname = input('Hostname: ')
+ hostname = input("Hostname: ")
if len(hostname) == 0:
- print('*** Hostname required.')
+ print("*** Hostname required.")
sys.exit(1)
port = 22
-if hostname.find(':') >= 0:
- hostname, portstr = hostname.split(':')
+if hostname.find(":") >= 0:
+ hostname, portstr = hostname.split(":")
port = int(portstr)
# now connect
@@ -114,7 +117,7 @@ try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((hostname, port))
except Exception as e:
- print('*** Connect failed: ' + str(e))
+ print("*** Connect failed: " + str(e))
traceback.print_exc()
sys.exit(1)
@@ -123,34 +126,38 @@ try:
try:
t.start_client()
except paramiko.SSHException:
- print('*** SSH negotiation failed.')
+ print("*** SSH negotiation failed.")
sys.exit(1)
try:
- keys = paramiko.util.load_host_keys(os.path.expanduser('~/.ssh/known_hosts'))
+ keys = paramiko.util.load_host_keys(
+ os.path.expanduser("~/.ssh/known_hosts")
+ )
except IOError:
try:
- keys = paramiko.util.load_host_keys(os.path.expanduser('~/ssh/known_hosts'))
+ keys = paramiko.util.load_host_keys(
+ os.path.expanduser("~/ssh/known_hosts")
+ )
except IOError:
- print('*** Unable to open host keys file')
+ print("*** Unable to open host keys file")
keys = {}
# check server's host key -- this is important.
key = t.get_remote_server_key()
if hostname not in keys:
- print('*** WARNING: Unknown host key!')
+ print("*** WARNING: Unknown host key!")
elif key.get_name() not in keys[hostname]:
- print('*** WARNING: Unknown host key!')
+ print("*** WARNING: Unknown host key!")
elif keys[hostname][key.get_name()] != key:
- print('*** WARNING: Host key has changed!!!')
+ print("*** WARNING: Host key has changed!!!")
sys.exit(1)
else:
- print('*** Host key OK.')
+ print("*** Host key OK.")
# get username
- if username == '':
+ if username == "":
default_username = getpass.getuser()
- username = input('Username [%s]: ' % default_username)
+ username = input("Username [%s]: " % default_username)
if len(username) == 0:
username = default_username
@@ -158,25 +165,23 @@ try:
if not t.is_authenticated():
manual_auth(username, hostname)
if not t.is_authenticated():
- print('*** Authentication failed. :(')
+ print("*** Authentication failed. :(")
t.close()
sys.exit(1)
chan = t.open_session()
chan.get_pty()
chan.invoke_shell()
- print('*** Here we go!\n')
+ print("*** Here we go!\n")
interactive.interactive_shell(chan)
chan.close()
t.close()
except Exception as e:
- print('*** Caught exception: ' + str(e.__class__) + ': ' + str(e))
+ print("*** Caught exception: " + str(e.__class__) + ": " + str(e))
traceback.print_exc()
try:
t.close()
except:
pass
sys.exit(1)
-
-
diff --git a/demos/demo_keygen.py b/demos/demo_keygen.py
index 860ee4e9..6a80272d 100755
--- a/demos/demo_keygen.py
+++ b/demos/demo_keygen.py
@@ -28,62 +28,97 @@ from paramiko import RSAKey
from paramiko.ssh_exception import SSHException
from paramiko.py3compat import u
-usage="""
+usage = """
%prog [-v] [-b bits] -t type [-N new_passphrase] [-f output_keyfile]"""
default_values = {
"ktype": "dsa",
"bits": 1024,
"filename": "output",
- "comment": ""
+ "comment": "",
}
-key_dispatch_table = {
- 'dsa': DSSKey,
- 'rsa': RSAKey,
-}
+key_dispatch_table = {"dsa": DSSKey, "rsa": RSAKey}
+
def progress(arg=None):
if not arg:
- sys.stdout.write('0%\x08\x08\x08 ')
+ sys.stdout.write("0%\x08\x08\x08 ")
sys.stdout.flush()
- elif arg[0] == 'p':
- sys.stdout.write('25%\x08\x08\x08\x08 ')
+ elif arg[0] == "p":
+ sys.stdout.write("25%\x08\x08\x08\x08 ")
sys.stdout.flush()
- elif arg[0] == 'h':
- sys.stdout.write('50%\x08\x08\x08\x08 ')
+ elif arg[0] == "h":
+ sys.stdout.write("50%\x08\x08\x08\x08 ")
sys.stdout.flush()
- elif arg[0] == 'x':
- sys.stdout.write('75%\x08\x08\x08\x08 ')
+ elif arg[0] == "x":
+ sys.stdout.write("75%\x08\x08\x08\x08 ")
sys.stdout.flush()
-if __name__ == '__main__':
- phrase=None
- pfunc=None
+if __name__ == "__main__":
+
+ phrase = None
+ pfunc = None
parser = OptionParser(usage=usage)
- parser.add_option("-t", "--type", type="string", dest="ktype",
+ parser.add_option(
+ "-t",
+ "--type",
+ type="string",
+ dest="ktype",
help="Specify type of key to create (dsa or rsa)",
- metavar="ktype", default=default_values["ktype"])
- parser.add_option("-b", "--bits", type="int", dest="bits",
- help="Number of bits in the key to create", metavar="bits",
- default=default_values["bits"])
- parser.add_option("-N", "--new-passphrase", dest="newphrase",
- help="Provide new passphrase", metavar="phrase")
- parser.add_option("-P", "--old-passphrase", dest="oldphrase",
- help="Provide old passphrase", metavar="phrase")
- parser.add_option("-f", "--filename", type="string", dest="filename",
- help="Filename of the key file", metavar="filename",
- default=default_values["filename"])
- parser.add_option("-q", "--quiet", default=False, action="store_false",
- help="Quiet")
- parser.add_option("-v", "--verbose", default=False, action="store_true",
- help="Verbose")
- parser.add_option("-C", "--comment", type="string", dest="comment",
- help="Provide a new comment", metavar="comment",
- default=default_values["comment"])
+ metavar="ktype",
+ default=default_values["ktype"],
+ )
+ parser.add_option(
+ "-b",
+ "--bits",
+ type="int",
+ dest="bits",
+ help="Number of bits in the key to create",
+ metavar="bits",
+ default=default_values["bits"],
+ )
+ parser.add_option(
+ "-N",
+ "--new-passphrase",
+ dest="newphrase",
+ help="Provide new passphrase",
+ metavar="phrase",
+ )
+ parser.add_option(
+ "-P",
+ "--old-passphrase",
+ dest="oldphrase",
+ help="Provide old passphrase",
+ metavar="phrase",
+ )
+ parser.add_option(
+ "-f",
+ "--filename",
+ type="string",
+ dest="filename",
+ help="Filename of the key file",
+ metavar="filename",
+ default=default_values["filename"],
+ )
+ parser.add_option(
+ "-q", "--quiet", default=False, action="store_false", help="Quiet"
+ )
+ parser.add_option(
+ "-v", "--verbose", default=False, action="store_true", help="Verbose"
+ )
+ parser.add_option(
+ "-C",
+ "--comment",
+ type="string",
+ dest="comment",
+ help="Provide a new comment",
+ metavar="comment",
+ default=default_values["comment"],
+ )
(options, args) = parser.parse_args()
@@ -95,18 +130,23 @@ if __name__ == '__main__':
globals()[o] = getattr(options, o, default_values[o.lower()])
if options.newphrase:
- phrase = getattr(options, 'newphrase')
+ phrase = getattr(options, "newphrase")
if options.verbose:
pfunc = progress
- sys.stdout.write("Generating priv/pub %s %d bits key pair (%s/%s.pub)..." % (ktype, bits, filename, filename))
+ sys.stdout.write(
+ "Generating priv/pub %s %d bits key pair (%s/%s.pub)..."
+ % (ktype, bits, filename, filename)
+ )
sys.stdout.flush()
- if ktype == 'dsa' and bits > 1024:
+ if ktype == "dsa" and bits > 1024:
raise SSHException("DSA Keys must be 1024 bits")
if ktype not in key_dispatch_table:
- raise SSHException("Unknown %s algorithm to generate keys pair" % ktype)
+ raise SSHException(
+ "Unknown %s algorithm to generate keys pair" % ktype
+ )
# generating private key
prv = key_dispatch_table[ktype].generate(bits=bits, progress_func=pfunc)
@@ -114,7 +154,7 @@ if __name__ == '__main__':
# generating public key
pub = key_dispatch_table[ktype](filename=filename, password=phrase)
- with open("%s.pub" % filename, 'w') as f:
+ with open("%s.pub" % filename, "w") as f:
f.write("%s %s" % (pub.get_name(), pub.get_base64()))
if options.comment:
f.write(" %s" % comment)
@@ -123,4 +163,12 @@ if __name__ == '__main__':
print("done.")
hash = u(hexlify(pub.get_fingerprint()))
- print("Fingerprint: %d %s %s.pub (%s)" % (bits, ":".join([ hash[i:2+i] for i in range(0, len(hash), 2)]), filename, ktype.upper()))
+ print(
+ "Fingerprint: %d %s %s.pub (%s)"
+ % (
+ bits,
+ ":".join([hash[i : 2 + i] for i in range(0, len(hash), 2)]),
+ filename,
+ ktype.upper(),
+ )
+ )
diff --git a/demos/demo_server.py b/demos/demo_server.py
index 3a7ec854..313e5fb2 100644
--- a/demos/demo_server.py
+++ b/demos/demo_server.py
@@ -31,45 +31,47 @@ from paramiko.py3compat import b, u, decodebytes
# setup logging
-paramiko.util.log_to_file('demo_server.log')
+paramiko.util.log_to_file("demo_server.log")
-host_key = paramiko.RSAKey(filename='test_rsa.key')
-#host_key = paramiko.DSSKey(filename='test_dss.key')
+host_key = paramiko.RSAKey(filename="test_rsa.key")
+# host_key = paramiko.DSSKey(filename='test_dss.key')
-print('Read key: ' + u(hexlify(host_key.get_fingerprint())))
+print("Read key: " + u(hexlify(host_key.get_fingerprint())))
-class Server (paramiko.ServerInterface):
+class Server(paramiko.ServerInterface):
# 'data' is the output of base64.b64encode(key)
# (using the "user_rsa_key" files)
- data = (b'AAAAB3NzaC1yc2EAAAABIwAAAIEAyO4it3fHlmGZWJaGrfeHOVY7RWO3P9M7hp'
- b'fAu7jJ2d7eothvfeuoRFtJwhUmZDluRdFyhFY/hFAh76PJKGAusIqIQKlkJxMC'
- b'KDqIexkgHAfID/6mqvmnSJf0b5W8v5h2pI/stOSwTQ+pxVhwJ9ctYDhRSlF0iT'
- b'UWT10hcuO4Ks8=')
+ data = (
+ b"AAAAB3NzaC1yc2EAAAABIwAAAIEAyO4it3fHlmGZWJaGrfeHOVY7RWO3P9M7hp"
+ b"fAu7jJ2d7eothvfeuoRFtJwhUmZDluRdFyhFY/hFAh76PJKGAusIqIQKlkJxMC"
+ b"KDqIexkgHAfID/6mqvmnSJf0b5W8v5h2pI/stOSwTQ+pxVhwJ9ctYDhRSlF0iT"
+ b"UWT10hcuO4Ks8="
+ )
good_pub_key = paramiko.RSAKey(data=decodebytes(data))
def __init__(self):
self.event = threading.Event()
def check_channel_request(self, kind, chanid):
- if kind == 'session':
+ if kind == "session":
return paramiko.OPEN_SUCCEEDED
return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
def check_auth_password(self, username, password):
- if (username == 'robey') and (password == 'foo'):
+ if (username == "robey") and (password == "foo"):
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED
def check_auth_publickey(self, username, key):
- print('Auth attempt with key: ' + u(hexlify(key.get_fingerprint())))
- if (username == 'robey') and (key == self.good_pub_key):
+ print("Auth attempt with key: " + u(hexlify(key.get_fingerprint())))
+ if (username == "robey") and (key == self.good_pub_key):
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED
-
- def check_auth_gssapi_with_mic(self, username,
- gss_authenticated=paramiko.AUTH_FAILED,
- cc_file=None):
+
+ def check_auth_gssapi_with_mic(
+ self, username, gss_authenticated=paramiko.AUTH_FAILED, cc_file=None
+ ):
"""
.. note::
We are just checking in `AuthHandler` that the given user is a
@@ -88,9 +90,9 @@ class Server (paramiko.ServerInterface):
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED
- def check_auth_gssapi_keyex(self, username,
- gss_authenticated=paramiko.AUTH_FAILED,
- cc_file=None):
+ def check_auth_gssapi_keyex(
+ self, username, gss_authenticated=paramiko.AUTH_FAILED, cc_file=None
+ ):
if gss_authenticated == paramiko.AUTH_SUCCESSFUL:
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED
@@ -99,14 +101,15 @@ class Server (paramiko.ServerInterface):
return True
def get_allowed_auths(self, username):
- return 'gssapi-keyex,gssapi-with-mic,password,publickey'
+ return "gssapi-keyex,gssapi-with-mic,password,publickey"
def check_channel_shell_request(self, channel):
self.event.set()
return True
- def check_channel_pty_request(self, channel, term, width, height, pixelwidth,
- pixelheight, modes):
+ def check_channel_pty_request(
+ self, channel, term, width, height, pixelwidth, pixelheight, modes
+ ):
return True
@@ -116,22 +119,22 @@ DoGSSAPIKeyExchange = True
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind(('', 2200))
+ sock.bind(("", 2200))
except Exception as e:
- print('*** Bind failed: ' + str(e))
+ print("*** Bind failed: " + str(e))
traceback.print_exc()
sys.exit(1)
try:
sock.listen(100)
- print('Listening for connection ...')
+ print("Listening for connection ...")
client, addr = sock.accept()
except Exception as e:
- print('*** Listen/accept failed: ' + str(e))
+ print("*** Listen/accept failed: " + str(e))
traceback.print_exc()
sys.exit(1)
-print('Got a connection!')
+print("Got a connection!")
try:
t = paramiko.Transport(client, gss_kex=DoGSSAPIKeyExchange)
@@ -139,43 +142,44 @@ try:
try:
t.load_server_moduli()
except:
- print('(Failed to load moduli -- gex will be unsupported.)')
+ print("(Failed to load moduli -- gex will be unsupported.)")
raise
t.add_server_key(host_key)
server = Server()
try:
t.start_server(server=server)
except paramiko.SSHException:
- print('*** SSH negotiation failed.')
+ print("*** SSH negotiation failed.")
sys.exit(1)
# wait for auth
chan = t.accept(20)
if chan is None:
- print('*** No channel.')
+ print("*** No channel.")
sys.exit(1)
- print('Authenticated!')
+ print("Authenticated!")
server.event.wait(10)
if not server.event.is_set():
- print('*** Client never asked for a shell.')
+ print("*** Client never asked for a shell.")
sys.exit(1)
- chan.send('\r\n\r\nWelcome to my dorky little BBS!\r\n\r\n')
- chan.send('We are on fire all the time! Hooray! Candy corn for everyone!\r\n')
- chan.send('Happy birthday to Robot Dave!\r\n\r\n')
- chan.send('Username: ')
- f = chan.makefile('rU')
- username = f.readline().strip('\r\n')
- chan.send('\r\nI don\'t like you, ' + username + '.\r\n')
+ chan.send("\r\n\r\nWelcome to my dorky little BBS!\r\n\r\n")
+ chan.send(
+ "We are on fire all the time! Hooray! Candy corn for everyone!\r\n"
+ )
+ chan.send("Happy birthday to Robot Dave!\r\n\r\n")
+ chan.send("Username: ")
+ f = chan.makefile("rU")
+ username = f.readline().strip("\r\n")
+ chan.send("\r\nI don't like you, " + username + ".\r\n")
chan.close()
except Exception as e:
- print('*** Caught exception: ' + str(e.__class__) + ': ' + str(e))
+ print("*** Caught exception: " + str(e.__class__) + ": " + str(e))
traceback.print_exc()
try:
t.close()
except:
pass
sys.exit(1)
-
diff --git a/demos/demo_sftp.py b/demos/demo_sftp.py
index 2cb44701..7f6a002e 100644
--- a/demos/demo_sftp.py
+++ b/demos/demo_sftp.py
@@ -32,38 +32,38 @@ from paramiko.py3compat import input
# setup logging
-paramiko.util.log_to_file('demo_sftp.log')
+paramiko.util.log_to_file("demo_sftp.log")
# Paramiko client configuration
-UseGSSAPI = True # enable GSS-API / SSPI authentication
+UseGSSAPI = True # enable GSS-API / SSPI authentication
DoGSSAPIKeyExchange = True
Port = 22
# get hostname
-username = ''
+username = ""
if len(sys.argv) > 1:
hostname = sys.argv[1]
- if hostname.find('@') >= 0:
- username, hostname = hostname.split('@')
+ if hostname.find("@") >= 0:
+ username, hostname = hostname.split("@")
else:
- hostname = input('Hostname: ')
+ hostname = input("Hostname: ")
if len(hostname) == 0:
- print('*** Hostname required.')
+ print("*** Hostname required.")
sys.exit(1)
-if hostname.find(':') >= 0:
- hostname, portstr = hostname.split(':')
+if hostname.find(":") >= 0:
+ hostname, portstr = hostname.split(":")
Port = int(portstr)
# get username
-if username == '':
+if username == "":
default_username = getpass.getuser()
- username = input('Username [%s]: ' % default_username)
+ username = input("Username [%s]: " % default_username)
if len(username) == 0:
username = default_username
if not UseGSSAPI:
- password = getpass.getpass('Password for %s@%s: ' % (username, hostname))
+ password = getpass.getpass("Password for %s@%s: " % (username, hostname))
else:
password = None
@@ -72,59 +72,69 @@ else:
hostkeytype = None
hostkey = None
try:
- host_keys = paramiko.util.load_host_keys(os.path.expanduser('~/.ssh/known_hosts'))
+ host_keys = paramiko.util.load_host_keys(
+ os.path.expanduser("~/.ssh/known_hosts")
+ )
except IOError:
try:
# try ~/ssh/ too, because windows can't have a folder named ~/.ssh/
- host_keys = paramiko.util.load_host_keys(os.path.expanduser('~/ssh/known_hosts'))
+ host_keys = paramiko.util.load_host_keys(
+ os.path.expanduser("~/ssh/known_hosts")
+ )
except IOError:
- print('*** Unable to open host keys file')
+ print("*** Unable to open host keys file")
host_keys = {}
if hostname in host_keys:
hostkeytype = host_keys[hostname].keys()[0]
hostkey = host_keys[hostname][hostkeytype]
- print('Using host key of type %s' % hostkeytype)
+ print("Using host key of type %s" % hostkeytype)
# now, connect and use paramiko Transport to negotiate SSH2 across the connection
try:
t = paramiko.Transport((hostname, Port))
- t.connect(hostkey, username, password, gss_host=socket.getfqdn(hostname),
- gss_auth=UseGSSAPI, gss_kex=DoGSSAPIKeyExchange)
+ t.connect(
+ hostkey,
+ username,
+ password,
+ gss_host=socket.getfqdn(hostname),
+ gss_auth=UseGSSAPI,
+ gss_kex=DoGSSAPIKeyExchange,
+ )
sftp = paramiko.SFTPClient.from_transport(t)
# dirlist on remote host
- dirlist = sftp.listdir('.')
+ dirlist = sftp.listdir(".")
print("Dirlist: %s" % dirlist)
# copy this demo onto the server
try:
sftp.mkdir("demo_sftp_folder")
except IOError:
- print('(assuming demo_sftp_folder/ already exists)')
- with sftp.open('demo_sftp_folder/README', 'w') as f:
- f.write('This was created by demo_sftp.py.\n')
- with open('demo_sftp.py', 'r') as f:
+ print("(assuming demo_sftp_folder/ already exists)")
+ with sftp.open("demo_sftp_folder/README", "w") as f:
+ f.write("This was created by demo_sftp.py.\n")
+ with open("demo_sftp.py", "r") as f:
data = f.read()
- sftp.open('demo_sftp_folder/demo_sftp.py', 'w').write(data)
- print('created demo_sftp_folder/ on the server')
-
+ sftp.open("demo_sftp_folder/demo_sftp.py", "w").write(data)
+ print("created demo_sftp_folder/ on the server")
+
# copy the README back here
- with sftp.open('demo_sftp_folder/README', 'r') as f:
+ with sftp.open("demo_sftp_folder/README", "r") as f:
data = f.read()
- with open('README_demo_sftp', 'w') as f:
+ with open("README_demo_sftp", "w") as f:
f.write(data)
- print('copied README back here')
-
+ print("copied README back here")
+
# BETTER: use the get() and put() methods
- sftp.put('demo_sftp.py', 'demo_sftp_folder/demo_sftp.py')
- sftp.get('demo_sftp_folder/README', 'README_demo_sftp')
+ sftp.put("demo_sftp.py", "demo_sftp_folder/demo_sftp.py")
+ sftp.get("demo_sftp_folder/README", "README_demo_sftp")
t.close()
except Exception as e:
- print('*** Caught exception: %s: %s' % (e.__class__, e))
+ print("*** Caught exception: %s: %s" % (e.__class__, e))
traceback.print_exc()
try:
t.close()
diff --git a/demos/demo_simple.py b/demos/demo_simple.py
index 9def57f8..5dd4f6c1 100644
--- a/demos/demo_simple.py
+++ b/demos/demo_simple.py
@@ -28,6 +28,7 @@ import traceback
from paramiko.py3compat import input
import paramiko
+
try:
import interactive
except ImportError:
@@ -35,39 +36,43 @@ except ImportError:
# setup logging
-paramiko.util.log_to_file('demo_simple.log')
+paramiko.util.log_to_file("demo_simple.log")
# Paramiko client configuration
-UseGSSAPI = paramiko.GSS_AUTH_AVAILABLE # enable "gssapi-with-mic" authentication, if supported by your python installation
-DoGSSAPIKeyExchange = paramiko.GSS_AUTH_AVAILABLE # enable "gssapi-kex" key exchange, if supported by your python installation
+UseGSSAPI = (
+ paramiko.GSS_AUTH_AVAILABLE
+) # enable "gssapi-with-mic" authentication, if supported by your python installation
+DoGSSAPIKeyExchange = (
+ paramiko.GSS_AUTH_AVAILABLE
+) # enable "gssapi-kex" key exchange, if supported by your python installation
# UseGSSAPI = False
# DoGSSAPIKeyExchange = False
port = 22
# get hostname
-username = ''
+username = ""
if len(sys.argv) > 1:
hostname = sys.argv[1]
- if hostname.find('@') >= 0:
- username, hostname = hostname.split('@')
+ if hostname.find("@") >= 0:
+ username, hostname = hostname.split("@")
else:
- hostname = input('Hostname: ')
+ hostname = input("Hostname: ")
if len(hostname) == 0:
- print('*** Hostname required.')
+ print("*** Hostname required.")
sys.exit(1)
-if hostname.find(':') >= 0:
- hostname, portstr = hostname.split(':')
+if hostname.find(":") >= 0:
+ hostname, portstr = hostname.split(":")
port = int(portstr)
# get username
-if username == '':
+if username == "":
default_username = getpass.getuser()
- username = input('Username [%s]: ' % default_username)
+ username = input("Username [%s]: " % default_username)
if len(username) == 0:
username = default_username
if not UseGSSAPI and not DoGSSAPIKeyExchange:
- password = getpass.getpass('Password for %s@%s: ' % (username, hostname))
+ password = getpass.getpass("Password for %s@%s: " % (username, hostname))
# now, connect and use paramiko Client to negotiate SSH2 across the connection
@@ -75,27 +80,34 @@ try:
client = paramiko.SSHClient()
client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.WarningPolicy())
- print('*** Connecting...')
+ print("*** Connecting...")
if not UseGSSAPI and not DoGSSAPIKeyExchange:
client.connect(hostname, port, username, password)
else:
try:
- client.connect(hostname, port, username, gss_auth=UseGSSAPI,
- gss_kex=DoGSSAPIKeyExchange)
+ client.connect(
+ hostname,
+ port,
+ username,
+ gss_auth=UseGSSAPI,
+ gss_kex=DoGSSAPIKeyExchange,
+ )
except Exception:
# traceback.print_exc()
- password = getpass.getpass('Password for %s@%s: ' % (username, hostname))
+ password = getpass.getpass(
+ "Password for %s@%s: " % (username, hostname)
+ )
client.connect(hostname, port, username, password)
chan = client.invoke_shell()
print(repr(client.get_transport()))
- print('*** Here we go!\n')
+ print("*** Here we go!\n")
interactive.interactive_shell(chan)
chan.close()
client.close()
except Exception as e:
- print('*** Caught exception: %s: %s' % (e.__class__, e))
+ print("*** Caught exception: %s: %s" % (e.__class__, e))
traceback.print_exc()
try:
client.close()
diff --git a/demos/forward.py b/demos/forward.py
index 96e1700d..98757911 100644
--- a/demos/forward.py
+++ b/demos/forward.py
@@ -30,6 +30,7 @@ import getpass
import os
import socket
import select
+
try:
import SocketServer
except ImportError:
@@ -46,30 +47,41 @@ DEFAULT_PORT = 4000
g_verbose = True
-class ForwardServer (SocketServer.ThreadingTCPServer):
+class ForwardServer(SocketServer.ThreadingTCPServer):
daemon_threads = True
allow_reuse_address = True
-
-class Handler (SocketServer.BaseRequestHandler):
+
+class Handler(SocketServer.BaseRequestHandler):
def handle(self):
try:
- chan = self.ssh_transport.open_channel('direct-tcpip',
- (self.chain_host, self.chain_port),
- self.request.getpeername())
+ chan = self.ssh_transport.open_channel(
+ "direct-tcpip",
+ (self.chain_host, self.chain_port),
+ self.request.getpeername(),
+ )
except Exception as e:
- verbose('Incoming request to %s:%d failed: %s' % (self.chain_host,
- self.chain_port,
- repr(e)))
+ verbose(
+ "Incoming request to %s:%d failed: %s"
+ % (self.chain_host, self.chain_port, repr(e))
+ )
return
if chan is None:
- verbose('Incoming request to %s:%d was rejected by the SSH server.' %
- (self.chain_host, self.chain_port))
+ verbose(
+ "Incoming request to %s:%d was rejected by the SSH server."
+ % (self.chain_host, self.chain_port)
+ )
return
- verbose('Connected! Tunnel open %r -> %r -> %r' % (self.request.getpeername(),
- chan.getpeername(), (self.chain_host, self.chain_port)))
+ verbose(
+ "Connected! Tunnel open %r -> %r -> %r"
+ % (
+ self.request.getpeername(),
+ chan.getpeername(),
+ (self.chain_host, self.chain_port),
+ )
+ )
while True:
r, w, x = select.select([self.request, chan], [], [])
if self.request in r:
@@ -82,22 +94,23 @@ class Handler (SocketServer.BaseRequestHandler):
if len(data) == 0:
break
self.request.send(data)
-
+
peername = self.request.getpeername()
chan.close()
self.request.close()
- verbose('Tunnel closed from %r' % (peername,))
+ verbose("Tunnel closed from %r" % (peername,))
def forward_tunnel(local_port, remote_host, remote_port, transport):
# this is a little convoluted, but lets me configure things for the Handler
# object. (SocketServer doesn't give Handlers any way to access the outer
# server normally.)
- class SubHander (Handler):
+ class SubHander(Handler):
chain_host = remote_host
chain_port = remote_port
ssh_transport = transport
- ForwardServer(('', local_port), SubHander).serve_forever()
+
+ ForwardServer(("", local_port), SubHander).serve_forever()
def verbose(s):
@@ -114,40 +127,88 @@ the SSH server. This is similar to the openssh -L option.
def get_host_port(spec, default_port):
"parse 'hostname:22' into a host and port, with the port optional"
- args = (spec.split(':', 1) + [default_port])[:2]
+ args = (spec.split(":", 1) + [default_port])[:2]
args[1] = int(args[1])
return args[0], args[1]
def parse_options():
global g_verbose
-
- parser = OptionParser(usage='usage: %prog [options] <ssh-server>[:<server-port>]',
- version='%prog 1.0', description=HELP)
- parser.add_option('-q', '--quiet', action='store_false', dest='verbose', default=True,
- help='squelch all informational output')
- parser.add_option('-p', '--local-port', action='store', type='int', dest='port',
- default=DEFAULT_PORT,
- help='local port to forward (default: %d)' % DEFAULT_PORT)
- parser.add_option('-u', '--user', action='store', type='string', dest='user',
- default=getpass.getuser(),
- help='username for SSH authentication (default: %s)' % getpass.getuser())
- parser.add_option('-K', '--key', action='store', type='string', dest='keyfile',
- default=None,
- help='private key file to use for SSH authentication')
- parser.add_option('', '--no-key', action='store_false', dest='look_for_keys', default=True,
- help='don\'t look for or use a private key file')
- parser.add_option('-P', '--password', action='store_true', dest='readpass', default=False,
- help='read password (for key or password auth) from stdin')
- parser.add_option('-r', '--remote', action='store', type='string', dest='remote', default=None, metavar='host:port',
- help='remote host and port to forward to')
+
+ parser = OptionParser(
+ usage="usage: %prog [options] <ssh-server>[:<server-port>]",
+ version="%prog 1.0",
+ description=HELP,
+ )
+ parser.add_option(
+ "-q",
+ "--quiet",
+ action="store_false",
+ dest="verbose",
+ default=True,
+ help="squelch all informational output",
+ )
+ parser.add_option(
+ "-p",
+ "--local-port",
+ action="store",
+ type="int",
+ dest="port",
+ default=DEFAULT_PORT,
+ help="local port to forward (default: %d)" % DEFAULT_PORT,
+ )
+ parser.add_option(
+ "-u",
+ "--user",
+ action="store",
+ type="string",
+ dest="user",
+ default=getpass.getuser(),
+ help="username for SSH authentication (default: %s)"
+ % getpass.getuser(),
+ )
+ parser.add_option(
+ "-K",
+ "--key",
+ action="store",
+ type="string",
+ dest="keyfile",
+ default=None,
+ help="private key file to use for SSH authentication",
+ )
+ parser.add_option(
+ "",
+ "--no-key",
+ action="store_false",
+ dest="look_for_keys",
+ default=True,
+ help="don't look for or use a private key file",
+ )
+ parser.add_option(
+ "-P",
+ "--password",
+ action="store_true",
+ dest="readpass",
+ default=False,
+ help="read password (for key or password auth) from stdin",
+ )
+ parser.add_option(
+ "-r",
+ "--remote",
+ action="store",
+ type="string",
+ dest="remote",
+ default=None,
+ metavar="host:port",
+ help="remote host and port to forward to",
+ )
options, args = parser.parse_args()
if len(args) != 1:
- parser.error('Incorrect number of arguments.')
+ parser.error("Incorrect number of arguments.")
if options.remote is None:
- parser.error('Remote address required (-r).')
-
+ parser.error("Remote address required (-r).")
+
g_verbose = options.verbose
server_host, server_port = get_host_port(args[0], SSH_PORT)
remote_host, remote_port = get_host_port(options.remote, SSH_PORT)
@@ -156,31 +217,42 @@ def parse_options():
def main():
options, server, remote = parse_options()
-
+
password = None
if options.readpass:
- password = getpass.getpass('Enter SSH password: ')
-
+ password = getpass.getpass("Enter SSH password: ")
+
client = paramiko.SSHClient()
client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.WarningPolicy())
- verbose('Connecting to ssh host %s:%d ...' % (server[0], server[1]))
+ verbose("Connecting to ssh host %s:%d ..." % (server[0], server[1]))
try:
- client.connect(server[0], server[1], username=options.user, key_filename=options.keyfile,
- look_for_keys=options.look_for_keys, password=password)
+ client.connect(
+ server[0],
+ server[1],
+ username=options.user,
+ key_filename=options.keyfile,
+ look_for_keys=options.look_for_keys,
+ password=password,
+ )
except Exception as e:
- print('*** Failed to connect to %s:%d: %r' % (server[0], server[1], e))
+ print("*** Failed to connect to %s:%d: %r" % (server[0], server[1], e))
sys.exit(1)
- verbose('Now forwarding port %d to %s:%d ...' % (options.port, remote[0], remote[1]))
+ verbose(
+ "Now forwarding port %d to %s:%d ..."
+ % (options.port, remote[0], remote[1])
+ )
try:
- forward_tunnel(options.port, remote[0], remote[1], client.get_transport())
+ forward_tunnel(
+ options.port, remote[0], remote[1], client.get_transport()
+ )
except KeyboardInterrupt:
- print('C-c: Port forwarding stopped.')
+ print("C-c: Port forwarding stopped.")
sys.exit(0)
-if __name__ == '__main__':
+if __name__ == "__main__":
main()
diff --git a/demos/interactive.py b/demos/interactive.py
index 7138cd6c..037787c4 100644
--- a/demos/interactive.py
+++ b/demos/interactive.py
@@ -25,6 +25,7 @@ from paramiko.py3compat import u
try:
import termios
import tty
+
has_termios = True
except ImportError:
has_termios = False
@@ -39,7 +40,7 @@ def interactive_shell(chan):
def posix_shell(chan):
import select
-
+
oldtty = termios.tcgetattr(sys.stdin)
try:
tty.setraw(sys.stdin.fileno())
@@ -52,7 +53,7 @@ def posix_shell(chan):
try:
x = u(chan.recv(1024))
if len(x) == 0:
- sys.stdout.write('\r\n*** EOF\r\n')
+ sys.stdout.write("\r\n*** EOF\r\n")
break
sys.stdout.write(x)
sys.stdout.flush()
@@ -67,26 +68,28 @@ def posix_shell(chan):
finally:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)
-
+
# thanks to Mike Looijmans for this code
def windows_shell(chan):
import threading
- sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n")
-
+ sys.stdout.write(
+ "Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n"
+ )
+
def writeall(sock):
while True:
data = sock.recv(256)
if not data:
- sys.stdout.write('\r\n*** EOF ***\r\n\r\n')
+ sys.stdout.write("\r\n*** EOF ***\r\n\r\n")
sys.stdout.flush()
break
sys.stdout.write(data)
sys.stdout.flush()
-
+
writer = threading.Thread(target=writeall, args=(chan,))
writer.start()
-
+
try:
while True:
d = sys.stdin.read(1)
diff --git a/demos/rforward.py b/demos/rforward.py
index ae70670c..a2e8a776 100755
--- a/demos/rforward.py
+++ b/demos/rforward.py
@@ -47,11 +47,13 @@ def handler(chan, host, port):
try:
sock.connect((host, port))
except Exception as e:
- verbose('Forwarding request to %s:%d failed: %r' % (host, port, e))
+ verbose("Forwarding request to %s:%d failed: %r" % (host, port, e))
return
-
- verbose('Connected! Tunnel open %r -> %r -> %r' % (chan.origin_addr,
- chan.getpeername(), (host, port)))
+
+ verbose(
+ "Connected! Tunnel open %r -> %r -> %r"
+ % (chan.origin_addr, chan.getpeername(), (host, port))
+ )
while True:
r, w, x = select.select([sock, chan], [], [])
if sock in r:
@@ -66,16 +68,18 @@ def handler(chan, host, port):
sock.send(data)
chan.close()
sock.close()
- verbose('Tunnel closed from %r' % (chan.origin_addr,))
+ verbose("Tunnel closed from %r" % (chan.origin_addr,))
def reverse_forward_tunnel(server_port, remote_host, remote_port, transport):
- transport.request_port_forward('', server_port)
+ transport.request_port_forward("", server_port)
while True:
chan = transport.accept(1000)
if chan is None:
continue
- thr = threading.Thread(target=handler, args=(chan, remote_host, remote_port))
+ thr = threading.Thread(
+ target=handler, args=(chan, remote_host, remote_port)
+ )
thr.setDaemon(True)
thr.start()
@@ -95,40 +99,88 @@ network. This is similar to the openssh -R option.
def get_host_port(spec, default_port):
"parse 'hostname:22' into a host and port, with the port optional"
- args = (spec.split(':', 1) + [default_port])[:2]
+ args = (spec.split(":", 1) + [default_port])[:2]
args[1] = int(args[1])
return args[0], args[1]
def parse_options():
global g_verbose
-
- parser = OptionParser(usage='usage: %prog [options] <ssh-server>[:<server-port>]',
- version='%prog 1.0', description=HELP)
- parser.add_option('-q', '--quiet', action='store_false', dest='verbose', default=True,
- help='squelch all informational output')
- parser.add_option('-p', '--remote-port', action='store', type='int', dest='port',
- default=DEFAULT_PORT,
- help='port on server to forward (default: %d)' % DEFAULT_PORT)
- parser.add_option('-u', '--user', action='store', type='string', dest='user',
- default=getpass.getuser(),
- help='username for SSH authentication (default: %s)' % getpass.getuser())
- parser.add_option('-K', '--key', action='store', type='string', dest='keyfile',
- default=None,
- help='private key file to use for SSH authentication')
- parser.add_option('', '--no-key', action='store_false', dest='look_for_keys', default=True,
- help='don\'t look for or use a private key file')
- parser.add_option('-P', '--password', action='store_true', dest='readpass', default=False,
- help='read password (for key or password auth) from stdin')
- parser.add_option('-r', '--remote', action='store', type='string', dest='remote', default=None, metavar='host:port',
- help='remote host and port to forward to')
+
+ parser = OptionParser(
+ usage="usage: %prog [options] <ssh-server>[:<server-port>]",
+ version="%prog 1.0",
+ description=HELP,
+ )
+ parser.add_option(
+ "-q",
+ "--quiet",
+ action="store_false",
+ dest="verbose",
+ default=True,
+ help="squelch all informational output",
+ )
+ parser.add_option(
+ "-p",
+ "--remote-port",
+ action="store",
+ type="int",
+ dest="port",
+ default=DEFAULT_PORT,
+ help="port on server to forward (default: %d)" % DEFAULT_PORT,
+ )
+ parser.add_option(
+ "-u",
+ "--user",
+ action="store",
+ type="string",
+ dest="user",
+ default=getpass.getuser(),
+ help="username for SSH authentication (default: %s)"
+ % getpass.getuser(),
+ )
+ parser.add_option(
+ "-K",
+ "--key",
+ action="store",
+ type="string",
+ dest="keyfile",
+ default=None,
+ help="private key file to use for SSH authentication",
+ )
+ parser.add_option(
+ "",
+ "--no-key",
+ action="store_false",
+ dest="look_for_keys",
+ default=True,
+ help="don't look for or use a private key file",
+ )
+ parser.add_option(
+ "-P",
+ "--password",
+ action="store_true",
+ dest="readpass",
+ default=False,
+ help="read password (for key or password auth) from stdin",
+ )
+ parser.add_option(
+ "-r",
+ "--remote",
+ action="store",
+ type="string",
+ dest="remote",
+ default=None,
+ metavar="host:port",
+ help="remote host and port to forward to",
+ )
options, args = parser.parse_args()
if len(args) != 1:
- parser.error('Incorrect number of arguments.')
+ parser.error("Incorrect number of arguments.")
if options.remote is None:
- parser.error('Remote address required (-r).')
-
+ parser.error("Remote address required (-r).")
+
g_verbose = options.verbose
server_host, server_port = get_host_port(args[0], SSH_PORT)
remote_host, remote_port = get_host_port(options.remote, SSH_PORT)
@@ -137,31 +189,42 @@ def parse_options():
def main():
options, server, remote = parse_options()
-
+
password = None
if options.readpass:
- password = getpass.getpass('Enter SSH password: ')
-
+ password = getpass.getpass("Enter SSH password: ")
+
client = paramiko.SSHClient()
client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.WarningPolicy())
- verbose('Connecting to ssh host %s:%d ...' % (server[0], server[1]))
+ verbose("Connecting to ssh host %s:%d ..." % (server[0], server[1]))
try:
- client.connect(server[0], server[1], username=options.user, key_filename=options.keyfile,
- look_for_keys=options.look_for_keys, password=password)
+ client.connect(
+ server[0],
+ server[1],
+ username=options.user,
+ key_filename=options.keyfile,
+ look_for_keys=options.look_for_keys,
+ password=password,
+ )
except Exception as e:
- print('*** Failed to connect to %s:%d: %r' % (server[0], server[1], e))
+ print("*** Failed to connect to %s:%d: %r" % (server[0], server[1], e))
sys.exit(1)
- verbose('Now forwarding remote port %d to %s:%d ...' % (options.port, remote[0], remote[1]))
+ verbose(
+ "Now forwarding remote port %d to %s:%d ..."
+ % (options.port, remote[0], remote[1])
+ )
try:
- reverse_forward_tunnel(options.port, remote[0], remote[1], client.get_transport())
+ reverse_forward_tunnel(
+ options.port, remote[0], remote[1], client.get_transport()
+ )
except KeyboardInterrupt:
- print('C-c: Port forwarding stopped.')
+ print("C-c: Port forwarding stopped.")
sys.exit(0)
-if __name__ == '__main__':
+if __name__ == "__main__":
main()