tests: add AFP endpoint inventory helper

This commit is contained in:
OpenAI
2026-05-31 06:08:45 +00:00
committed by Mario Fetka
parent 4443c33274
commit 438a6d7289

View File

@@ -0,0 +1,302 @@
#!/usr/bin/env python3
"""Inventory mars_nwe AFP endpoint coverage before the final WebSDK/header audit.
The helper is intentionally read-only. It scans src/nwconn.c and prints a
compact table of AFP subfunction names, likely handler functions, implementation
status, and the mars_nwe backend families touched by each handler.
It does not replace the final manual WebSDK / Novell header comparison. Its
job is to make that review reproducible and to flag drift-prone markers such as
old transition labels, Netatalk/libatalk references, or AFP-local shortcuts.
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from pathlib import Path
from typing import Dict, Iterable, List, Tuple
CALL_NAME_RE = re.compile(
r"case\s+0x([0-9a-fA-F]+)\s*:\s*return\s*\(\s*\"([^\"]+)\"\s*\)\s*;"
)
STATIC_AFP_FUNC_RE = re.compile(
r"static\s+int\s+(afp_[A-Za-z0-9_]+)\s*\([^)]*\)\s*\{", re.MULTILINE
)
DISPATCH_RE = re.compile(
r"case\s+0x([0-9a-fA-F]+)\s*:\s*\{(?P<body>.*?)break\s*;",
re.DOTALL,
)
BACKEND_RULES: Tuple[Tuple[str, Tuple[str, ...]], ...] = (
("mars_nwe path/namespace", (
"conn_get_kpl_path",
"afp_resolve_path_volume",
"afp_namespace_path_from_entry_id",
"afp_resolve_entry_id_path",
"name_2_base",
"build_dos_name",
"nw_get_directory_path",
)),
("mars_nwe directory handles", (
"nw_alloc_dir_handle",
"nw_open_dir_handle",
"nw_dealloc_dir_handle",
)),
("mars_nwe file handles/I/O", (
"nw_open_creat",
"nw_close_file",
"nw_read_file",
"nw_write_file",
"get_unix_name_from_fhandle",
"file_handle",
)),
("mars_nwe attributes/archive/fileinfo", (
"nw_get_fattrib",
"nw_set_fattrib",
"nwarchive_",
"nw_get_file_info",
"nw_set_file_info",
"nw_sattr",
)),
("mars_nwe trustee/rights", (
"tru_get_eff_rights",
"get_eff_rights",
"TRUSTEE_",
"no Modify rights",
)),
("mars_nwe object lifecycle", (
"nw_creat",
"mv_dir",
"nw_mv_dir_between_handles",
"unlink",
"rmdir",
)),
("AFP-only xattrs via nwatalk", (
"nwatalk_get_entry_id",
"nwatalk_set_entry_id",
"nwatalk_get_finder_info",
"nwatalk_set_finder_info",
"nwatalk_get_afp_attributes",
"nwatalk_set_afp_attributes",
)),
)
GLOBAL_MARKERS: Tuple[Tuple[str, str], ...] = (
("layout=websdk", "transient SetInfo layout marker should be gone"),
("libatalk", "Netatalk/libatalk backend reference should be gone"),
("Netatalk", "Netatalk/libatalk backend reference should be gone"),
("HAVE_NETATALK", "Netatalk build gate should be gone"),
("ENABLE_NETATALK", "Netatalk build gate should be gone"),
("entry-id-only lookup unsupported", "entry-id-only resolver gap may remain"),
("entry-id-only scan unsupported", "entry-id-only scan gap may remain"),
)
AFP_LOCAL_SHORTCUT_RE = re.compile(
r"\b(open|creat|read|write|close|rename|unlink|remove|rmdir|mkdir|stat|lstat|"
r"getxattr|setxattr|removexattr)\s*\("
)
def repo_root_from(start: Path) -> Path:
cur = start.resolve()
while True:
if (cur / "src" / "nwconn.c").exists():
return cur
if cur.parent == cur:
raise SystemExit("cannot find repository root containing src/nwconn.c")
cur = cur.parent
def line_for_offset(text: str, offset: int) -> int:
return text.count("\n", 0, offset) + 1
def extract_function_bodies(text: str) -> Dict[str, Tuple[int, str]]:
bodies: Dict[str, Tuple[int, str]] = {}
for match in STATIC_AFP_FUNC_RE.finditer(text):
name = match.group(1)
start = match.end()
depth = 1
pos = start
while pos < len(text) and depth:
ch = text[pos]
if ch == "{":
depth += 1
elif ch == "}":
depth -= 1
pos += 1
bodies[name] = (line_for_offset(text, match.start()), text[start:pos - 1])
return bodies
def parse_dispatch(text: str, function_names: Iterable[str]) -> Dict[int, str]:
names = sorted(function_names, key=len, reverse=True)
dispatch: Dict[int, str] = {}
# Restrict the broad case scan to areas that mention AFP handler names. This
# avoids confusing non-AFP NCP cases with AFP subfunctions.
for match in DISPATCH_RE.finditer(text):
body = match.group("body")
for name in names:
if name in body:
dispatch[int(match.group(1), 16)] = name
break
return dispatch
def classify_status(body: str) -> str:
lowered = body.lower()
if "unsupported" in lowered and "return(-0x9c" in lowered:
return "unsupported subset"
if "rejected" in lowered and "return(-0x9c" in lowered:
return "partial / validates rejects"
if "not implemented" in lowered:
return "not implemented marker"
if body.strip():
return "implemented"
return "unknown"
def classify_backends(body: str) -> List[str]:
found: List[str] = []
for label, needles in BACKEND_RULES:
if any(needle in body for needle in needles):
found.append(label)
return found or ["needs manual review"]
def function_for_call(call_name: str, call_no: int, dispatch: Dict[int, str], bodies: Dict[str, Tuple[int, str]]) -> str:
if call_no in dispatch:
return dispatch[call_no]
normalized = re.sub(r"[^a-z0-9]+", "_", call_name.lower()).strip("_")
candidates = [
"afp_" + normalized,
"afp_" + normalized.replace("2_0_", ""),
]
for candidate in candidates:
if candidate in bodies:
return candidate
return ""
def find_shortcuts(func_name: str, body: str) -> List[str]:
allowed_prefixes = (
"nw_",
"nwatalk_",
"nwxattr_",
"conn_",
"afp_",
"GET_",
"U32_",
"mem",
"str",
"XDPRINTF",
)
hits: List[str] = []
for match in AFP_LOCAL_SHORTCUT_RE.finditer(body):
name = match.group(1)
start = max(0, match.start() - 12)
prefix = body[start:match.start()]
token = prefix.split()[-1] if prefix.split() else ""
if token.endswith(allowed_prefixes):
continue
# stat/lstat may still be legitimate verification around mars_nwe
# path resolution, so report it as review material rather than failing.
hits.append(f"{func_name}: {name}()")
return sorted(set(hits))
def render_markdown(rows: List[dict], warnings: List[str]) -> str:
out: List[str] = []
out.append("# AFP endpoint inventory")
out.append("")
out.append("| Subfn | Name | Handler | Status | Backend notes |")
out.append("| --- | --- | --- | --- | --- |")
for row in rows:
out.append(
"| 0x{call_no:02x} | {name} | {handler} | {status} | {backends} |".format(
call_no=row["call_no"],
name=row["name"].replace("|", "\\|"),
handler=row["handler"] or "manual review",
status=row["status"],
backends="<br>".join(row["backends"]),
)
)
out.append("")
out.append("## Review warnings")
out.append("")
if warnings:
for warning in warnings:
out.append(f"- {warning}")
else:
out.append("- none")
out.append("")
return "\n".join(out)
def main(argv: List[str]) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--repo", type=Path, default=Path.cwd(), help="mars_nwe repository root")
parser.add_argument("--json", action="store_true", help="print JSON instead of markdown")
parser.add_argument("--strict", action="store_true", help="exit non-zero when warnings are found")
args = parser.parse_args(argv)
root = repo_root_from(args.repo)
nwconn = root / "src" / "nwconn.c"
text = nwconn.read_text(errors="replace")
call_names = {
int(call_no, 16): name
for call_no, name in CALL_NAME_RE.findall(text)
if name.startswith("AFP ")
}
bodies = extract_function_bodies(text)
dispatch = parse_dispatch(text, bodies)
rows: List[dict] = []
warnings: List[str] = []
for call_no in sorted(call_names):
name = call_names[call_no]
handler = function_for_call(name, call_no, dispatch, bodies)
line = None
body = ""
if handler and handler in bodies:
line, body = bodies[handler]
status = classify_status(body)
backends = classify_backends(body)
rows.append({
"call_no": call_no,
"name": name,
"handler": handler,
"line": line,
"status": status,
"backends": backends,
})
if body:
for hit in find_shortcuts(handler, body):
warnings.append(f"AFP-local shortcut candidate: {hit}")
for marker, reason in GLOBAL_MARKERS:
if marker in text:
for line_no, line in enumerate(text.splitlines(), 1):
if marker in line:
warnings.append(f"{nwconn.relative_to(root)}:{line_no}: {reason}: {marker}")
payload = {"endpoints": rows, "warnings": sorted(set(warnings))}
if args.json:
print(json.dumps(payload, indent=2, sort_keys=True))
else:
print(render_markdown(rows, payload["warnings"]))
return 1 if args.strict and payload["warnings"] else 0
if __name__ == "__main__":
raise SystemExit(main(sys.argv[1:]))