- Replace autoconf/make build system with CMake (installs to /opt/archie)
- Add CPack DEB packaging for Debian Trixie (non-free/net, postinst creates archie user, extracts DB skeleton, sets setuid bits, enables systemd units)
- Add Gitea Actions workflow building .deb + binary/source tarballs on tag push
- Add portable archie_init.py for non-Debian post-install setup
- Port all scripts to Linux: getent passwd, systemctl, tail -n +N, gzip
- Add SFTP (libssh2) and FTPS (OpenSSL) scrapers alongside anonftp
- Add Flask web frontend (archie-web.service)
- Fix filter scripts (exec cat replaces broken sed s///g)
- Update all manpages: paths, contacts, add SFTP/FTPS section
- Update etc/: enable gzip, add webindex catalog, fix localhost refs
- Remove: AIX-2/SunOS-4.1.4/SunOS-5.4 dirs, tcl7.6/, tcl-dp/, tk4.2/, berkdb/, old Makefile.in/pre/post fragments, build.sh, unwrap scripts
- Add .gitignore

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
255 lines
7.7 KiB
Python
255 lines
7.7 KiB
Python
#!/usr/bin/env python3
|
|
"""Minimal SFTP test server using paramiko (like rspivak/sftpserver).
|
|
|
|
Usage:
    python3 sftp_server.py [--host 127.0.0.1] [--port 2222] [--root /tmp/sftp-root]
                           [--keyfile /tmp/sftp_host.key]
                           [--user testuser] [--password testpass]

Default credentials: testuser / testpass
|
|
"""
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import socket
|
|
import stat
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
|
|
import paramiko
|
|
|
|
|
|
class StubSFTPHandle(paramiko.SFTPHandle):
    """File handle handed back to clients by ``StubSFTPServer.open``."""

    def stat(self):
        """Describe the open file.

        Returns:
            A ``paramiko.SFTPAttributes`` built from ``os.fstat`` on the
            descriptor behind ``self.readfile``, or the SFTP status code
            matching the ``OSError`` if that call fails.
        """
        try:
            descriptor = self.readfile.fileno()
            info = os.fstat(descriptor)
            return paramiko.SFTPAttributes.from_stat(info)
        except OSError as err:
            return paramiko.SFTPServer.convert_errno(err.errno)

    def chattr(self, attr):
        """Report success without applying *attr* (stubbed out)."""
        return paramiko.SFTP_OK
|
|
|
|
|
|
class StubSFTPServer(paramiko.SFTPServerInterface):
    """SFTP backend that serves a local directory tree.

    Client paths are resolved below ``self.root`` (see ``_realpath``).
    Each operation returns either its result or an SFTP status code;
    ``OSError`` is translated with ``paramiko.SFTPServer.convert_errno``.
    """

    def __init__(self, channel, root, *args, **kwargs):
        """Store *root* and forward the remaining arguments to the base class.

        Args:
            channel: First positional argument supplied by paramiko when it
                instantiates the interface; passed through unchanged.
            root: Local directory that acts as the client's ``/``.
        """
        self.root = root
        super().__init__(channel, *args, **kwargs)

    def _realpath(self, path):
        """Map a client path to a local path by prefixing ``self.root``.

        ``canonicalize`` is inherited from paramiko; it is expected to return
        a normalized absolute path (confirm against the paramiko docs).
        """
        return self.root + self.canonicalize(path)

    def list_folder(self, path):
        """Return a list of ``SFTPAttributes`` for the entries of *path*.

        Entries are stat'ed following symlinks. If that fails (typically a
        dangling symlink) the link itself is described instead, so that one
        broken entry does not make the whole listing fail.
        """
        path = self._realpath(path)
        try:
            out = []
            for fname in os.listdir(path):
                full = os.path.join(path, fname)
                try:
                    st = os.stat(full)
                except OSError:
                    # Target is missing or unreadable: fall back to the
                    # link's own metadata. If this fails too, the outer
                    # handler reports the error as before.
                    st = os.lstat(full)
                attr = paramiko.SFTPAttributes.from_stat(st)
                attr.filename = fname
                out.append(attr)
            return out
        except OSError as e:
            return paramiko.SFTPServer.convert_errno(e.errno)

    def stat(self, path):
        """Return attributes for *path*, following symlinks."""
        path = self._realpath(path)
        try:
            return paramiko.SFTPAttributes.from_stat(os.stat(path))
        except OSError as e:
            return paramiko.SFTPServer.convert_errno(e.errno)

    def lstat(self, path):
        """Return attributes for *path* without following symlinks."""
        path = self._realpath(path)
        try:
            return paramiko.SFTPAttributes.from_stat(os.lstat(path))
        except OSError as e:
            return paramiko.SFTPServer.convert_errno(e.errno)

    def open(self, path, flags, attr):
        """Open *path* and return a ``StubSFTPHandle`` (or an SFTP error code).

        Args:
            path: Client path of the file.
            flags: ``os.O_*`` flags requested by the client.
            attr: Requested attributes; ``attr.st_mode`` (if set) is used as
                the creation mode, otherwise ``0o666``.
        """
        path = self._realpath(path)
        flags |= getattr(os, 'O_BINARY', 0)
        mode = getattr(attr, 'st_mode', None)
        try:
            fd = os.open(path, flags, 0o666 if mode is None else mode)
        except OSError as e:
            return paramiko.SFTPServer.convert_errno(e.errno)

        # The descriptor already carries the real open flags; this string
        # only selects the kind of Python file object wrapped around it.
        if flags & os.O_WRONLY:
            fstr = 'wb'
        elif flags & os.O_RDWR:
            fstr = 'r+b'
        else:
            fstr = 'rb'

        try:
            if (flags & os.O_CREAT) and (attr is not None):
                # Permissions were already applied by os.open(); do not
                # apply them a second time via set_file_attr.
                attr._flags &= ~attr.FLAG_PERMISSIONS
                paramiko.SFTPServer.set_file_attr(path, attr)
            f = os.fdopen(fd, fstr)
        except OSError as e:
            # From here on we own fd: release it instead of leaking it, e.g.
            # when the client tried to open a directory as a file.
            try:
                os.close(fd)
            except OSError:
                pass
            return paramiko.SFTPServer.convert_errno(e.errno)

        fobj = StubSFTPHandle(flags)
        fobj.filename = path
        fobj.readfile = f
        fobj.writefile = f
        return fobj

    def remove(self, path):
        """Delete the file at *path*."""
        path = self._realpath(path)
        try:
            os.remove(path)
        except OSError as e:
            return paramiko.SFTPServer.convert_errno(e.errno)
        return paramiko.SFTP_OK

    def rename(self, oldpath, newpath):
        """Rename *oldpath* to *newpath* using ``os.rename``."""
        oldpath = self._realpath(oldpath)
        newpath = self._realpath(newpath)
        try:
            os.rename(oldpath, newpath)
        except OSError as e:
            return paramiko.SFTPServer.convert_errno(e.errno)
        return paramiko.SFTP_OK

    def mkdir(self, path, attr):
        """Create directory *path*, then apply *attr* when it is given."""
        path = self._realpath(path)
        try:
            os.mkdir(path)
            if attr is not None:
                paramiko.SFTPServer.set_file_attr(path, attr)
        except OSError as e:
            return paramiko.SFTPServer.convert_errno(e.errno)
        return paramiko.SFTP_OK

    def rmdir(self, path):
        """Remove the directory at *path*."""
        path = self._realpath(path)
        try:
            os.rmdir(path)
        except OSError as e:
            return paramiko.SFTPServer.convert_errno(e.errno)
        return paramiko.SFTP_OK

    def chattr(self, path, attr):
        """Apply *attr* to *path* via ``paramiko.SFTPServer.set_file_attr``."""
        path = self._realpath(path)
        try:
            paramiko.SFTPServer.set_file_attr(path, attr)
        except OSError as e:
            return paramiko.SFTPServer.convert_errno(e.errno)
        return paramiko.SFTP_OK

    def symlink(self, target_path, path):
        """Create a symlink at *path* pointing to *target_path*.

        Absolute targets are re-rooted under ``self.root``; relative targets
        are stored as given.
        """
        path = self._realpath(path)
        if target_path.startswith('/'):
            target_path = self.root + target_path
        try:
            os.symlink(target_path, path)
        except OSError as e:
            return paramiko.SFTPServer.convert_errno(e.errno)
        return paramiko.SFTP_OK

    def readlink(self, path):
        """Return the target of the symlink at *path* as the client sees it.

        A target inside the served tree has the root prefix removed. The
        prefix is only stripped on a path-component boundary, so a sibling
        directory such as ``<root>-other`` is returned untouched, and a
        target equal to the root is reported as ``/``.
        """
        path = self._realpath(path)
        try:
            target = os.readlink(path)
        except OSError as e:
            return paramiko.SFTPServer.convert_errno(e.errno)
        root = self.root.rstrip('/')  # tolerate a trailing slash on the root
        if target == root or target.startswith(root + '/'):
            target = target[len(root):] or '/'
        return target
|
|
|
|
|
|
class ServerInterface(paramiko.ServerInterface):
    """Authentication policy: a single username/password pair."""

    def __init__(self, username, password):
        """Remember the only credentials that will be accepted."""
        self._username, self._password = username, password

    def check_auth_password(self, username, password):
        """Succeed only when both values match the configured credentials."""
        matches = username == self._username and password == self._password
        return paramiko.AUTH_SUCCESSFUL if matches else paramiko.AUTH_FAILED

    def check_auth_publickey(self, username, key):
        """Reject every public-key login attempt."""
        return paramiko.AUTH_FAILED

    def get_allowed_auths(self, username):
        """Advertise password authentication as the only method."""
        return "password"

    def check_channel_request(self, kind, chanid):
        """Allow every channel request, whatever its kind."""
        return paramiko.OPEN_SUCCEEDED
|
|
|
|
|
|
def handle_client(conn, host_key, root, username, password):
    """Serve one accepted connection until the client disconnects.

    Args:
        conn: Connected socket returned by ``accept()``.
        host_key: Host key added to the SSH transport.
        root: Local directory exposed to the client by ``StubSFTPServer``.
        username: Login name accepted by ``ServerInterface``.
        password: Password accepted by ``ServerInterface``.

    Any exception is printed as a traceback rather than propagated. The
    transport and the socket are always released, including when the SSH
    handshake fails.
    """
    transport = None
    try:
        transport = paramiko.Transport(conn)
        transport.add_server_key(host_key)

        server_iface = ServerInterface(username, password)
        transport.set_subsystem_handler(
            "sftp", paramiko.SFTPServer,
            sftp_si=lambda server, *a, **kw: StubSFTPServer(server, root, *a, **kw)
        )
        transport.start_server(server=server_iface)

        # Give the client 60 seconds to open a channel.
        if transport.accept(60) is None:
            return
        # Keep transport alive until the client disconnects (paramiko 4.x removed chan.event)
        while transport.is_active():
            time.sleep(0.5)
    except Exception:
        import traceback
        traceback.print_exc()
    finally:
        # Previously an exception skipped close(), leaving the transport and
        # its socket open.
        if transport is not None:
            transport.close()
        try:
            conn.close()
        except OSError:
            pass
|
|
|
|
|
|
def main():
    """Parse the command line and run the SFTP test server.

    Creates the root directory if needed, generates an RSA host key with
    ``ssh-keygen`` when the key file does not exist, then accepts connections
    forever, serving each one on its own daemon thread.

    Raises:
        subprocess.CalledProcessError: ``ssh-keygen`` exited with an error
            (its stderr is echoed first).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--host", default="127.0.0.1")
    ap.add_argument("--port", type=int, default=2222)
    ap.add_argument("--root", default="/tmp/sftp-root")
    ap.add_argument("--keyfile", default="/tmp/sftp_host.key")
    ap.add_argument("--user", default="testuser")
    ap.add_argument("--password", default="testpass")
    args = ap.parse_args()

    os.makedirs(args.root, exist_ok=True)

    if not os.path.exists(args.keyfile):
        try:
            subprocess.run(
                ["ssh-keygen", "-t", "rsa", "-b", "2048", "-f", args.keyfile, "-N", ""],
                check=True, capture_output=True
            )
        except subprocess.CalledProcessError as exc:
            # The output was captured, so show why ssh-keygen failed before
            # re-raising; otherwise only the exit status is visible.
            sys.stderr.write((exc.stderr or b"").decode(errors="replace"))
            raise
        print(f"Generated host key: {args.keyfile}", flush=True)

    host_key = paramiko.RSAKey.from_private_key_file(args.keyfile)

    # The context manager closes the listening socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
        sock.bind((args.host, args.port))
        sock.listen(10)
        print(f"SFTP server listening on {args.host}:{args.port}, root={args.root}",
              flush=True)

        while True:
            conn, _ = sock.accept()
            # Daemon threads do not keep the process alive at shutdown.
            threading.Thread(
                target=handle_client,
                args=(conn, host_key, args.root, args.user, args.password),
                daemon=True,
            ).start()
|
|
|
|
|
|
# Script entry point: only run the server when executed directly.
if __name__ == "__main__":
    # Show warnings and errors from library loggers, nothing chattier.
    logging.basicConfig(level=logging.WARNING)
    main()
|