Files
mars-nwe/tests/afp/afp_endpoint_inventory.py
ai 92b0c4a34a
All checks were successful
Source release / source-package (push) Successful in 55s
afp: add deleted file Macintosh info endpoint
2026-06-01 11:07:43 +02:00

391 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
"""Inventory mars_nwe AFP endpoint coverage before the final WebSDK/header audit.
The helper is intentionally read-only. It scans src/nwconn.c and prints a
compact table of AFP subfunction names, likely handler functions, implementation
status, and the mars_nwe backend families touched by each handler.
It does not replace the final manual WebSDK / Novell header comparison. Its
job is to make that review reproducible and to flag drift-prone markers such as
old transition labels, Netatalk/libatalk references, or AFP-local shortcuts.
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from pathlib import Path
from typing import Dict, Iterable, List, Tuple
CALL_NAME_RE = re.compile(
r"case\s+0x([0-9a-fA-F]+)\s*:\s*return\s*\(\s*\"([^\"]+)\"\s*\)\s*;"
)
STATIC_AFP_FUNC_RE = re.compile(
r"static\s+int\s+(afp_[A-Za-z0-9_]+)\s*\([^)]*\)\s*\{", re.MULTILINE
)
# These aliases deliberately mirror the documented NetWare AFP subfunction
# table. They make the report stable for endpoints that use shared helper
# functions or generic dispatch wrappers where the source function name is not a
# direct mechanical spelling of afp_call_name(). Each candidate is used only if
# the function exists in the scanned source.
HANDLER_CANDIDATES_BY_CALL: Dict[int, Tuple[str, ...]] = {
0x01: ("afp_create_directory", "afp_create_directory_or_file", "afp_create_file_or_directory", "afp_create_file_dir"),
0x02: ("afp_create_file", "afp_create_directory_or_file", "afp_create_file_or_directory", "afp_create_file_dir"),
0x03: ("afp_delete", "afp_delete_file_or_directory", "afp_delete_file_dir"),
0x04: ("afp_get_entry_id_from_name",),
0x05: ("afp_get_file_information",),
0x06: ("afp_get_entry_id_from_netware_handle",),
0x07: ("afp_rename", "afp_rename_file_or_directory", "afp_rename_file_dir"),
0x08: ("afp_open_file_fork",),
0x09: ("afp_set_file_information",),
0x0A: ("afp_scan_file_information",),
0x0B: ("afp_alloc_temporary_dir_handle",),
0x0C: ("afp_get_entry_id_from_path_name",),
0x0D: ("afp_create_directory", "afp_create_directory_or_file", "afp_create_file_or_directory", "afp_create_file_dir"),
0x0E: ("afp_create_file", "afp_create_directory_or_file", "afp_create_file_or_directory", "afp_create_file_dir"),
0x0F: ("afp_get_file_information",),
0x10: ("afp_set_file_information",),
0x11: ("afp_scan_file_information",),
0x12: ("afp_get_dos_name_from_entry_id",),
0x13: (),
}
# Some AFP endpoints are implemented directly in the AFP dispatch switch rather
# than as standalone static afp_* functions. Keep those visible in the report
# so the final audit can focus on WebSDK/header semantics instead of parser
# limitations. These labels are intentionally conservative: they describe the
# implementation shape and backend families, not final compatibility status.
INLINE_ENDPOINTS: Dict[int, Tuple[str, str, Tuple[str, ...]]] = {
0x01: ("inline AFP create directory case", "implemented", ("mars_nwe path/namespace", "mars_nwe object lifecycle")),
0x02: ("inline AFP create file case", "implemented", ("mars_nwe path/namespace", "mars_nwe object lifecycle", "AFP-only xattrs via nwatalk")),
0x03: ("inline AFP delete case", "implemented", ("mars_nwe path/namespace", "mars_nwe object lifecycle")),
0x07: ("inline AFP rename case", "implemented", ("mars_nwe path/namespace", "mars_nwe object lifecycle", "AFP-only xattrs via nwatalk")),
0x08: ("inline AFP open file fork case", "implemented", ("mars_nwe path/namespace", "mars_nwe file handles/I/O")),
0x09: ("inline AFP set file information case", "implemented", ("mars_nwe path/namespace", "mars_nwe attributes/archive/fileinfo", "mars_nwe trustee/rights", "AFP-only xattrs via nwatalk")),
0x0B: ("inline AFP alloc temporary dir handle case", "implemented", ("mars_nwe path/namespace", "mars_nwe directory handles")),
0x0D: ("inline AFP 2.0 create directory case", "implemented", ("mars_nwe path/namespace", "mars_nwe object lifecycle")),
0x0E: ("inline AFP 2.0 create file case", "implemented", ("mars_nwe path/namespace", "mars_nwe object lifecycle", "AFP-only xattrs via nwatalk")),
0x10: ("inline AFP 2.0 set file information case", "implemented", ("mars_nwe path/namespace", "mars_nwe attributes/archive/fileinfo", "mars_nwe trustee/rights", "AFP-only xattrs via nwatalk")),
0x13: ("inline AFP deleted-info case", "implemented", ("mars_nwe salvage/deleted-entry backend", "AFP FinderInfo snapshot")),
}
BACKEND_RULES: Tuple[Tuple[str, Tuple[str, ...]], ...] = (
("mars_nwe path/namespace", (
"conn_get_kpl_path",
"conn_get_kpl_unxname",
"afp_build_base_relative_path",
"afp_resolve_path_volume",
"afp_namespace_path_from_entry_id",
"afp_resolve_entry_id_path",
"afp_find_dos_name_from_entry_id",
"afp_fill_file_info_response",
"map_directory_number_to_path",
"name_2_base",
"build_dos_name",
"nw_get_directory_path",
)),
("mars_nwe directory handles", (
"nw_alloc_dir_handle",
"nw_open_dir_handle",
"nw_dealloc_dir_handle",
)),
("mars_nwe file handles/I/O", (
"nw_open_creat",
"nw_creat_open_file",
"nw_close_file",
"nw_read_file",
"nw_write_file",
"get_unix_name_from_fhandle",
"file_handle",
)),
("mars_nwe attributes/archive/fileinfo", (
"nw_get_fattrib",
"nw_set_fattrib",
"get_nw_attrib_dword",
"set_nw_attrib_dword",
"mars_nwe_get_archive_info",
"mars_nwe_set_archive_info",
"nwarchive_",
"nw_get_file_info",
"nw_set_file_info",
"nw_sattr",
)),
("mars_nwe trustee/rights", (
"tru_get_eff_rights",
"get_eff_rights",
"TRUSTEE_",
"no Modify rights",
)),
("mars_nwe object lifecycle", (
"nw_creat",
"nw_mk_rd_dir",
"nw_delete_files",
"mv_dir",
"nw_mv_dir_between_handles",
"unlink",
"rmdir",
)),
("AFP-only xattrs via nwatalk", (
"nwatalk_get_entry_id",
"nwatalk_set_entry_id",
"nwatalk_get_finder_info",
"nwatalk_set_finder_info",
"nwatalk_get_afp_attributes",
"nwatalk_set_afp_attributes",
)),
)
GLOBAL_MARKERS: Tuple[Tuple[str, str], ...] = (
("layout=websdk", "transient SetInfo layout marker should be gone"),
("libatalk", "Netatalk/libatalk backend reference should be gone"),
("Netatalk", "Netatalk/libatalk backend reference should be gone"),
("HAVE_NETATALK", "Netatalk build gate should be gone"),
("ENABLE_NETATALK", "Netatalk build gate should be gone"),
("entry-id-only lookup unsupported", "entry-id-only resolver gap may remain"),
("entry-id-only scan unsupported", "entry-id-only scan gap may remain"),
)
AFP_LOCAL_SHORTCUT_RE = re.compile(
r"\b(open|creat|read|write|close|rename|unlink|remove|rmdir|mkdir|"
r"getxattr|setxattr|removexattr)\s*\("
)
def repo_root_from(start: Path) -> Path:
cur = start.resolve()
while True:
if (cur / "src" / "nwconn.c").exists():
return cur
if cur.parent == cur:
raise SystemExit("cannot find repository root containing src/nwconn.c")
cur = cur.parent
def line_for_offset(text: str, offset: int) -> int:
return text.count("\n", 0, offset) + 1
def extract_function_bodies(text: str) -> Dict[str, Tuple[int, str]]:
bodies: Dict[str, Tuple[int, str]] = {}
for match in STATIC_AFP_FUNC_RE.finditer(text):
name = match.group(1)
start = match.end()
depth = 1
pos = start
while pos < len(text) and depth:
ch = text[pos]
if ch == "{":
depth += 1
elif ch == "}":
depth -= 1
pos += 1
bodies[name] = (line_for_offset(text, match.start()), text[start:pos - 1])
return bodies
def parse_dispatch(text: str, function_names: Iterable[str]) -> Dict[int, str]:
names = sorted(function_names, key=len, reverse=True)
dispatch: Dict[int, str] = {}
# The AFP dispatcher contains nested switch/if blocks, so a simple
# ``case ... break;`` regex often stops at an inner break and misses the
# real handler call. Walk each AFP case from its label to the next top-level
# case/default label and search that slice instead.
labels = list(re.finditer(r"^\s*case\s+0x([0-9a-fA-F]+)\s*:", text, re.MULTILINE))
for index, label in enumerate(labels):
start = label.end()
end = labels[index + 1].start() if index + 1 < len(labels) else len(text)
body = text[start:end]
for name in names:
if name in body:
dispatch[int(label.group(1), 16)] = name
break
return dispatch
def classify_status(body: str) -> str:
lowered = body.lower()
if "unsupported" in lowered and "return(-0x9c" in lowered:
return "unsupported subset"
if "rejected" in lowered and "return(-0x9c" in lowered:
return "partial / validates rejects"
if "not implemented" in lowered:
return "not implemented marker"
if body.strip():
return "implemented"
return "unknown"
def classify_backends(body: str) -> List[str]:
found: List[str] = []
for label, needles in BACKEND_RULES:
if any(needle in body for needle in needles):
found.append(label)
return found or ["needs manual review"]
def function_for_call(call_name: str, call_no: int, dispatch: Dict[int, str], bodies: Dict[str, Tuple[int, str]]) -> str:
if call_no == 0x13:
return ""
for candidate in HANDLER_CANDIDATES_BY_CALL.get(call_no, ()):
if candidate in bodies:
return candidate
if call_no in dispatch:
return dispatch[call_no]
# afp_call_name() returns human labels such as
# ``AFP 2.0 Get File Information``. Handler functions are named without
# the cosmetic prefix/version marker, e.g. ``afp_get_file_information``.
# Keep several conservative variants so the helper remains useful if a
# handler keeps an older shorter name.
normalized = re.sub(r"[^a-z0-9]+", "_", call_name.lower()).strip("_")
if normalized.startswith("afp_"):
normalized = normalized[4:]
without_version = normalized.replace("2_0_", "")
candidates = [
"afp_" + normalized,
"afp_" + without_version,
"afp_" + without_version.replace("file_information", "file_info"),
"afp_" + without_version.replace("directory", "dir"),
"afp_" + without_version.replace("netware", "netware"),
]
for candidate in candidates:
if candidate in bodies:
return candidate
return ""
def find_shortcuts(func_name: str, body: str) -> List[str]:
allowed_prefixes = (
"nw_",
"nwatalk_",
"nwxattr_",
"conn_",
"afp_",
"GET_",
"U32_",
"mem",
"str",
"XDPRINTF",
)
hits: List[str] = []
for match in AFP_LOCAL_SHORTCUT_RE.finditer(body):
name = match.group(1)
start = max(0, match.start() - 12)
prefix = body[start:match.start()]
token = prefix.split()[-1] if prefix.split() else ""
if token.endswith(allowed_prefixes):
continue
# stat/lstat may still be legitimate verification around mars_nwe
# path resolution, so report it as review material rather than failing.
hits.append(f"{func_name}: {name}()")
return sorted(set(hits))
def render_markdown(rows: List[dict], warnings: List[str]) -> str:
out: List[str] = []
out.append("# AFP endpoint inventory")
out.append("")
out.append("| Subfn | Name | Handler | Status | Backend notes |")
out.append("| --- | --- | --- | --- | --- |")
for row in rows:
out.append(
"| 0x{call_no:02x} | {name} | {handler} | {status} | {backends} |".format(
call_no=row["call_no"],
name=row["name"].replace("|", "\\|"),
handler=row["handler"] or "manual review",
status=row["status"],
backends="<br>".join(row["backends"]),
)
)
out.append("")
out.append("## Review warnings")
out.append("")
if warnings:
for warning in warnings:
out.append(f"- {warning}")
else:
out.append("- none")
out.append("")
return "\n".join(out)
def main(argv: List[str]) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--repo", type=Path, default=Path.cwd(), help="mars_nwe repository root")
parser.add_argument("--json", action="store_true", help="print JSON instead of markdown")
parser.add_argument("--strict", action="store_true", help="exit non-zero when warnings are found")
args = parser.parse_args(argv)
root = repo_root_from(args.repo)
nwconn = root / "src" / "nwconn.c"
text = nwconn.read_text(errors="replace")
call_names = {
int(call_no, 16): name
for call_no, name in CALL_NAME_RE.findall(text)
if name.startswith("AFP ")
}
bodies = extract_function_bodies(text)
dispatch = parse_dispatch(text, bodies)
rows: List[dict] = []
warnings: List[str] = []
for call_no in sorted(call_names):
name = call_names[call_no]
handler = function_for_call(name, call_no, dispatch, bodies)
line = None
body = ""
if handler and handler in bodies:
line, body = bodies[handler]
if body:
status = classify_status(body)
backends = classify_backends(body)
elif call_no in INLINE_ENDPOINTS:
inline_handler, status, inline_backends = INLINE_ENDPOINTS[call_no]
handler = inline_handler
backends = list(inline_backends)
else:
status = "unknown"
backends = ["needs manual review"]
rows.append({
"call_no": call_no,
"name": name,
"handler": handler,
"line": line,
"status": status,
"backends": backends,
})
if body:
for hit in find_shortcuts(handler, body):
warnings.append(f"AFP-local shortcut candidate: {hit}")
for marker, reason in GLOBAL_MARKERS:
if marker in text:
for line_no, line in enumerate(text.splitlines(), 1):
if marker in line:
warnings.append(f"{nwconn.relative_to(root)}:{line_no}: {reason}: {marker}")
payload = {"endpoints": rows, "warnings": sorted(set(warnings))}
if args.json:
print(json.dumps(payload, indent=2, sort_keys=True))
else:
print(render_markdown(rows, payload["warnings"]))
return 1 if args.strict and payload["warnings"] else 0
if __name__ == "__main__":
raise SystemExit(main(sys.argv[1:]))