#!/usr/bin/env python3
"""
test_merge_batch_graphs.py — Tests for the deterministic tested_by linker.
Run from the repo root:
python -m unittest tests.skill.understand.test_merge_batch_graphs -v
"""
from __future__ import annotations
import importlib.util
import sys
import unittest
from pathlib import Path
from typing import Any
# ── Module loader ─────────────────────────────────────────────────────────
# `merge-batch-graphs.py` has a hyphen in its name, so we cannot `import` it
# directly. Load it via importlib so we can call its module-level helpers.
_HERE = Path(__file__).resolve().parent
_REPO_ROOT = _HERE.parent.parent.parent
_MODULE_PATH = (
_REPO_ROOT
/ "understand-anything-plugin"
/ "skills"
/ "understand"
/ "merge-batch-graphs.py"
)
def _load_module() -> Any:
spec = importlib.util.spec_from_file_location("merge_batch_graphs", _MODULE_PATH)
if spec is None or spec.loader is None:
raise RuntimeError(f"Could not load module from {_MODULE_PATH}")
module = importlib.util.module_from_spec(spec)
sys.modules["merge_batch_graphs"] = module
spec.loader.exec_module(module)
return module
mbg = _load_module()
# ── Helpers ───────────────────────────────────────────────────────────────
def _file_node(path: str, **extra: Any) -> dict[str, Any]:
"""Build a minimal file node with the given relative path."""
node: dict[str, Any] = {
"id": f"file:{path}",
"type": "file",
"name": path.rsplit("/", 1)[-1],
"filePath": path,
"summary": "",
"tags": [],
"complexity": "simple",
}
node.update(extra)
return node
# ── is_test_path ──────────────────────────────────────────────────────────
class IsTestPathTests(unittest.TestCase):
"""Path classification: production vs. test."""
def test_js_ts_sibling_test_extensions(self) -> None:
for path in [
"src/foo.test.ts",
"src/foo.test.tsx",
"src/foo.test.js",
"src/foo.test.jsx",
"src/foo.test.mjs",
"src/foo.test.cjs",
"src/Component.test.vue",
"src/foo.spec.ts",
"src/foo.spec.tsx",
"src/foo.spec.js",
"src/Component.spec.vue",
]:
with self.subTest(path=path):
self.assertTrue(mbg.is_test_path(path), f"{path} should be a test")
def test_underscore_test_dir_with_test_extension(self) -> None:
self.assertTrue(mbg.is_test_path("src/__tests__/foo.test.js"))
self.assertTrue(mbg.is_test_path("src/__tests__/foo.test.ts"))
def test_tests_directory_with_test_extension(self) -> None:
self.assertTrue(mbg.is_test_path("tests/foo/X.test.ts"))
self.assertTrue(mbg.is_test_path("test/foo/X.test.ts"))
self.assertTrue(mbg.is_test_path("spec/foo/X.spec.ts"))
def test_go_test_files(self) -> None:
self.assertTrue(mbg.is_test_path("internal/bar_test.go"))
self.assertTrue(mbg.is_test_path("bar_test.go"))
def test_python_test_files(self) -> None:
self.assertTrue(mbg.is_test_path("tests/test_bar.py"))
self.assertTrue(mbg.is_test_path("bar_test.py"))
self.assertTrue(mbg.is_test_path("test_bar.py"))
def test_java_test_files(self) -> None:
self.assertTrue(mbg.is_test_path("src/test/java/com/foo/BarTest.java"))
self.assertTrue(mbg.is_test_path("src/test/java/com/foo/BarTests.java"))
self.assertTrue(mbg.is_test_path("src/test/java/com/foo/BarIT.java"))
def test_kotlin_test_files(self) -> None:
self.assertTrue(mbg.is_test_path("src/test/kotlin/com/foo/BarTest.kt"))
self.assertTrue(mbg.is_test_path("src/test/kotlin/com/foo/BarTests.kt"))
def test_csharp_test_files(self) -> None:
self.assertTrue(mbg.is_test_path("Foo.Tests/BarTests.cs"))
self.assertTrue(mbg.is_test_path("Foo.Tests/BarTest.cs"))
def test_c_cpp_test_files(self) -> None:
self.assertTrue(mbg.is_test_path("test/bar_test.c"))
self.assertTrue(mbg.is_test_path("test/test_bar.c"))
self.assertTrue(mbg.is_test_path("test/bar_test.cpp"))
self.assertTrue(mbg.is_test_path("test/bar_test.cc"))
self.assertTrue(mbg.is_test_path("test/test_bar.cpp"))
def test_production_files_rejected(self) -> None:
for path in [
"src/foo.ts",
"src/foo.tsx",
"internal/bar.go",
"src/index.tsx",
"README.md",
"docs/guide.md",
"main.py",
"src/foo/bar.js",
"Foo.cs",
"Bar.kt",
"Bar.java",
]:
with self.subTest(path=path):
self.assertFalse(mbg.is_test_path(path), f"{path} should be production")
def test_helper_in_tests_dir_without_test_extension_is_not_test(self) -> None:
# Files that live inside a __tests__ directory but don't carry a test
# extension are treated as helpers, not tests. We only count code files
# whose basename matches a test pattern. Assets/non-code files in
# tests/ are not flagged.
self.assertFalse(mbg.is_test_path("src/__tests__/helpers.ts"))
self.assertFalse(mbg.is_test_path("tests/fixtures/data.json"))
# ── production_candidates ─────────────────────────────────────────────────
class ProductionCandidatesTests(unittest.TestCase):
"""For each test path, what production paths should we try?"""
def test_js_ts_sibling(self) -> None:
cands = mbg.production_candidates("src/foo/X.test.ts")
# Sibling de-infix should be in the candidate list, with .ts as the
# most natural target. Several extensions are tried because a .test.ts
# file might test a .tsx file.
self.assertIn("src/foo/X.ts", cands)
self.assertIn("src/foo/X.tsx", cands)
def test_js_ts_spec_sibling(self) -> None:
cands = mbg.production_candidates("src/foo/X.spec.tsx")
self.assertIn("src/foo/X.tsx", cands)
self.assertIn("src/foo/X.ts", cands)
def test_underscore_tests_dir(self) -> None:
cands = mbg.production_candidates("src/foo/__tests__/X.test.ts")
# Walking out of __tests__/ should produce src/foo/X.ts
self.assertIn("src/foo/X.ts", cands)
def test_mirrored_tests_tree(self) -> None:
cands = mbg.production_candidates("tests/foo/X.test.ts")
# Should try src/foo/X.ts, app/foo/X.ts, lib/foo/X.ts, foo/X.ts
self.assertIn("src/foo/X.ts", cands)
self.assertIn("foo/X.ts", cands)
def test_go_sibling(self) -> None:
cands = mbg.production_candidates("internal/bar_test.go")
self.assertIn("internal/bar.go", cands)
def test_python_test_prefix(self) -> None:
cands = mbg.production_candidates("tests/test_bar.py")
self.assertIn("tests/bar.py", cands)
# Also try mirrored layout
self.assertIn("bar.py", cands)
self.assertIn("src/bar.py", cands)
def test_python_test_suffix(self) -> None:
cands = mbg.production_candidates("foo/bar_test.py")
self.assertIn("foo/bar.py", cands)
def test_java_maven_layout(self) -> None:
cands = mbg.production_candidates("src/test/java/com/foo/BarTest.java")
self.assertIn("src/main/java/com/foo/Bar.java", cands)
def test_java_tests_suffix(self) -> None:
cands = mbg.production_candidates("src/test/java/com/foo/BarTests.java")
self.assertIn("src/main/java/com/foo/Bar.java", cands)
def test_java_it_suffix(self) -> None:
cands = mbg.production_candidates("src/test/java/com/foo/BarIT.java")
self.assertIn("src/main/java/com/foo/Bar.java", cands)
def test_kotlin_maven_layout(self) -> None:
cands = mbg.production_candidates("src/test/kotlin/com/foo/BarTest.kt")
self.assertIn("src/main/kotlin/com/foo/Bar.kt", cands)
def test_js_ts_test_subdir_walkout(self) -> None:
# Some JS/TS projects use `
/test/` or `/spec/` instead of
# the more idiomatic `__tests__/`. Walk out of either.
cands_test = mbg.production_candidates("src/foo/test/X.test.ts")
self.assertIn("src/foo/X.ts", cands_test)
cands_spec = mbg.production_candidates("src/foo/spec/X.spec.ts")
self.assertIn("src/foo/X.ts", cands_spec)
def test_python_in_package_tests_walkout(self) -> None:
# `mypkg/tests/test_bar.py` (Django-app style) should pair with
# `mypkg/bar.py` — walk out of the in-package tests/ dir.
cands = mbg.production_candidates("mypkg/tests/test_bar.py")
self.assertIn("mypkg/bar.py", cands)
# Also nested:
cands_nested = mbg.production_candidates("a/b/test/test_bar.py")
self.assertIn("a/b/bar.py", cands_nested)
def test_csharp_tests_subdir_mirror_to_src(self) -> None:
# Real case from microservices-demo cartservice:
# `src/cartservice/tests/CartServiceTests.cs` ↔
# `src/cartservice/src/services/CartService.cs`. The candidate list
# only knows the basename; the matcher must produce a parent-level
# candidate that the linker can verify against the actual file index.
cands = mbg.production_candidates(
"src/cartservice/tests/CartServiceTests.cs"
)
# Drop tests/ entirely:
self.assertIn("src/cartservice/CartService.cs", cands)
# Mirror through `src/`:
self.assertIn("src/cartservice/src/CartService.cs", cands)
# Sibling fallback retained:
self.assertIn("src/cartservice/tests/CartService.cs", cands)
def test_csharp_dotnet_sibling_project_mirror(self) -> None:
# `.NET` convention: `MyApp.Tests/Foo/BarTests.cs` ↔
# `MyApp/Foo/Bar.cs`. Strip the `.Tests` suffix from the top dir
# and try the same tail under the sibling project.
cands = mbg.production_candidates("MyApp.Tests/Foo/BarTests.cs")
self.assertIn("MyApp/Foo/Bar.cs", cands)
# Also `.Test` (singular) is sometimes used.
cands_singular = mbg.production_candidates("MyApp.Test/BarTest.cs")
self.assertIn("MyApp/Bar.cs", cands_singular)
def test_priority_underscore_tests_sibling_before_walkup(self) -> None:
# When a test sits in `src/__tests__/`, the sibling-de-infix path
# (same directory) ranks before the walk-out path (parent directory).
# This is load-bearing: if a project happens to have both
# `src/__tests__/X.ts` and `src/X.ts`, we should pair with the
# nearer one.
cands = mbg.production_candidates("src/__tests__/X.test.ts")
self.assertEqual(cands[0], "src/__tests__/X.ts")
self.assertIn("src/X.ts", cands)
self.assertLess(cands.index("src/__tests__/X.ts"), cands.index("src/X.ts"))
def test_priority_mirrored_tree_sibling_before_mirror(self) -> None:
# `tests/foo/X.test.ts` sibling path is `tests/foo/X.ts`, which must
# rank above the mirrored `src/foo/X.ts` variant. Same rationale:
# closer pairing wins.
cands = mbg.production_candidates("tests/foo/X.test.ts")
self.assertEqual(cands[0], "tests/foo/X.ts")
self.assertIn("src/foo/X.ts", cands)
self.assertLess(cands.index("tests/foo/X.ts"), cands.index("src/foo/X.ts"))
# ── link_tests (end-to-end) ───────────────────────────────────────────────
class LinkTestsTests(unittest.TestCase):
"""End-to-end behaviour of the linker against a node/edge set."""
def test_basic_pairing_emits_forward_edge(self) -> None:
nodes_by_id = {
"file:src/foo.ts": _file_node("src/foo.ts"),
"file:src/foo.test.ts": _file_node("src/foo.test.ts"),
}
edges: list[dict[str, Any]] = []
added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
self.assertEqual(added, 1)
self.assertEqual(dropped, 0)
self.assertEqual(tagged, 1)
self.assertEqual(swapped, 0)
self.assertEqual(len(edges), 1)
edge = edges[0]
self.assertEqual(edge["source"], "file:src/foo.ts")
self.assertEqual(edge["target"], "file:src/foo.test.ts")
self.assertEqual(edge["type"], "tested_by")
self.assertEqual(edge["direction"], "forward")
self.assertEqual(edge["weight"], 0.5)
self.assertIn("tested", nodes_by_id["file:src/foo.ts"]["tags"])
# Test node is not tagged with "tested"
self.assertNotIn("tested", nodes_by_id["file:src/foo.test.ts"]["tags"])
def test_no_production_counterpart_no_edge(self) -> None:
nodes_by_id = {
"file:src/foo.test.ts": _file_node("src/foo.test.ts"),
}
edges: list[dict[str, Any]] = []
added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
self.assertEqual(added, 0)
self.assertEqual(tagged, 0)
self.assertEqual(swapped, 0)
self.assertEqual(len(edges), 0)
def test_inverted_llm_edge_is_swapped_not_stripped(self) -> None:
# The LLM systematically emits tested_by edges as test → production
# (it sees the import only when analyzing the test file). The pairing
# is real evidence; we keep it and flip the direction in place.
nodes_by_id = {
"file:src/foo.ts": _file_node("src/foo.ts"),
"file:src/foo.test.ts": _file_node("src/foo.test.ts"),
}
edges: list[dict[str, Any]] = [
{
"source": "file:src/foo.test.ts",
"target": "file:src/foo.ts",
"type": "tested_by",
"direction": "forward",
"weight": 0.5,
"description": "from LLM",
},
]
added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
# No supplement needed (the LLM edge already covers this pair).
self.assertEqual(added, 0)
self.assertEqual(swapped, 1)
self.assertEqual(dropped, 0)
self.assertEqual(tagged, 1)
tested_by_edges = [e for e in edges if e["type"] == "tested_by"]
self.assertEqual(len(tested_by_edges), 1)
edge = tested_by_edges[0]
self.assertEqual(edge["source"], "file:src/foo.ts")
self.assertEqual(edge["target"], "file:src/foo.test.ts")
# Provenance recorded so reviewers can audit the swap.
self.assertIn("direction corrected", edge["description"].lower())
def test_canonical_llm_edge_kept_unchanged(self) -> None:
# An LLM edge already in canonical direction should pass through
# untouched (no swap, no drop), and Pass 2 must not produce a
# duplicate.
nodes_by_id = {
"file:src/foo.ts": _file_node("src/foo.ts"),
"file:src/foo.test.ts": _file_node("src/foo.test.ts"),
}
edges: list[dict[str, Any]] = [
{
"source": "file:src/foo.ts",
"target": "file:src/foo.test.ts",
"type": "tested_by",
"direction": "forward",
"weight": 0.5,
"description": "original",
},
]
added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
self.assertEqual((added, dropped, swapped), (0, 0, 0))
self.assertEqual(tagged, 1)
tested_by_edges = [e for e in edges if e["type"] == "tested_by"]
self.assertEqual(len(tested_by_edges), 1)
self.assertEqual(tested_by_edges[0]["description"], "original")
def test_drops_test_to_test_edge(self) -> None:
# An LLM edge between two test files has no recoverable meaning.
nodes_by_id = {
"file:src/foo.test.ts": _file_node("src/foo.test.ts"),
"file:src/bar.test.ts": _file_node("src/bar.test.ts"),
}
edges: list[dict[str, Any]] = [
{
"source": "file:src/foo.test.ts",
"target": "file:src/bar.test.ts",
"type": "tested_by",
"direction": "forward",
"weight": 0.5,
},
]
added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
self.assertEqual(added, 0)
self.assertEqual(swapped, 0)
self.assertEqual(dropped, 1)
self.assertEqual(tagged, 0)
tested_by_edges = [e for e in edges if e["type"] == "tested_by"]
self.assertEqual(tested_by_edges, [])
def test_drops_orphan_endpoint_edge(self) -> None:
# Endpoint references a node that doesn't exist in nodes_by_id —
# nothing to canonicalize against, drop it.
nodes_by_id = {
"file:src/foo.ts": _file_node("src/foo.ts"),
}
edges: list[dict[str, Any]] = [
{
"source": "file:src/foo.ts",
"target": "file:src/missing.test.ts",
"type": "tested_by",
"direction": "forward",
"weight": 0.5,
},
]
added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
self.assertEqual((added, dropped, tagged, swapped), (0, 1, 0, 0))
self.assertEqual([e for e in edges if e["type"] == "tested_by"], [])
def test_dup_keeps_higher_weight_canonical(self) -> None:
# Two canonical tested_by edges for the same pair, weights 0.3 and
# 0.9. The heavier one must be kept — mirroring the weight-aware
# dedup at Step 6 (which never sees the discarded duplicate).
nodes_by_id = {
"file:src/foo.ts": _file_node("src/foo.ts"),
"file:src/foo.test.ts": _file_node("src/foo.test.ts"),
}
edges: list[dict[str, Any]] = [
{"source": "file:src/foo.ts", "target": "file:src/foo.test.ts",
"type": "tested_by", "direction": "forward", "weight": 0.3},
{"source": "file:src/foo.ts", "target": "file:src/foo.test.ts",
"type": "tested_by", "direction": "forward", "weight": 0.9},
]
added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
self.assertEqual((added, dropped, swapped), (0, 1, 0))
tested_by_edges = [e for e in edges if e["type"] == "tested_by"]
self.assertEqual(len(tested_by_edges), 1)
self.assertEqual(tested_by_edges[0]["weight"], 0.9)
def test_dup_lighter_inverted_dropped_no_swap_counted(self) -> None:
# Heavier canonical first, lighter inverted second. The lighter
# inverted edge is dropped without being swapped — no point
# canonicalizing an edge that's about to die in the dedup.
nodes_by_id = {
"file:src/foo.ts": _file_node("src/foo.ts"),
"file:src/foo.test.ts": _file_node("src/foo.test.ts"),
}
edges: list[dict[str, Any]] = [
{"source": "file:src/foo.ts", "target": "file:src/foo.test.ts",
"type": "tested_by", "direction": "forward", "weight": 0.9},
{"source": "file:src/foo.test.ts", "target": "file:src/foo.ts",
"type": "tested_by", "direction": "forward", "weight": 0.3},
]
added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
self.assertEqual((added, dropped, swapped), (0, 1, 0))
tested_by_edges = [e for e in edges if e["type"] == "tested_by"]
self.assertEqual(len(tested_by_edges), 1)
self.assertEqual(tested_by_edges[0]["weight"], 0.9)
# Surviving edge is the original canonical — no audit marker.
self.assertNotIn(
"direction corrected",
(tested_by_edges[0].get("description") or "").lower(),
)
def test_dup_replaces_with_heavier_inverted(self) -> None:
# Lighter canonical first, heavier inverted second. The inverted
# edge gets swapped AND replaces the kept slot, since it's heavier.
nodes_by_id = {
"file:src/foo.ts": _file_node("src/foo.ts"),
"file:src/foo.test.ts": _file_node("src/foo.test.ts"),
}
edges: list[dict[str, Any]] = [
{"source": "file:src/foo.ts", "target": "file:src/foo.test.ts",
"type": "tested_by", "direction": "forward", "weight": 0.3},
{"source": "file:src/foo.test.ts", "target": "file:src/foo.ts",
"type": "tested_by", "direction": "forward", "weight": 0.9},
]
added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
self.assertEqual(added, 0)
self.assertEqual(dropped, 1)
self.assertEqual(swapped, 1) # surviving edge IS a swap
tested_by_edges = [e for e in edges if e["type"] == "tested_by"]
self.assertEqual(len(tested_by_edges), 1)
edge = tested_by_edges[0]
self.assertEqual(edge["source"], "file:src/foo.ts")
self.assertEqual(edge["target"], "file:src/foo.test.ts")
self.assertEqual(edge["weight"], 0.9)
self.assertIn("direction corrected", edge["description"].lower())
def test_dup_swapped_then_canonical_heavier_clears_swapped_count(self) -> None:
# Inverted lighter first (swap is applied, swapped_pairs={pair}),
# then canonical heavier replaces — the surviving edge is canonical
# so `swapped` must drop back to 0.
nodes_by_id = {
"file:src/foo.ts": _file_node("src/foo.ts"),
"file:src/foo.test.ts": _file_node("src/foo.test.ts"),
}
edges: list[dict[str, Any]] = [
{"source": "file:src/foo.test.ts", "target": "file:src/foo.ts",
"type": "tested_by", "direction": "forward", "weight": 0.3},
{"source": "file:src/foo.ts", "target": "file:src/foo.test.ts",
"type": "tested_by", "direction": "forward", "weight": 0.9},
]
added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
self.assertEqual(added, 0)
self.assertEqual(dropped, 1)
self.assertEqual(swapped, 0) # surviving edge is canonical, not a swap
tested_by_edges = [e for e in edges if e["type"] == "tested_by"]
self.assertEqual(len(tested_by_edges), 1)
self.assertEqual(tested_by_edges[0]["weight"], 0.9)
def test_dup_two_inverted_keeps_heavier_swapped_once(self) -> None:
# Both inverted, different weights. The heavier one wins the slot
# after both get swapped; `swapped` reflects the surviving edge,
# not the wasted swap on the dropped lighter one.
nodes_by_id = {
"file:src/foo.ts": _file_node("src/foo.ts"),
"file:src/foo.test.ts": _file_node("src/foo.test.ts"),
}
edges: list[dict[str, Any]] = [
{"source": "file:src/foo.test.ts", "target": "file:src/foo.ts",
"type": "tested_by", "direction": "forward", "weight": 0.3},
{"source": "file:src/foo.test.ts", "target": "file:src/foo.ts",
"type": "tested_by", "direction": "forward", "weight": 0.9},
]
added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
self.assertEqual(added, 0)
self.assertEqual(dropped, 1)
self.assertEqual(swapped, 1)
tested_by_edges = [e for e in edges if e["type"] == "tested_by"]
self.assertEqual(len(tested_by_edges), 1)
edge = tested_by_edges[0]
self.assertEqual(edge["weight"], 0.9)
self.assertIn("direction corrected", edge["description"].lower())
def test_drops_duplicate_canonical_edges(self) -> None:
# Two LLM edges describing the same (production, test) pair — keep
# one, drop the other.
nodes_by_id = {
"file:src/foo.ts": _file_node("src/foo.ts"),
"file:src/foo.test.ts": _file_node("src/foo.test.ts"),
}
edges: list[dict[str, Any]] = [
{
"source": "file:src/foo.ts",
"target": "file:src/foo.test.ts",
"type": "tested_by",
"direction": "forward",
"weight": 0.5,
},
{
"source": "file:src/foo.test.ts",
"target": "file:src/foo.ts",
"type": "tested_by",
"direction": "forward",
"weight": 0.5,
},
]
added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
self.assertEqual(added, 0)
# First edge was canonical; second was inverted but described the
# same pair → dropped as a duplicate (not a swap).
self.assertEqual(dropped, 1)
self.assertEqual(swapped, 0)
self.assertEqual(tagged, 1)
self.assertEqual(len([e for e in edges if e["type"] == "tested_by"]), 1)
def test_supplement_skips_pair_already_covered_by_llm(self) -> None:
# If the LLM (after swap) already covers a (production, test) pair
# that a path-convention candidate would also produce, Pass 2 must
# not emit a duplicate.
nodes_by_id = {
"file:src/foo.ts": _file_node("src/foo.ts"),
"file:src/foo.test.ts": _file_node("src/foo.test.ts"),
"file:src/bar.ts": _file_node("src/bar.ts"),
"file:src/bar.test.ts": _file_node("src/bar.test.ts"),
}
# LLM only emitted (and inverted) the foo pair. The bar pair is
# covered by Pass 2 (path convention).
edges: list[dict[str, Any]] = [
{
"source": "file:src/foo.test.ts",
"target": "file:src/foo.ts",
"type": "tested_by",
"direction": "forward",
"weight": 0.5,
},
]
added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
self.assertEqual(swapped, 1)
self.assertEqual(added, 1) # only bar; foo is already covered
self.assertEqual(dropped, 0)
self.assertEqual(tagged, 2)
tested_by_edges = sorted(
[e for e in edges if e["type"] == "tested_by"],
key=lambda e: e["source"],
)
self.assertEqual(len(tested_by_edges), 2)
def test_swap_recovers_real_world_one_test_many_production(self) -> None:
# Real case from microservices-demo: shippingservice_test.go does
# not have a `shippingservice.go` sibling — it tests `main.go`,
# `tracker.go`, and `quote.go`. Path convention can't pair these,
# but the LLM saw the same-package usage and emitted the edges
# (with wrong direction). Swap should recover them.
nodes_by_id = {
"file:src/shippingservice/main.go": _file_node("src/shippingservice/main.go"),
"file:src/shippingservice/tracker.go": _file_node("src/shippingservice/tracker.go"),
"file:src/shippingservice/quote.go": _file_node("src/shippingservice/quote.go"),
"file:src/shippingservice/shippingservice_test.go": _file_node("src/shippingservice/shippingservice_test.go"),
}
edges: list[dict[str, Any]] = [
{
"source": "file:src/shippingservice/shippingservice_test.go",
"target": "file:src/shippingservice/main.go",
"type": "tested_by",
"direction": "forward",
"weight": 0.5,
},
{
"source": "file:src/shippingservice/shippingservice_test.go",
"target": "file:src/shippingservice/tracker.go",
"type": "tested_by",
"direction": "forward",
"weight": 0.5,
},
]
added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
self.assertEqual(swapped, 2)
# Pass 2 fallback: the test file with no shippingservice.go sibling
# produces no path-convention candidate — we rely entirely on swap.
self.assertEqual(added, 0)
self.assertEqual(dropped, 0)
# main.go and tracker.go were tagged; quote.go was not (LLM didn't
# emit an edge for it, and there's no path-convention pair).
self.assertEqual(tagged, 2)
self.assertIn("tested", nodes_by_id["file:src/shippingservice/main.go"]["tags"])
self.assertIn("tested", nodes_by_id["file:src/shippingservice/tracker.go"]["tags"])
self.assertNotIn("tested", nodes_by_id["file:src/shippingservice/quote.go"]["tags"])
def test_unrelated_edges_pass_through(self) -> None:
nodes_by_id = {
"file:src/foo.ts": _file_node("src/foo.ts"),
"file:src/foo.test.ts": _file_node("src/foo.test.ts"),
}
edges: list[dict[str, Any]] = [
{
"source": "file:src/foo.test.ts",
"target": "file:src/foo.ts",
"type": "tested_by",
"direction": "forward",
"weight": 0.5,
},
{
"source": "file:src/foo.ts",
"target": "file:src/foo.test.ts",
"type": "imports",
"direction": "forward",
"weight": 0.7,
},
]
mbg.link_tests(nodes_by_id, edges)
import_edges = [e for e in edges if e["type"] == "imports"]
self.assertEqual(len(import_edges), 1)
self.assertEqual(import_edges[0]["source"], "file:src/foo.ts")
self.assertEqual(import_edges[0]["target"], "file:src/foo.test.ts")
self.assertEqual(import_edges[0]["weight"], 0.7)
def test_direction_always_forward_production_to_test(self) -> None:
nodes_by_id = {
"file:src/foo.ts": _file_node("src/foo.ts"),
"file:src/__tests__/foo.test.ts": _file_node("src/__tests__/foo.test.ts"),
"file:internal/bar.go": _file_node("internal/bar.go"),
"file:internal/bar_test.go": _file_node("internal/bar_test.go"),
"file:src/main/java/com/foo/Bar.java": _file_node("src/main/java/com/foo/Bar.java"),
"file:src/test/java/com/foo/BarTest.java": _file_node("src/test/java/com/foo/BarTest.java"),
}
edges: list[dict[str, Any]] = []
added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
self.assertEqual(added, 3)
for edge in edges:
self.assertEqual(edge["type"], "tested_by")
self.assertEqual(edge["direction"], "forward")
# Target must be the test file (basename gives it away)
self.assertTrue(
mbg.is_test_path(edge["target"][len("file:"):]),
f"target {edge['target']} should classify as test",
)
self.assertFalse(
mbg.is_test_path(edge["source"][len("file:"):]),
f"source {edge['source']} should classify as production",
)
def test_idempotent(self) -> None:
nodes_by_id = {
"file:src/foo.ts": _file_node("src/foo.ts"),
"file:src/foo.test.ts": _file_node("src/foo.test.ts"),
}
edges: list[dict[str, Any]] = []
mbg.link_tests(nodes_by_id, edges)
# Second invocation must not duplicate edges or tags. The first run
# added a canonical supplement edge; the second sees it as canonical
# in Pass 1 and keeps it without flipping or duplicating.
added2, dropped2, tagged2, swapped2 = mbg.link_tests(nodes_by_id, edges)
self.assertEqual((added2, dropped2, swapped2), (0, 0, 0))
# Tag was already present, so tagged counter for second call is 0.
self.assertEqual(tagged2, 0)
tested_by_edges = [e for e in edges if e["type"] == "tested_by"]
self.assertEqual(len(tested_by_edges), 1)
tags = nodes_by_id["file:src/foo.ts"]["tags"]
self.assertEqual(tags.count("tested"), 1)
def test_first_matching_candidate_wins(self) -> None:
# If both src/foo.ts and src/foo.tsx exist, the linker should match
# exactly one of them (the first candidate). Sibling de-infix yields
# .ts before .tsx (since the test is named foo.test.ts).
nodes_by_id = {
"file:src/foo.ts": _file_node("src/foo.ts"),
"file:src/foo.tsx": _file_node("src/foo.tsx"),
"file:src/foo.test.ts": _file_node("src/foo.test.ts"),
}
edges: list[dict[str, Any]] = []
added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
self.assertEqual(added, 1)
self.assertEqual(tagged, 1)
# Only one of them gets tagged.
ts_tagged = "tested" in nodes_by_id["file:src/foo.ts"]["tags"]
tsx_tagged = "tested" in nodes_by_id["file:src/foo.tsx"]["tags"]
self.assertTrue(ts_tagged != tsx_tagged, "exactly one should be tagged")
# The .ts file should win (it matches the test-file extension).
self.assertTrue(ts_tagged)
def test_does_not_match_test_to_test(self) -> None:
# If only test files exist, no edges are produced — we never link a
# test to another test.
nodes_by_id = {
"file:src/foo.test.ts": _file_node("src/foo.test.ts"),
"file:src/foo.spec.ts": _file_node("src/foo.spec.ts"),
}
edges: list[dict[str, Any]] = []
added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
self.assertEqual(added, 0)
self.assertEqual(tagged, 0)
def test_does_not_duplicate_existing_tag(self) -> None:
# Production node already carries the "tested" tag — linker should
# not duplicate it.
nodes_by_id = {
"file:src/foo.ts": _file_node("src/foo.ts", tags=["tested", "core"]),
"file:src/foo.test.ts": _file_node("src/foo.test.ts"),
}
edges: list[dict[str, Any]] = []
mbg.link_tests(nodes_by_id, edges)
tags = nodes_by_id["file:src/foo.ts"]["tags"]
self.assertEqual(tags.count("tested"), 1)
self.assertIn("core", tags)
def test_empty_input(self) -> None:
edges: list[dict[str, Any]] = []
added, dropped, tagged, swapped = mbg.link_tests({}, edges)
self.assertEqual((added, dropped, tagged, swapped), (0, 0, 0, 0))
self.assertEqual(edges, [])
def test_node_without_filepath_falls_back_to_id(self) -> None:
# A file node with only `id` (no `filePath`) should still pair via
# the path embedded in the ID.
prod = {"id": "file:src/foo.ts", "type": "file", "name": "foo.ts", "tags": []}
test = {
"id": "file:src/foo.test.ts",
"type": "file",
"name": "foo.test.ts",
"tags": [],
}
nodes_by_id = {prod["id"]: prod, test["id"]: test}
edges: list[dict[str, Any]] = []
added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
self.assertEqual((added, dropped, tagged, swapped), (1, 0, 1, 0))
self.assertEqual(edges[0]["source"], "file:src/foo.ts")
self.assertEqual(edges[0]["target"], "file:src/foo.test.ts")
self.assertIn("tested", prod["tags"])
def test_malformed_tags_is_replaced_not_crashed(self) -> None:
# Raw LLM batch JSON can ship `tags` as None, a string, or other
# non-list values — the TypeScript autoFixGraph normalizer runs
# downstream of this script. The linker must coerce instead of crash.
for bad_tags in (None, "tested,foo", "single", 0, {"k": "v"}):
with self.subTest(bad_tags=bad_tags):
prod = {
"id": "file:src/foo.ts",
"type": "file",
"name": "foo.ts",
"filePath": "src/foo.ts",
"tags": bad_tags,
}
test = _file_node("src/foo.test.ts")
nodes_by_id = {prod["id"]: prod, test["id"]: test}
edges: list[dict[str, Any]] = []
added, dropped, tagged, swapped = mbg.link_tests(nodes_by_id, edges)
self.assertEqual((added, dropped, tagged, swapped), (1, 0, 1, 0))
self.assertEqual(prod["tags"], ["tested"])
# ── merge_and_normalize integration ───────────────────────────────────────
class MergeIntegrationTests(unittest.TestCase):
"""Verify the linker is wired into merge_and_normalize correctly."""
def test_linker_runs_during_merge(self) -> None:
batch = {
"nodes": [
{
"id": "file:src/foo.ts",
"type": "file",
"name": "foo.ts",
"filePath": "src/foo.ts",
"summary": "",
"tags": [],
"complexity": "simple",
},
{
"id": "file:src/foo.test.ts",
"type": "file",
"name": "foo.test.ts",
"filePath": "src/foo.test.ts",
"summary": "",
"tags": [],
"complexity": "simple",
},
],
"edges": [
# An LLM-emitted (inverted) tested_by edge — should be dropped
{
"source": "file:src/foo.test.ts",
"target": "file:src/foo.ts",
"type": "tested_by",
"direction": "forward",
"weight": 0.5,
},
],
}
assembled, _report = mbg.merge_and_normalize([batch])
# Output should have exactly one tested_by edge with canonical direction
tested_by_edges = [e for e in assembled["edges"] if e["type"] == "tested_by"]
self.assertEqual(len(tested_by_edges), 1)
self.assertEqual(tested_by_edges[0]["source"], "file:src/foo.ts")
self.assertEqual(tested_by_edges[0]["target"], "file:src/foo.test.ts")
# Production node tagged
prod_node = next(n for n in assembled["nodes"] if n["id"] == "file:src/foo.ts")
self.assertIn("tested", prod_node["tags"])
class NormalizeDirectionTests(unittest.TestCase):
"""`direction` canonicalization mirrors the dashboard schema validator."""
def test_missing_defaults_to_forward(self) -> None:
self.assertEqual(mbg.normalize_direction(None), "forward")
self.assertEqual(mbg.normalize_direction(""), "forward")
def test_valid_values_pass_through(self) -> None:
for value in ("forward", "backward", "bidirectional"):
with self.subTest(value=value):
self.assertEqual(mbg.normalize_direction(value), value)
def test_case_is_normalized(self) -> None:
self.assertEqual(mbg.normalize_direction("Forward"), "forward")
self.assertEqual(mbg.normalize_direction("BIDIRECTIONAL"), "bidirectional")
def test_aliases_are_mapped(self) -> None:
self.assertEqual(mbg.normalize_direction("both"), "bidirectional")
self.assertEqual(mbg.normalize_direction("Mutual"), "bidirectional")
def test_unknown_values_fall_back_to_forward(self) -> None:
self.assertEqual(mbg.normalize_direction("sideways"), "forward")
self.assertEqual(mbg.normalize_direction(42), "forward")
class MergeEdgeDirectionTests(unittest.TestCase):
"""End-to-end: merge_and_normalize persists a canonical `direction`."""
def _two_node_batch(self, edge: dict[str, Any]) -> dict[str, Any]:
return {
"nodes": [_file_node("src/a.ts"), _file_node("src/b.ts")],
"edges": [edge],
}
def test_missing_direction_is_persisted_as_forward(self) -> None:
# Reproduces issue #140: edges without a `direction` field still
# reach the final graph and trigger dashboard auto-corrections.
batch = self._two_node_batch({
"source": "file:src/a.ts",
"target": "file:src/b.ts",
"type": "depends_on",
"weight": 0.5,
})
assembled, _report = mbg.merge_and_normalize([batch])
edges = [e for e in assembled["edges"] if e["type"] == "depends_on"]
self.assertEqual(len(edges), 1)
self.assertEqual(edges[0]["direction"], "forward")
def test_alias_is_canonicalized_before_dedup(self) -> None:
# `"both"` and `"bidirectional"` describe the same relationship; without
# canonicalization they get separate dedup keys and leak duplicates.
batch = {
"nodes": [_file_node("src/a.ts"), _file_node("src/b.ts")],
"edges": [
{"source": "file:src/a.ts", "target": "file:src/b.ts",
"type": "depends_on", "direction": "both", "weight": 0.3},
{"source": "file:src/a.ts", "target": "file:src/b.ts",
"type": "depends_on", "direction": "bidirectional", "weight": 0.9},
],
}
assembled, _report = mbg.merge_and_normalize([batch])
edges = [e for e in assembled["edges"] if e["type"] == "depends_on"]
self.assertEqual(len(edges), 1)
self.assertEqual(edges[0]["direction"], "bidirectional")
self.assertEqual(edges[0]["weight"], 0.9)
# ── Multi-part batch handling ─────────────────────────────────────────────
class TestMultiPart(unittest.TestCase):
"""End-to-end tests for batch--part-.json input handling.
These tests invoke merge-batch-graphs.py as a subprocess in a temp
directory so we exercise the full path: glob → load → merge → write.
"""
def setUp(self) -> None:
import tempfile
self.tmp = Path(tempfile.mkdtemp(prefix="ua-mbg-"))
self.intermediate = self.tmp / ".understand-anything" / "intermediate"
self.intermediate.mkdir(parents=True, exist_ok=True)
def tearDown(self) -> None:
import shutil
shutil.rmtree(self.tmp, ignore_errors=True)
def _write_batch(self, name: str, nodes: list, edges: list) -> None:
import json as _j
(self.intermediate / name).write_text(
_j.dumps({"nodes": nodes, "edges": edges}),
encoding="utf-8",
)
def _run_merge(self) -> tuple[int, str, dict]:
import subprocess
import json as _j
result = subprocess.run(
["python3", str(_MODULE_PATH), str(self.tmp)],
capture_output=True, text=True,
)
out_path = self.intermediate / "assembled-graph.json"
assembled = _j.loads(out_path.read_text()) if out_path.exists() else {}
return result.returncode, result.stderr, assembled
def test_two_parts_of_one_logical_batch_merge(self) -> None:
self._write_batch("batch-1-part-1.json",
[_file_node("src/a.ts")],
[{"source": "file:src/a.ts", "target": "file:src/b.ts",
"type": "imports", "direction": "forward", "weight": 0.7}])
self._write_batch("batch-1-part-2.json",
[_file_node("src/b.ts")],
[])
rc, _stderr, assembled = self._run_merge()
self.assertEqual(rc, 0)
node_ids = {n["id"] for n in assembled["nodes"]}
self.assertEqual(node_ids, {"file:src/a.ts", "file:src/b.ts"})
# Cross-part edge survived
edge_keys = {(e["source"], e["target"], e["type"]) for e in assembled["edges"]}
self.assertIn(
("file:src/a.ts", "file:src/b.ts", "imports"), edge_keys)
def test_three_parts_of_one_logical_batch_merge(self) -> None:
for k, path in enumerate(["src/a.ts", "src/b.ts", "src/c.ts"], start=1):
self._write_batch(f"batch-1-part-{k}.json",
[_file_node(path)], [])
rc, _stderr, assembled = self._run_merge()
self.assertEqual(rc, 0)
node_ids = {n["id"] for n in assembled["nodes"]}
self.assertEqual(node_ids,
{"file:src/a.ts", "file:src/b.ts", "file:src/c.ts"})
def test_malformed_part_is_skipped_with_warning(self) -> None:
(self.intermediate / "batch-1-part-1.json").write_text(
"{ this is not valid json", encoding="utf-8")
self._write_batch("batch-1-part-2.json",
[_file_node("src/b.ts")], [])
rc, stderr, assembled = self._run_merge()
self.assertEqual(rc, 0)
# The skip warning is from existing load_batch logic
self.assertIn("skipping batch-1-part-1.json", stderr)
# part-2 content still made it in
node_ids = {n["id"] for n in assembled["nodes"]}
self.assertEqual(node_ids, {"file:src/b.ts"})
def test_mixed_single_and_multi_part(self) -> None:
self._write_batch("batch-1.json",
[_file_node("src/single.ts")], [])
self._write_batch("batch-2-part-1.json",
[_file_node("src/multi-a.ts")], [])
self._write_batch("batch-2-part-2.json",
[_file_node("src/multi-b.ts")], [])
self._write_batch("batch-3.json",
[_file_node("src/another-single.ts")], [])
rc, _stderr, assembled = self._run_merge()
self.assertEqual(rc, 0)
node_ids = {n["id"] for n in assembled["nodes"]}
self.assertEqual(node_ids, {
"file:src/single.ts", "file:src/multi-a.ts",
"file:src/multi-b.ts", "file:src/another-single.ts",
})
def test_missing_part_emits_warning(self) -> None:
# parts {2, 3} present, part-1 missing
self._write_batch("batch-1-part-2.json",
[_file_node("src/b.ts")], [])
self._write_batch("batch-1-part-3.json",
[_file_node("src/c.ts")], [])
rc, stderr, assembled = self._run_merge()
self.assertEqual(rc, 0)
self.assertRegex(stderr,
r"Warning: merge: batch 1 has parts \[2, 3\] but "
r"missing part \[1\] — possible truncated write")
def test_stderr_report_format(self) -> None:
self._write_batch("batch-1.json", [_file_node("src/a.ts")], [])
self._write_batch("batch-2-part-1.json", [_file_node("src/b.ts")], [])
self._write_batch("batch-2-part-2.json", [_file_node("src/c.ts")], [])
rc, stderr, _assembled = self._run_merge()
self.assertEqual(rc, 0)
# 3 files on disk, 2 logical batches, 1 multi-part
self.assertIn(
"Found 3 batch files (2 logical batches, 1 multi-part)", stderr)
# ── Unrecognized batch filename handling ───────────────────────────────────
class TestUnrecognizedBatchFilename(unittest.TestCase):
"""File-analyzer fuses multiple batches into one output (e.g.,
`batch-fused-8-13.json`, `batch-8-13.json`) — the merge script's regex
requires `batch-.json` or `batch--part-.json` and would
otherwise silently drop the contents. The script must warn loudly and
surface the drop in its report so the downstream review step catches it.
"""
def setUp(self) -> None:
import tempfile
self.tmp = Path(tempfile.mkdtemp(prefix="ua-mbg-unrec-"))
self.intermediate = self.tmp / ".understand-anything" / "intermediate"
self.intermediate.mkdir(parents=True, exist_ok=True)
def tearDown(self) -> None:
import shutil
shutil.rmtree(self.tmp, ignore_errors=True)
def _write_batch(self, name: str, nodes: list, edges: list) -> None:
import json as _j
(self.intermediate / name).write_text(
_j.dumps({"nodes": nodes, "edges": edges}),
encoding="utf-8",
)
def _run_merge(self) -> tuple[int, str, dict]:
import subprocess
import json as _j
result = subprocess.run(
["python3", str(_MODULE_PATH), str(self.tmp)],
capture_output=True, text=True,
)
out_path = self.intermediate / "assembled-graph.json"
assembled = _j.loads(out_path.read_text()) if out_path.exists() else {}
return result.returncode, result.stderr, assembled
def test_fused_filename_emits_stderr_warning(self) -> None:
# `batch-fused-3-5.json` does not match the merge regex —
# script must warn on stderr (not silently drop).
self._write_batch("batch-1.json", [_file_node("src/a.ts")], [])
self._write_batch("batch-2.json", [_file_node("src/b.ts")], [])
self._write_batch(
"batch-fused-3-5.json",
[_file_node("src/c.ts"), _file_node("src/d.ts"), _file_node("src/e.ts")],
[],
)
rc, stderr, _assembled = self._run_merge()
self.assertEqual(rc, 0)
self.assertIn("Warning: merge-batch-graphs:", stderr)
self.assertIn("unrecognized filenames", stderr)
self.assertIn("batch-fused-3-5.json", stderr)
# Remediation hint must be present so users know what to fix.
self.assertIn("file-analyzer", stderr)
self.assertIn("batch-.json", stderr)
def test_fused_filename_surfaces_in_report(self) -> None:
# The merge report (printed after the per-file load lines) must
# also flag the drop so Phase 3 review picks it up.
self._write_batch("batch-1.json", [_file_node("src/a.ts")], [])
self._write_batch(
"batch-fused-2-4.json", [_file_node("src/x.ts")], [],
)
rc, stderr, _assembled = self._run_merge()
self.assertEqual(rc, 0)
# "dropped N batch file(s) with unrecognized filenames" appears in the
# report section (printed after "Output: ..." line).
self.assertIn("dropped 1 batch file(s) with unrecognized filenames", stderr)
self.assertIn("batch-fused-2-4.json", stderr)
self.assertIn(
"every node/edge in these files was excluded from the final graph",
stderr,
)
def test_recognized_batches_still_loaded(self) -> None:
# With both recognized and unrecognized files present, recognized
# ones must still produce a valid assembled graph.
self._write_batch("batch-1.json", [_file_node("src/a.ts")], [])
self._write_batch("batch-2.json", [_file_node("src/b.ts")], [])
self._write_batch(
"batch-fused-3-5.json",
[_file_node("src/dropped-c.ts")],
[],
)
rc, _stderr, assembled = self._run_merge()
self.assertEqual(rc, 0)
node_ids = {n["id"] for n in assembled["nodes"]}
# batch-1 + batch-2 survive
self.assertIn("file:src/a.ts", node_ids)
self.assertIn("file:src/b.ts", node_ids)
# batch-fused-3-5.json content is excluded
self.assertNotIn("file:src/dropped-c.ts", node_ids)
self.assertEqual(node_ids, {"file:src/a.ts", "file:src/b.ts"})
def test_range_filename_also_unrecognized(self) -> None:
# A bare range like `batch-8-13.json` is just as broken as
# `batch-fused-8-13.json` — both must be flagged. The regex
# `batch-(\d+)(?:-part-(\d+))?\.json` requires the literal
# `-part-` separator before a second number.
self._write_batch("batch-1.json", [_file_node("src/a.ts")], [])
self._write_batch(
"batch-8-13.json",
[_file_node("src/x.ts"), _file_node("src/y.ts")],
[],
)
rc, stderr, assembled = self._run_merge()
self.assertEqual(rc, 0)
self.assertIn("Warning: merge-batch-graphs:", stderr)
self.assertIn("batch-8-13.json", stderr)
# Content is dropped
node_ids = {n["id"] for n in assembled["nodes"]}
self.assertNotIn("file:src/x.ts", node_ids)
self.assertNotIn("file:src/y.ts", node_ids)
if __name__ == "__main__":
unittest.main()