diff --git a/tests/afp/afp_endpoint_inventory.py b/tests/afp/afp_endpoint_inventory.py new file mode 100755 index 0000000..56b963e --- /dev/null +++ b/tests/afp/afp_endpoint_inventory.py @@ -0,0 +1,302 @@ +#!/usr/bin/env python3 +"""Inventory mars_nwe AFP endpoint coverage before the final WebSDK/header audit. + +The helper is intentionally read-only. It scans src/nwconn.c and prints a +compact table of AFP subfunction names, likely handler functions, implementation +status, and the mars_nwe backend families touched by each handler. + +It does not replace the final manual WebSDK / Novell header comparison. Its +job is to make that review reproducible and to flag drift-prone markers such as +old transition labels, Netatalk/libatalk references, or AFP-local shortcuts. +""" + +from __future__ import annotations + +import argparse +import json +import re +import sys +from pathlib import Path +from typing import Dict, Iterable, List, Tuple + + +CALL_NAME_RE = re.compile( + r"case\s+0x([0-9a-fA-F]+)\s*:\s*return\s*\(\s*\"([^\"]+)\"\s*\)\s*;" +) +STATIC_AFP_FUNC_RE = re.compile( + r"static\s+int\s+(afp_[A-Za-z0-9_]+)\s*\([^)]*\)\s*\{", re.MULTILINE +) +DISPATCH_RE = re.compile( + r"case\s+0x([0-9a-fA-F]+)\s*:\s*\{(?P.*?)break\s*;", + re.DOTALL, +) + + +BACKEND_RULES: Tuple[Tuple[str, Tuple[str, ...]], ...] = ( + ("mars_nwe path/namespace", ( + "conn_get_kpl_path", + "afp_resolve_path_volume", + "afp_namespace_path_from_entry_id", + "afp_resolve_entry_id_path", + "name_2_base", + "build_dos_name", + "nw_get_directory_path", + )), + ("mars_nwe directory handles", ( + "nw_alloc_dir_handle", + "nw_open_dir_handle", + "nw_dealloc_dir_handle", + )), + ("mars_nwe file handles/I/O", ( + "nw_open_creat", + "nw_close_file", + "nw_read_file", + "nw_write_file", + "get_unix_name_from_fhandle", + "file_handle", + )), + ("mars_nwe attributes/archive/fileinfo", ( + "nw_get_fattrib", + "nw_set_fattrib", + "nwarchive_", + "nw_get_file_info", + "nw_set_file_info", + "nw_sattr", + )), + ("mars_nwe trustee/rights", ( + "tru_get_eff_rights", + "get_eff_rights", + "TRUSTEE_", + "no Modify rights", + )), + ("mars_nwe object lifecycle", ( + "nw_creat", + "mv_dir", + "nw_mv_dir_between_handles", + "unlink", + "rmdir", + )), + ("AFP-only xattrs via nwatalk", ( + "nwatalk_get_entry_id", + "nwatalk_set_entry_id", + "nwatalk_get_finder_info", + "nwatalk_set_finder_info", + "nwatalk_get_afp_attributes", + "nwatalk_set_afp_attributes", + )), +) + + +GLOBAL_MARKERS: Tuple[Tuple[str, str], ...] = ( + ("layout=websdk", "transient SetInfo layout marker should be gone"), + ("libatalk", "Netatalk/libatalk backend reference should be gone"), + ("Netatalk", "Netatalk/libatalk backend reference should be gone"), + ("HAVE_NETATALK", "Netatalk build gate should be gone"), + ("ENABLE_NETATALK", "Netatalk build gate should be gone"), + ("entry-id-only lookup unsupported", "entry-id-only resolver gap may remain"), + ("entry-id-only scan unsupported", "entry-id-only scan gap may remain"), +) + + +AFP_LOCAL_SHORTCUT_RE = re.compile( + r"\b(open|creat|read|write|close|rename|unlink|remove|rmdir|mkdir|stat|lstat|" + r"getxattr|setxattr|removexattr)\s*\(" +) + + +def repo_root_from(start: Path) -> Path: + cur = start.resolve() + while True: + if (cur / "src" / "nwconn.c").exists(): + return cur + if cur.parent == cur: + raise SystemExit("cannot find repository root containing src/nwconn.c") + cur = cur.parent + + +def line_for_offset(text: str, offset: int) -> int: + return text.count("\n", 0, offset) + 1 + + +def extract_function_bodies(text: str) -> Dict[str, Tuple[int, str]]: + bodies: Dict[str, Tuple[int, str]] = {} + for match in STATIC_AFP_FUNC_RE.finditer(text): + name = match.group(1) + start = match.end() + depth = 1 + pos = start + while pos < len(text) and depth: + ch = text[pos] + if ch == "{": + depth += 1 + elif ch == "}": + depth -= 1 + pos += 1 + bodies[name] = (line_for_offset(text, match.start()), text[start:pos - 1]) + return bodies + + +def parse_dispatch(text: str, function_names: Iterable[str]) -> Dict[int, str]: + names = sorted(function_names, key=len, reverse=True) + dispatch: Dict[int, str] = {} + # Restrict the broad case scan to areas that mention AFP handler names. This + # avoids confusing non-AFP NCP cases with AFP subfunctions. + for match in DISPATCH_RE.finditer(text): + body = match.group("body") + for name in names: + if name in body: + dispatch[int(match.group(1), 16)] = name + break + return dispatch + + +def classify_status(body: str) -> str: + lowered = body.lower() + if "unsupported" in lowered and "return(-0x9c" in lowered: + return "unsupported subset" + if "rejected" in lowered and "return(-0x9c" in lowered: + return "partial / validates rejects" + if "not implemented" in lowered: + return "not implemented marker" + if body.strip(): + return "implemented" + return "unknown" + + +def classify_backends(body: str) -> List[str]: + found: List[str] = [] + for label, needles in BACKEND_RULES: + if any(needle in body for needle in needles): + found.append(label) + return found or ["needs manual review"] + + +def function_for_call(call_name: str, call_no: int, dispatch: Dict[int, str], bodies: Dict[str, Tuple[int, str]]) -> str: + if call_no in dispatch: + return dispatch[call_no] + normalized = re.sub(r"[^a-z0-9]+", "_", call_name.lower()).strip("_") + candidates = [ + "afp_" + normalized, + "afp_" + normalized.replace("2_0_", ""), + ] + for candidate in candidates: + if candidate in bodies: + return candidate + return "" + + +def find_shortcuts(func_name: str, body: str) -> List[str]: + allowed_prefixes = ( + "nw_", + "nwatalk_", + "nwxattr_", + "conn_", + "afp_", + "GET_", + "U32_", + "mem", + "str", + "XDPRINTF", + ) + hits: List[str] = [] + for match in AFP_LOCAL_SHORTCUT_RE.finditer(body): + name = match.group(1) + start = max(0, match.start() - 12) + prefix = body[start:match.start()] + token = prefix.split()[-1] if prefix.split() else "" + if token.endswith(allowed_prefixes): + continue + # stat/lstat may still be legitimate verification around mars_nwe + # path resolution, so report it as review material rather than failing. + hits.append(f"{func_name}: {name}()") + return sorted(set(hits)) + + +def render_markdown(rows: List[dict], warnings: List[str]) -> str: + out: List[str] = [] + out.append("# AFP endpoint inventory") + out.append("") + out.append("| Subfn | Name | Handler | Status | Backend notes |") + out.append("| --- | --- | --- | --- | --- |") + for row in rows: + out.append( + "| 0x{call_no:02x} | {name} | {handler} | {status} | {backends} |".format( + call_no=row["call_no"], + name=row["name"].replace("|", "\\|"), + handler=row["handler"] or "manual review", + status=row["status"], + backends="
".join(row["backends"]), + ) + ) + out.append("") + out.append("## Review warnings") + out.append("") + if warnings: + for warning in warnings: + out.append(f"- {warning}") + else: + out.append("- none") + out.append("") + return "\n".join(out) + + +def main(argv: List[str]) -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--repo", type=Path, default=Path.cwd(), help="mars_nwe repository root") + parser.add_argument("--json", action="store_true", help="print JSON instead of markdown") + parser.add_argument("--strict", action="store_true", help="exit non-zero when warnings are found") + args = parser.parse_args(argv) + + root = repo_root_from(args.repo) + nwconn = root / "src" / "nwconn.c" + text = nwconn.read_text(errors="replace") + + call_names = { + int(call_no, 16): name + for call_no, name in CALL_NAME_RE.findall(text) + if name.startswith("AFP ") + } + bodies = extract_function_bodies(text) + dispatch = parse_dispatch(text, bodies) + + rows: List[dict] = [] + warnings: List[str] = [] + + for call_no in sorted(call_names): + name = call_names[call_no] + handler = function_for_call(name, call_no, dispatch, bodies) + line = None + body = "" + if handler and handler in bodies: + line, body = bodies[handler] + status = classify_status(body) + backends = classify_backends(body) + rows.append({ + "call_no": call_no, + "name": name, + "handler": handler, + "line": line, + "status": status, + "backends": backends, + }) + if body: + for hit in find_shortcuts(handler, body): + warnings.append(f"AFP-local shortcut candidate: {hit}") + + for marker, reason in GLOBAL_MARKERS: + if marker in text: + for line_no, line in enumerate(text.splitlines(), 1): + if marker in line: + warnings.append(f"{nwconn.relative_to(root)}:{line_no}: {reason}: {marker}") + + payload = {"endpoints": rows, "warnings": sorted(set(warnings))} + if args.json: + print(json.dumps(payload, indent=2, sort_keys=True)) + else: + print(render_markdown(rows, payload["warnings"])) + + return 1 if args.strict and payload["warnings"] else 0 + + +if __name__ == "__main__": + raise SystemExit(main(sys.argv[1:]))