diff --git a/pyproject.toml b/pyproject.toml index d3df10b..420693e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "graphtty" -version = "0.1.6" +version = "0.1.7" description = "Turn any directed graph into colored ASCII art for your terminal" readme = "README.md" license = "MIT" diff --git a/samples/call-graph/graph.json b/samples/call-graph/graph.json new file mode 100644 index 0000000..db17403 --- /dev/null +++ b/samples/call-graph/graph.json @@ -0,0 +1,78 @@ +{ + "nodes": [ + { + "id": "main", + "name": "main", + "type": "entrypoint", + "description": "CLI entry point" + }, + { + "id": "order_pizza", + "name": "order_pizza", + "type": "function", + "description": "Orchestrates the full order flow" + }, + { + "id": "select_size", + "name": "select_size", + "type": "function", + "description": "Small, Medium, Large" + }, + { + "id": "select_toppings", + "name": "select_toppings", + "type": "function", + "description": "Pepperoni, Mushrooms, Olives" + }, + { + "id": "validate_order", + "name": "validate_order", + "type": "function", + "description": "Checks item availability" + }, + { + "id": "calculate_price", + "name": "calculate_price", + "type": "function", + "description": "Applies discounts and tax" + }, + { + "id": "process_payment", + "name": "process_payment", + "type": "function", + "description": "Stripe API integration" + }, + { + "id": "send_confirmation", + "name": "send_confirmation", + "type": "function", + "description": "Email via SendGrid" + }, + { + "id": "log_order", + "name": "log_order", + "type": "function", + "description": "Writes to order database" + }, + { + "id": "notify_kitchen", + "name": "notify_kitchen", + "type": "function", + "description": "WebSocket push to kitchen display" + } + ], + "edges": [ + { "source": "main", "target": "order_pizza" }, + { "source": "order_pizza", "target": "select_size" }, + { "source": "order_pizza", "target": "select_toppings" }, + { "source": "select_size", "target": "validate_order" }, + { "source": "select_toppings", "target": "validate_order" }, + { "source": "validate_order", "target": "calculate_price" }, + { "source": "calculate_price", "target": "process_payment" }, + { "source": "process_payment", "target": "send_confirmation" }, + { "source": "process_payment", "target": "log_order" }, + { "source": "process_payment", "target": "notify_kitchen" }, + { "source": "log_order", "target": "send_confirmation" }, + { "source": "notify_kitchen", "target": "send_confirmation" } + ] +} diff --git a/screenshots/call-graph.png b/screenshots/call-graph.png new file mode 100644 index 0000000..628e60c Binary files /dev/null and b/screenshots/call-graph.png differ diff --git a/scripts/generate_screenshots.py b/scripts/generate_screenshots.py index f162788..bfae5fe 100644 --- a/scripts/generate_screenshots.py +++ b/scripts/generate_screenshots.py @@ -213,6 +213,11 @@ def main(): "dracula", "Orchestrator Agent (dracula)", ), + ( + "samples/call-graph/graph.json", + "monokai", + "Call Graph (monokai)", + ), ] images: list[tuple[str, Image.Image]] = [] diff --git a/src/graphtty/__init__.py b/src/graphtty/__init__.py index ad4ad66..3ee11b3 100644 --- a/src/graphtty/__init__.py +++ b/src/graphtty/__init__.py @@ -2,6 +2,7 @@ from .renderer import RenderOptions, render from .themes import Theme, get_theme, list_themes +from .truncate import truncate_graph from .types import AsciiEdge, AsciiGraph, AsciiNode __all__ = [ @@ -13,4 +14,5 @@ "get_theme", "list_themes", "render", + "truncate_graph", ] diff --git a/src/graphtty/__main__.py b/src/graphtty/__main__.py index 0bab5d3..1218aad 100644 --- a/src/graphtty/__main__.py +++ b/src/graphtty/__main__.py @@ -92,6 +92,20 @@ def main(argv: list[str] | None = None) -> None: default=None, help="Max output width in columns (0 = no limit, default = terminal width)", ) + parser.add_argument( + "-d", + "--max-depth", + type=int, + default=None, + help="Max graph depth (layers from root)", + ) + parser.add_argument( + "-b", + "--max-breadth", + type=int, + default=None, + help="Max nodes per layer", + ) args = parser.parse_args(argv) @@ -135,6 +149,8 @@ def main(argv: list[str] | None = None) -> None: show_types=not args.no_types, theme=theme, max_width=max_width, + max_depth=args.max_depth, + max_breadth=args.max_breadth, ) print(render(graph, options)) diff --git a/src/graphtty/renderer.py b/src/graphtty/renderer.py index 0210133..40c3636 100644 --- a/src/graphtty/renderer.py +++ b/src/graphtty/renderer.py @@ -37,6 +37,8 @@ class RenderOptions: padding: int = 2 theme: Theme = field(default_factory=lambda: DEFAULT_THEME) max_width: int | None = None + max_depth: int | None = None + max_breadth: int | None = None def render( @@ -59,6 +61,13 @@ def render( if not graph.nodes: return "" + if options.max_depth is not None or options.max_breadth is not None: + from .truncate import truncate_graph + + graph = truncate_graph( + graph, max_depth=options.max_depth, max_breadth=options.max_breadth + ) + use_color = options.theme is not DEFAULT_THEME canvas = _render_canvas(graph, options) return canvas.to_string(use_color=use_color) @@ -268,7 +277,7 @@ def _do_render_canvas( # --------------------------------------------------------------------------- # Node types that are structural markers — no border label needed. -_HIDDEN_TYPE_LABELS = {"__start__", "__end__"} +_HIDDEN_TYPE_LABELS = {"__start__", "__end__", "__truncated__"} def _type_label(node_type: str, show_types: bool) -> str | None: diff --git a/src/graphtty/truncate.py b/src/graphtty/truncate.py new file mode 100644 index 0000000..faaed11 --- /dev/null +++ b/src/graphtty/truncate.py @@ -0,0 +1,169 @@ +"""Graph truncation — prune large graphs by depth and breadth.""" + +from __future__ import annotations + +from collections import deque + +from .types import AsciiEdge, AsciiGraph, AsciiNode + +_DEPTH_PLACEHOLDER_ID = "__truncated_depth__" + + +def truncate_graph( + graph: AsciiGraph, + *, + max_depth: int | None = None, + max_breadth: int | None = None, +) -> AsciiGraph: + """Return a truncated copy of *graph*. + + * **max_depth** — keep only nodes within this many layers from the roots + (in-degree-0 nodes). Nodes beyond the limit are replaced by a single + ``...`` placeholder node. + * **max_breadth** — keep at most this many nodes per layer. Excess nodes + in a layer are collapsed into a per-layer ``...`` placeholder. + + Returns a new :class:`AsciiGraph`; the original is not modified. + """ + if max_depth is None and max_breadth is None: + return graph + + node_ids = {n.id for n in graph.nodes} + + # Build forward adjacency & in-degree + forward: dict[str, list[str]] = {nid: [] for nid in node_ids} + in_degree: dict[str, int] = {nid: 0 for nid in node_ids} + for e in graph.edges: + if e.source in node_ids and e.target in node_ids and e.source != e.target: + forward[e.source].append(e.target) + in_degree[e.target] += 1 + + # Longest-path layer assignment via topological order (Kahn's algorithm). + # Processing in topo order guarantees all incoming edges are resolved + # before a node is expanded — correct for longest-path in a DAG. + roots = [nid for nid in node_ids if in_degree[nid] == 0] + if not roots: + # Pure cycle — treat all nodes as layer 0 + layers: dict[str, int] = {nid: 0 for nid in node_ids} + else: + layers = {nid: 0 for nid in node_ids} + remaining = dict(in_degree) + topo: deque[str] = deque(roots) + while topo: + nid = topo.popleft() + for child in forward[nid]: + new_layer = layers[nid] + 1 + if new_layer > layers[child]: + layers[child] = new_layer + remaining[child] -= 1 + if remaining[child] == 0: + topo.append(child) + + # --- Depth truncation --- + keep_ids: set[str] = set() + depth_trunc_parents: set[str] = set() # kept nodes with children beyond limit + + if max_depth is not None: + for nid, layer in layers.items(): + if layer <= max_depth: + keep_ids.add(nid) + # Find kept nodes that have children beyond the depth limit + for nid in list(keep_ids): + for child in forward[nid]: + if child not in keep_ids: + depth_trunc_parents.add(nid) + else: + keep_ids = set(node_ids) + + # --- Breadth truncation --- + breadth_replacements: dict[str, str] = {} # removed_id -> placeholder_id + + if max_breadth is not None and max_breadth >= 1: + # Group nodes by layer (preserving original graph order) + max_layer = max(layers.values()) if layers else 0 + nodes_by_layer: list[list[str]] = [[] for _ in range(max_layer + 1)] + for n in graph.nodes: + nodes_by_layer[layers[n.id]].append(n.id) + + for layer_idx, layer_nodes in enumerate(nodes_by_layer): + # Only consider nodes that survived depth truncation + surviving = [nid for nid in layer_nodes if nid in keep_ids] + if len(surviving) <= max_breadth: + continue + # Keep first (max_breadth - 1) nodes, replace rest with placeholder + removed = surviving[max_breadth - 1 :] + placeholder_id = f"__truncated_breadth_{layer_idx}__" + for rid in removed: + keep_ids.discard(rid) + breadth_replacements[rid] = placeholder_id + # If this node was a depth-truncation parent, remove it + depth_trunc_parents.discard(rid) + + # --- Build result nodes --- + result_nodes: list[AsciiNode] = [] + added_placeholders: set[str] = set() + + for n in graph.nodes: + if n.id in keep_ids: + # Recurse into subgraphs + if n.subgraph and n.subgraph.nodes: + sub = truncate_graph( + n.subgraph, max_depth=max_depth, max_breadth=max_breadth + ) + result_nodes.append( + AsciiNode( + id=n.id, + name=n.name, + type=n.type, + description=n.description, + subgraph=sub, + ) + ) + else: + result_nodes.append(n) + elif n.id in breadth_replacements: + pid = breadth_replacements[n.id] + if pid not in added_placeholders: + added_placeholders.add(pid) + result_nodes.append(AsciiNode(id=pid, name="...", type="__truncated__")) + + if depth_trunc_parents: + result_nodes.append( + AsciiNode(id=_DEPTH_PLACEHOLDER_ID, name="...", type="__truncated__") + ) + + # --- Build result edges --- + result_node_ids = {n.id for n in result_nodes} + seen_edges: set[tuple[str, str]] = set() + result_edges: list[AsciiEdge] = [] + + for e in graph.edges: + src = e.source + tgt = e.target + + # Remap breadth-truncated nodes + if src in breadth_replacements: + src = breadth_replacements[src] + if tgt in breadth_replacements: + tgt = breadth_replacements[tgt] + + # Depth-truncated target: redirect to depth placeholder + if src in result_node_ids and tgt not in result_node_ids: + if src in depth_trunc_parents: + tgt = _DEPTH_PLACEHOLDER_ID + + if src not in result_node_ids or tgt not in result_node_ids: + continue + if src == tgt: + continue + + pair = (src, tgt) + if pair in seen_edges: + continue + seen_edges.add(pair) + + # Preserve label only if both endpoints are original (not placeholders) + label = e.label if src == e.source and tgt == e.target else None + result_edges.append(AsciiEdge(source=src, target=tgt, label=label)) + + return AsciiGraph(nodes=result_nodes, edges=result_edges) diff --git a/tests/test_cli.py b/tests/test_cli.py index b9ec401..93c2130 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -153,6 +153,7 @@ class TestCLISamples: "samples/deep-agent/graph.json", "samples/supervisor-agent/graph.json", "samples/world-map/graph.json", + "samples/call-graph/graph.json", ] ) def sample_path(self, request): diff --git a/tests/test_truncate.py b/tests/test_truncate.py new file mode 100644 index 0000000..2d0beca --- /dev/null +++ b/tests/test_truncate.py @@ -0,0 +1,371 @@ +"""Tests for graph truncation (depth/breadth pruning).""" + +from graphtty import AsciiEdge, AsciiGraph, AsciiNode, RenderOptions, render +from graphtty.truncate import truncate_graph + + +def _ids(graph: AsciiGraph) -> set[str]: + return {n.id for n in graph.nodes} + + +def _edge_pairs(graph: AsciiGraph) -> set[tuple[str, str]]: + return {(e.source, e.target) for e in graph.edges} + + +# --------------------------------------------------------------------------- +# Depth truncation +# --------------------------------------------------------------------------- + + +class TestDepthTruncation: + def test_chain_depth_1(self): + """A→B→C→D with max_depth=1 keeps A, B and adds '...' placeholder.""" + g = AsciiGraph( + nodes=[ + AsciiNode(id="A", name="A"), + AsciiNode(id="B", name="B"), + AsciiNode(id="C", name="C"), + AsciiNode(id="D", name="D"), + ], + edges=[ + AsciiEdge(source="A", target="B"), + AsciiEdge(source="B", target="C"), + AsciiEdge(source="C", target="D"), + ], + ) + result = truncate_graph(g, max_depth=1) + ids = _ids(result) + assert "A" in ids + assert "B" in ids + assert "C" not in ids + assert "D" not in ids + assert "__truncated_depth__" in ids + # B should connect to the placeholder + assert ("B", "__truncated_depth__") in _edge_pairs(result) + + def test_longest_path_reconverging(self): + """Longest-path must follow the longest route through re-converging paths. + + A→D→E (short) and A→B→C→D→E (long). D should be at layer 3, E at 4. + With max_depth=2 only A, B, C are kept (layers 0, 1, 2). + """ + g = AsciiGraph( + nodes=[ + AsciiNode(id="A", name="A"), + AsciiNode(id="B", name="B"), + AsciiNode(id="C", name="C"), + AsciiNode(id="D", name="D"), + AsciiNode(id="E", name="E"), + ], + edges=[ + AsciiEdge(source="A", target="D"), + AsciiEdge(source="A", target="B"), + AsciiEdge(source="B", target="C"), + AsciiEdge(source="C", target="D"), + AsciiEdge(source="D", target="E"), + ], + ) + result = truncate_graph(g, max_depth=2) + ids = _ids(result) + assert {"A", "B", "C"} <= ids + # D is at layer 3 (longest path A→B→C→D), so it's truncated + assert "D" not in ids + assert "E" not in ids + assert "__truncated_depth__" in ids + + def test_fan_out_depth_0(self): + """max_depth=0 keeps only root(s) + placeholder.""" + g = AsciiGraph( + nodes=[ + AsciiNode(id="root", name="root"), + AsciiNode(id="a", name="a"), + AsciiNode(id="b", name="b"), + ], + edges=[ + AsciiEdge(source="root", target="a"), + AsciiEdge(source="root", target="b"), + ], + ) + result = truncate_graph(g, max_depth=0) + ids = _ids(result) + assert ids == {"root", "__truncated_depth__"} + assert ("root", "__truncated_depth__") in _edge_pairs(result) + + def test_depth_no_truncation_needed(self): + """Graph fits within max_depth — no placeholder added.""" + g = AsciiGraph( + nodes=[ + AsciiNode(id="A", name="A"), + AsciiNode(id="B", name="B"), + ], + edges=[AsciiEdge(source="A", target="B")], + ) + result = truncate_graph(g, max_depth=5) + assert _ids(result) == {"A", "B"} + assert "__truncated_depth__" not in _ids(result) + + def test_diamond_depth(self): + """Diamond: A→B, A→C, B→D, C→D — longest-path layers: A=0,B=1,C=1,D=2.""" + g = AsciiGraph( + nodes=[ + AsciiNode(id="A", name="A"), + AsciiNode(id="B", name="B"), + AsciiNode(id="C", name="C"), + AsciiNode(id="D", name="D"), + ], + edges=[ + AsciiEdge(source="A", target="B"), + AsciiEdge(source="A", target="C"), + AsciiEdge(source="B", target="D"), + AsciiEdge(source="C", target="D"), + ], + ) + result = truncate_graph(g, max_depth=1) + ids = _ids(result) + assert {"A", "B", "C"} <= ids + assert "D" not in ids + assert "__truncated_depth__" in ids + + +# --------------------------------------------------------------------------- +# Breadth truncation +# --------------------------------------------------------------------------- + + +class TestBreadthTruncation: + def test_wide_layer(self): + """5-node layer with max_breadth=3 → 2 nodes + placeholder.""" + nodes = [AsciiNode(id="root", name="root")] + edges = [] + for i in range(5): + nid = f"n{i}" + nodes.append(AsciiNode(id=nid, name=nid)) + edges.append(AsciiEdge(source="root", target=nid)) + g = AsciiGraph(nodes=nodes, edges=edges) + + result = truncate_graph(g, max_breadth=3) + ids = _ids(result) + assert "root" in ids + # Should have 2 original nodes + 1 placeholder = 3 at layer 1 + layer1_ids = ids - {"root"} + assert len(layer1_ids) == 3 + assert any("__truncated_breadth_" in nid for nid in layer1_ids) + + def test_breadth_no_truncation_needed(self): + """Layers within limit — no placeholder.""" + g = AsciiGraph( + nodes=[ + AsciiNode(id="A", name="A"), + AsciiNode(id="B", name="B"), + AsciiNode(id="C", name="C"), + ], + edges=[ + AsciiEdge(source="A", target="B"), + AsciiEdge(source="A", target="C"), + ], + ) + result = truncate_graph(g, max_breadth=5) + assert _ids(result) == {"A", "B", "C"} + + +# --------------------------------------------------------------------------- +# Combined +# --------------------------------------------------------------------------- + + +class TestCombinedTruncation: + def test_depth_and_breadth(self): + """Both limits applied together.""" + # root → a, b, c, d (layer 1, 4 nodes) + # a → leaf (layer 2) + nodes = [ + AsciiNode(id="root", name="root"), + AsciiNode(id="a", name="a"), + AsciiNode(id="b", name="b"), + AsciiNode(id="c", name="c"), + AsciiNode(id="d", name="d"), + AsciiNode(id="leaf", name="leaf"), + ] + edges = [ + AsciiEdge(source="root", target="a"), + AsciiEdge(source="root", target="b"), + AsciiEdge(source="root", target="c"), + AsciiEdge(source="root", target="d"), + AsciiEdge(source="a", target="leaf"), + ] + g = AsciiGraph(nodes=nodes, edges=edges) + + result = truncate_graph(g, max_depth=1, max_breadth=3) + ids = _ids(result) + # root kept (layer 0) + assert "root" in ids + # layer 1 breadth-truncated to 2 nodes + placeholder + # leaf removed by depth truncation + assert "leaf" not in ids + + +# --------------------------------------------------------------------------- +# Edge cases +# --------------------------------------------------------------------------- + + +class TestEdgeCases: + def test_noop_no_limits(self): + """No limits → return original graph.""" + g = AsciiGraph( + nodes=[AsciiNode(id="A", name="A")], + edges=[], + ) + result = truncate_graph(g, max_depth=None, max_breadth=None) + assert result is g + + def test_single_node(self): + """Single node graph with depth=0.""" + g = AsciiGraph( + nodes=[AsciiNode(id="only", name="only")], + edges=[], + ) + result = truncate_graph(g, max_depth=0) + assert _ids(result) == {"only"} + + def test_empty_graph(self): + """Empty graph stays empty.""" + g = AsciiGraph(nodes=[], edges=[]) + result = truncate_graph(g, max_depth=1, max_breadth=2) + assert len(result.nodes) == 0 + + def test_edge_deduplication(self): + """Multiple parents → same placeholder should produce one edge each.""" + g = AsciiGraph( + nodes=[ + AsciiNode(id="p1", name="p1"), + AsciiNode(id="p2", name="p2"), + AsciiNode(id="c1", name="c1"), + AsciiNode(id="c2", name="c2"), + ], + edges=[ + AsciiEdge(source="p1", target="c1"), + AsciiEdge(source="p1", target="c2"), + AsciiEdge(source="p2", target="c1"), + AsciiEdge(source="p2", target="c2"), + ], + ) + # p1, p2 are layer 0; c1, c2 are layer 1 + result = truncate_graph(g, max_depth=0) + edges = _edge_pairs(result) + # Each parent should have exactly one edge to the placeholder + p1_edges = [(s, t) for s, t in edges if s == "p1"] + p2_edges = [(s, t) for s, t in edges if s == "p2"] + assert len(p1_edges) == 1 + assert len(p2_edges) == 1 + + def test_self_loop_ignored(self): + """Self-loops should not create placeholder edges to self.""" + g = AsciiGraph( + nodes=[ + AsciiNode(id="A", name="A"), + AsciiNode(id="B", name="B"), + ], + edges=[ + AsciiEdge(source="A", target="B"), + AsciiEdge(source="A", target="A"), # self-loop + ], + ) + result = truncate_graph(g, max_depth=1) + # Self-loop should not cause issues + assert "A" in _ids(result) + + def test_placeholder_node_name_is_ellipsis(self): + """Placeholder nodes have name='...'.""" + g = AsciiGraph( + nodes=[ + AsciiNode(id="A", name="A"), + AsciiNode(id="B", name="B"), + AsciiNode(id="C", name="C"), + ], + edges=[ + AsciiEdge(source="A", target="B"), + AsciiEdge(source="B", target="C"), + ], + ) + result = truncate_graph(g, max_depth=1) + placeholders = [n for n in result.nodes if n.type == "__truncated__"] + assert len(placeholders) == 1 + assert placeholders[0].name == "..." + + +# --------------------------------------------------------------------------- +# Subgraph recursion +# --------------------------------------------------------------------------- + + +class TestSubgraphRecursion: + def test_subgraph_truncated(self): + """Truncation should recurse into subgraphs.""" + inner = AsciiGraph( + nodes=[ + AsciiNode(id="s1", name="s1"), + AsciiNode(id="s2", name="s2"), + AsciiNode(id="s3", name="s3"), + ], + edges=[ + AsciiEdge(source="s1", target="s2"), + AsciiEdge(source="s2", target="s3"), + ], + ) + g = AsciiGraph( + nodes=[ + AsciiNode(id="outer", name="outer", subgraph=inner), + ], + edges=[], + ) + result = truncate_graph(g, max_depth=1) + sub = result.nodes[0].subgraph + assert sub is not None + sub_ids = _ids(sub) + assert "s1" in sub_ids + assert "s2" in sub_ids + assert "s3" not in sub_ids + + +# --------------------------------------------------------------------------- +# Render integration +# --------------------------------------------------------------------------- + + +class TestRenderIntegration: + def test_render_with_depth(self): + """render() with max_depth produces output containing '...'.""" + g = AsciiGraph( + nodes=[ + AsciiNode(id="A", name="A"), + AsciiNode(id="B", name="B"), + AsciiNode(id="C", name="C"), + ], + edges=[ + AsciiEdge(source="A", target="B"), + AsciiEdge(source="B", target="C"), + ], + ) + opts = RenderOptions(max_depth=1) + out = render(g, opts) + assert "..." in out + assert "A" in out + assert "B" in out + # C should be replaced by "..." + assert "C" not in out + + def test_render_with_breadth(self): + """render() with max_breadth produces output containing '...'.""" + nodes = [AsciiNode(id="root", name="root")] + edges = [] + for i in range(5): + nid = f"child{i}" + nodes.append(AsciiNode(id=nid, name=nid)) + edges.append(AsciiEdge(source="root", target=nid)) + g = AsciiGraph(nodes=nodes, edges=edges) + + opts = RenderOptions(max_breadth=3) + out = render(g, opts) + assert "..." in out + assert "root" in out diff --git a/uv.lock b/uv.lock index c4cf558..87b0583 100644 --- a/uv.lock +++ b/uv.lock @@ -105,7 +105,7 @@ toml = [ [[package]] name = "graphtty" -version = "0.1.6" +version = "0.1.7" source = { editable = "." } [package.dev-dependencies]