summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--tests/stub_sftp.py192
1 files changed, 192 insertions, 0 deletions
diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py
new file mode 100644
index 00000000..e1dbb97f
--- /dev/null
+++ b/tests/stub_sftp.py
@@ -0,0 +1,192 @@
+#!/usr/bin/python
+
+# Copyright (C) 2003-2004 Robey Pointer <robey@lag.net>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+A stub SFTP server for loopback SFTP testing.
+"""
+
+import os
+from paramiko import ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes, \
+ SFTPHandle, SFTP_OK, AUTH_SUCCESSFUL, OPEN_SUCCEEDED
+
+
+class StubServer (ServerInterface):
+ def check_auth_password(self, username, password):
+ # all are allowed
+ return AUTH_SUCCESSFUL
+
+ def check_channel_request(self, kind, chanid):
+ return OPEN_SUCCEEDED
+
+
+class StubSFTPHandle (SFTPHandle):
+ def stat(self):
+ try:
+ return SFTPAttributes.from_stat(os.fstat(self.readfile.fileno()))
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+
+ def chattr(self, attr):
+ # python doesn't have equivalents to fchown or fchmod, so we have to
+ # use the stored filename
+ try:
+ SFTPServer.set_file_attr(self.filename, attr)
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+
+
+class StubSFTPServer (SFTPServerInterface):
+ # assume current folder is a fine root
+ # (the tests always create and eventualy delete a subfolder, so there shouldn't be any mess)
+ ROOT = os.getcwd()
+
+ def _realpath(self, path):
+ return self.ROOT + self.canonicalize(path)
+
+ def list_folder(self, path):
+ path = self._realpath(path)
+ try:
+ out = [ ]
+ flist = os.listdir(path)
+ for fname in flist:
+ attr = SFTPAttributes.from_stat(os.stat(os.path.join(path, fname)))
+ attr.filename = fname
+ out.append(attr)
+ return out
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+
+ def stat(self, path):
+ path = self._realpath(path)
+ try:
+ return SFTPAttributes.from_stat(os.stat(path))
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+
+ def lstat(self, path):
+ path = self._realpath(path)
+ try:
+ return SFTPAttributes.from_stat(os.lstat(path))
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+
+ def open(self, path, flags, attr):
+ path = self._realpath(path)
+ try:
+ fd = os.open(path, flags)
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+ if (flags & os.O_CREAT) and (attr is not None):
+ SFTPServer.set_file_attr(path, attr)
+ if flags & os.O_WRONLY:
+ fstr = 'w'
+ elif flags & os.O_RDWR:
+ fstr = 'r+'
+ else:
+ # O_RDONLY (== 0)
+ fstr = 'r'
+ try:
+ f = os.fdopen(fd, fstr)
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+ fobj = StubSFTPHandle()
+ fobj.filename = path
+ fobj.readfile = f
+ fobj.writefile = f
+ return fobj
+
+ def remove(self, path):
+ path = self._realpath(path)
+ try:
+ os.remove(path)
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+ return SFTP_OK
+
+ def rename(self, oldpath, newpath):
+ oldpath = self._realpath(oldpath)
+ newpath = self._realpath(newpath)
+ try:
+ os.rename(oldpath, newpath)
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+ return SFTP_OK
+
+ def mkdir(self, path, attr):
+ path = self._realpath(path)
+ try:
+ os.mkdir(path)
+ if attr is not None:
+ SFTPServer.set_file_attr(path, attr)
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+ return SFTP_OK
+
+ def rmdir(self, path):
+ path = self._realpath(path)
+ try:
+ os.rmdir(path)
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+ return SFTP_OK
+
+ def chattr(self, path, attr):
+ path = self._realpath(path)
+ try:
+ SFTPServer.set_file_attr(path, attr)
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+ return SFTP_OK
+
+ def symlink(self, target_path, path):
+ path = self._realpath(path)
+ if (len(target_path) > 0) and (target_path[0] == '/'):
+ # absolute symlink
+ target_path = os.path.join(self.ROOT, target_path[1:])
+ if target_path[:2] == '//':
+ # bug in os.path.join
+ target_path = target_path[1:]
+ else:
+ # compute relative to path
+ abspath = os.path.join(os.path.dirname(path), target_path)
+ if abspath[:len(self.ROOT)] != self.ROOT:
+ # this symlink isn't going to work anyway -- just break it immediately
+ target_path = '<error>'
+ try:
+ os.symlink(target_path, path)
+ except:
+ return SFTPServer.convert_errno(e.errno)
+ return SFTP_OK
+
+ def readlink(self, path):
+ path = self._realpath(path)
+ try:
+ symlink = os.readlink(path)
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+ # if it's absolute, remove the root
+ if os.path.isabs(symlink):
+ if symlink[:len(self.ROOT)] == self.ROOT:
+ symlink = symlink[len(self.ROOT):]
+ if (len(symlink) == 0) or (symlink[0] != '/'):
+ symlink = '/' + symlink
+ else:
+ symlink = '<error>'
+ return symlink