diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..351a17f --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,98 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +panel-reactflow is a Panel extension that wraps the React Flow JS library (@xyflow/react), providing Python-first interactive node-graph editing with bidirectional sync between Python and the browser. It is part of the panel-extensions ecosystem under HoloViz. + +## Development Setup + +This project uses [pixi](https://pixi.sh) for environment management. After cloning: + +```bash +pixi run pre-commit-install +pixi run postinstall # editable pip install +pixi run test # run unit tests +pixi run test-coverage # run tests with coverage +``` + +For UI tests (requires Playwright + Chromium): +```bash +pixi run -e test-ui test-ui +``` + +For docs: +```bash +pixi run docs-serve # live-reload dev server +pixi run docs-build # build static site to builtdocs/ +``` + +Alternative setup with uv: +```bash +uv venv && source .venv/bin/activate +uv pip install -e .[dev] +pre-commit run install +pytest tests +``` + +### Running a Single Test + +```bash +pixi run pytest tests/test_api.py::test_reactflow_add_remove -xvs +``` + +UI tests require `--ui` flag and are skipped by default. + +### Linting + +Pre-commit hooks handle linting (ruff, prettier, codespell, eslint for JS). Run all checks: +```bash +pixi run -e lint pre-commit-run +``` + +### Building + +```bash +pixi run -e build build-wheel +pixi run -e build check-wheel +``` + +## Architecture + +### Python Side (`src/panel_reactflow/`) + +- **`base.py`** — The core module. Contains: + - `ReactFlow` — Main component, extends `panel.custom.ReactComponent`. Manages nodes/edges as list-of-dicts params, syncs state bidirectionally with the JS frontend via `_handle_msg`/`_send_msg`. Supports event callbacks via `.on(event_type, callback)`. + - `NodeSpec`/`EdgeSpec` — Dataclasses for constructing node/edge dicts with `.to_dict()`/`.from_dict()` roundtrip. + - `NodeType`/`EdgeType` — Dataclasses for type definitions with optional schema (JSON Schema dict, `param.Parameterized` subclass, or Pydantic `BaseModel`). Schemas are normalized to JSON Schema via `_normalize_schema()`. + - `Editor`/`JsonEditor`/`SchemaEditor` — Editor classes (extend `panel.viewable.Viewer`) following the signature `(data, schema, *, id, type, on_patch)`. `SchemaEditor` auto-generates widget forms from JSON Schema when properties are available, falling back to raw JSON editor. + - Schema helpers: `_param_to_jsonschema()`, `_pydantic_to_jsonschema()`, `_normalize_schema()`, `_validate_data()`, `_coerce_spec_map()`. + +- **`schema.py`** — `JSONSchema` pane (extends `panel.pane.base.PaneBase`). Converts JSON Schema property definitions into Panel Material UI widgets. Used by `SchemaEditor` when a schema with properties is available. + +- **`models/reactflow.jsx`** — The React/JSX frontend component. Uses `@xyflow/react` v12. Handles canvas rendering, drag/select/connect/delete interactions, and syncs state back to Python via message passing. + +### Build System + +- Uses `hatchling` with a custom build hook (`hatch_build.py`) that compiles the JSX into a JS bundle via `panel.io.compile.compile_components`. +- Version is managed by `setuptools-scm` / `hatch-vcs` from git tags. +- The compiled bundle lives at `src/panel_reactflow/dist/panel-reactflow.bundle.js`. + +### Key Patterns + +- **Nodes carry `view` entries** — Panel viewables placed in `node["view"]` are extracted during `_process_param_change`, replaced with a `view_idx` integer, and rendered via the `_views` Children param. The JSX side reads `data.view_idx` to embed the corresponding Panel model. +- **Editor resolution** — Per-node/edge editors are resolved in `_update_node_editors`/`_update_edge_editors`. Priority: type-specific editor from `node_editors` dict > `default_node_editor` > `SchemaEditor` fallback. +- **Frontend sync** — The JS side sends typed messages (`sync`, `node_moved`, `selection_changed`, `edge_added`, `node_deleted`, etc.) handled by `_handle_msg` in Python. Python-to-JS patches go through `_send_msg`. +- **`node_types`/`edge_types`** are normalized to JSON-serializable dicts via `_coerce_spec_map()` on init and on param change, supporting raw dicts, `NodeType`/`EdgeType` dataclasses, or bare `param.Parameterized`/Pydantic classes as shorthand. + +### Test Structure + +- `tests/test_core.py` — Smoke test (import check). +- `tests/test_api.py` — Unit tests for specs, graph operations, schema normalization, editor resolution, type normalization, NetworkX interop. +- `tests/ui/test_ui.py` — Playwright browser tests (marked `@pytest.mark.ui`, require `--ui` flag). +- `tests/conftest.py` — Imports Panel's `document`/`comm` fixtures, adds `--ui` CLI option. + +### Docs + +Built with [zensical](https://github.com/zensical/zensical) (MkDocs-based). Config in `zensical.toml`. Docstring style is numpy. diff --git a/docs/index.md b/docs/index.md index 91c2583..a199a9a 100644 --- a/docs/index.md +++ b/docs/index.md @@ -65,3 +65,4 @@ flow.servable() ## Reference - [API reference](reference/panel_reactflow.md) +- [Release notes](releases.md) diff --git a/docs/releases.md b/docs/releases.md new file mode 100644 index 0000000..b14c61e --- /dev/null +++ b/docs/releases.md @@ -0,0 +1,42 @@ +# Release Notes + +## Version 0.1.0 + +*Initial release* + +Panel-ReactFlow brings the power of [React Flow](https://reactflow.dev/) to +the [Panel](https://panel.holoviz.org/) ecosystem, giving Python developers a +fully interactive node-based graph editor with minimal boilerplate. + +### Highlights + +- **ReactFlow component** — a Panel component that renders an interactive + React Flow canvas with drag, select, connect, and delete support. +- **Python-first graph state** — define nodes and edges as plain Python + dictionaries; all changes sync bidirectionally between the frontend and + Python. +- **Node & edge types** — use `NodeType` and `EdgeType` dataclasses to + declare typed nodes and edges with optional JSON Schema for their `data` + payloads. +- **Schema-driven editors** — auto-generate editing forms from JSON Schema + (via `SchemaEditor`) or fall back to a raw JSON tree view + (`JsonEditor`). Custom editors can be any Python callable or class. +- **Editor display modes** — show editors inline inside nodes + (`editor_mode="node"`), in a toolbar popover (`"toolbar"`), or in a + side panel (`"side"`). +- **Embedded views** — pass any Panel `Viewable` as a node's `view` to + render rich content (charts, indicators, widgets) directly inside graph + nodes. +- **Event system** — subscribe to granular events (`node_added`, + `edge_deleted`, `selection_changed`, …) or use `"*"` to listen to + everything. +- **Custom handles** — configure named input and output ports per node type. +- **Stylesheets** — apply CSS stylesheets targeting `.react-flow__node-{type}` + and `.react-flow__edge-{type}` classes for full visual control. +- **Panel integration** — `top_panel`, `bottom_panel`, `left_panel`, and + `right_panel` slots for surrounding the canvas with arbitrary Panel + components. +- **Helper dataclasses** — `NodeSpec` and `EdgeSpec` for validated node/edge + construction; convenience methods `add_node`, `remove_node`, `add_edge`, + `remove_edge`, and `patch_node_data` / `patch_edge_data` for live graph + manipulation. diff --git a/pixi.toml b/pixi.toml index 4d9d6ea..b8a7249 100644 --- a/pixi.toml +++ b/pixi.toml @@ -24,6 +24,7 @@ hatchling = "*" hatch-vcs = "*" nodejs = ">=18" esbuild = "*" +hvplot = ">=0.12.2,<0.13" [feature.py310.dependencies] python = "3.10.*" diff --git a/src/panel_reactflow/__init__.py b/src/panel_reactflow/__init__.py index c3dd158..583da17 100644 --- a/src/panel_reactflow/__init__.py +++ b/src/panel_reactflow/__init__.py @@ -12,6 +12,7 @@ SchemaEditor, SchemaSource, ) +from .pipeline import Pipeline __all__ = [ "EdgeSpec", @@ -20,6 +21,7 @@ "JsonEditor", "NodeSpec", "NodeType", + "Pipeline", "ReactFlow", "SchemaEditor", "SchemaSource", diff --git a/src/panel_reactflow/pipeline.py b/src/panel_reactflow/pipeline.py new file mode 100644 index 0000000..c50b63e --- /dev/null +++ b/src/panel_reactflow/pipeline.py @@ -0,0 +1,518 @@ +"""Pipeline: visual data-flow built from parameterized stages.""" + +from __future__ import annotations + +from collections import defaultdict +from typing import Any + +import panel as pn +import param +from panel.viewable import Viewer + +from .base import NodeSpec, ReactFlow + +# --------------------------------------------------------------------------- +# Pipeline node styling +# --------------------------------------------------------------------------- + +_PIPELINE_CSS = """\ +.react-flow__node.rf-auto-input { + border: 1.5px solid #6366f1; + background: linear-gradient(180deg, #eef2ff 0%, #ffffff 100%); + overflow: visible; +} +.react-flow__node.rf-auto-input::before { + content: "input"; + position: absolute; + top: -9px; + left: 10px; + background: #6366f1; + color: #fff; + font-size: 10px; + font-weight: 600; + padding: 1px 8px; + border-radius: 9999px; + letter-spacing: 0.04em; + text-transform: uppercase; + line-height: 16px; +} +.react-flow__node.rf-auto-input.selected { + border-color: #4f46e5; + box-shadow: 0 0 0 1.5px rgba(99, 102, 241, 0.3); +} +.react-flow__node.rf-stage { + border: 1.5px solid #10b981; + background: linear-gradient(180deg, #ecfdf5 0%, #ffffff 100%); + overflow: visible; +} +.react-flow__node.rf-stage::before { + content: "output"; + position: absolute; + top: -9px; + left: 10px; + background: #10b981; + color: #fff; + font-size: 10px; + font-weight: 600; + padding: 1px 8px; + border-radius: 9999px; + letter-spacing: 0.04em; + text-transform: uppercase; + line-height: 16px; +} +.react-flow__node.rf-stage.selected { + border-color: #059669; + box-shadow: 0 0 0 1.5px rgba(16, 185, 129, 0.3); +} +""" + + +# --------------------------------------------------------------------------- +# Introspection helpers +# --------------------------------------------------------------------------- + +# Parameters inherited from Parameterized that should never be treated as +# stage inputs. +_BASE_PARAMS: set[str] = set(param.Parameterized.param) +try: + from panel.viewable import Viewable + + _BASE_PARAMS |= set(Viewable.param) +except Exception: + pass + + +def _get_outputs(instance: param.Parameterized) -> dict[str, tuple]: + """Return ``{name: (type, method, index)}`` from ``@param.output``.""" + return instance.param.outputs() + + +def _get_input_params(instance: param.Parameterized) -> dict[str, param.Parameter]: + """Return non-base, non-private parameters suitable as stage inputs.""" + return {name: p for name, p in instance.param.objects("existing").items() if name not in _BASE_PARAMS and not name.startswith("_")} + + +def _get_view_methods( + instance: param.Parameterized, + output_method_names: set[str], +) -> list: + """Return public ``@param.depends`` bound methods that are not outputs.""" + views = [] + for attr_name in sorted(dir(type(instance))): + if attr_name.startswith("_"): + continue + if attr_name in output_method_names: + continue + func = getattr(type(instance), attr_name, None) + if callable(func) and hasattr(func, "_dinfo"): + views.append(getattr(instance, attr_name)) + return views + + +def _make_output_view( + instance: param.Parameterized, + method: Any, + index: int | None, +) -> pn.viewable.Viewable: + """Create a reactive Panel view for a single output. + + For single-output methods (``index is None``), the bound method is + rendered directly. For multi-output methods, a ``pn.bind`` wrapper + extracts the correct element from the tuple. + """ + if index is None: + return pn.panel(method) + + method_name = method.__name__ + deps = instance.param.method_dependencies(method_name) + dep_params = [instance.param[dep.name] for dep in deps] + + def _extract(*_args, _m=method, _i=index): + try: + return _m()[_i] + except Exception: + return None + + if dep_params: + return pn.panel(pn.bind(_extract, *dep_params)) + return pn.panel(_extract()) + + +def _infer_edges( + stage_names: list[str], + instances: dict[str, param.Parameterized], + outputs_map: dict[str, dict[str, tuple]], +) -> list[tuple[str, str, str]]: + """Infer edges by matching output names to downstream parameter names. + + Returns a list of ``(source_name, target_name, param_name)`` triples. + """ + edges: list[tuple[str, str, str]] = [] + for i, src_name in enumerate(stage_names): + src_outputs = outputs_map.get(src_name, {}) + for output_name in src_outputs: + for tgt_name in stage_names[i + 1 :]: + tgt_inputs = _get_input_params(instances[tgt_name]) + if output_name in tgt_inputs: + edges.append((src_name, tgt_name, output_name)) + return edges + + +def _resolve_explicit_graph( + graph: dict[str, str | tuple[str, ...]], + instances: dict[str, param.Parameterized], + outputs_map: dict[str, dict[str, tuple]], +) -> list[tuple[str, str, str]]: + """Convert an explicit ``graph`` dict into edge triples. + + For each ``source -> target(s)`` entry, match source outputs to target + input parameters by name. + """ + edges: list[tuple[str, str, str]] = [] + for src_name, targets in graph.items(): + if isinstance(targets, str): + targets = (targets,) + src_outputs = outputs_map.get(src_name, {}) + for tgt_name in targets: + tgt_inputs = _get_input_params(instances[tgt_name]) + for output_name in src_outputs: + if output_name in tgt_inputs: + edges.append((src_name, tgt_name, output_name)) + return edges + + +def _compute_positions( + stage_names: list[str], + edges: list[tuple[str, str, str]], + spacing: tuple[float, float], +) -> dict[str, dict[str, float]]: + """Compute simple left-to-right positions for stages. + + Linear chains are placed in a single row. When fan-out occurs (a node + has multiple outgoing targets at the same depth), branches are stacked + vertically. + """ + sx, sy = spacing + + # Build adjacency for topological depth assignment. + children: dict[str, set[str]] = defaultdict(set) + parents: dict[str, set[str]] = defaultdict(set) + for src, tgt, _ in edges: + children[src].add(tgt) + parents[tgt].add(src) + + # Assign depth (column) via BFS from roots. + depth: dict[str, int] = {} + # Roots are stages with no parents (among stages in edges). + all_in_edges = {s for s, _, _ in edges} | {t for _, t, _ in edges} + roots = [name for name in stage_names if name not in parents or name not in all_in_edges] + if not roots: + roots = [stage_names[0]] + + queue = list(roots) + for r in roots: + depth.setdefault(r, 0) + + while queue: + node = queue.pop(0) + for child in children.get(node, []): + new_depth = depth[node] + 1 + if child not in depth or new_depth > depth[child]: + depth[child] = new_depth + queue.append(child) + + # Stages with no edges get sequential depth. + next_col = max(depth.values(), default=-1) + 1 + for name in stage_names: + if name not in depth: + depth[name] = next_col + next_col += 1 + + # Group stages by depth for vertical stacking. + by_depth: dict[int, list[str]] = defaultdict(list) + for name in stage_names: + by_depth[depth[name]].append(name) + + positions: dict[str, dict[str, float]] = {} + for col, names in by_depth.items(): + total_height = (len(names) - 1) * sy + start_y = -total_height / 2 + for row, name in enumerate(names): + positions[name] = {"x": col * sx, "y": start_y + row * sy} + + return positions + + +# --------------------------------------------------------------------------- +# Pipeline class +# --------------------------------------------------------------------------- + + +class Pipeline(Viewer): + """Visual pipeline built from parameterized stages. + + Parameters + ---------- + stages : list + List of ``(name, class_or_instance)`` tuples. Classes are + instantiated automatically. + graph : dict or None + Explicit topology mapping source stage names to target name(s). + When *None*, edges are inferred by matching ``@param.output`` + names to downstream parameter names. + layout_spacing : tuple + ``(horizontal, vertical)`` spacing in pixels between nodes. + kwargs : dict + Extra keyword arguments forwarded to the ``ReactFlow`` constructor. + """ + + stages = param.List( + doc="List of (name, class_or_instance) tuples.", + ) + graph = param.Dict( + default=None, + allow_None=True, + doc=( + "Explicit topology: {source_name: target_name | (t1, t2, ...)}. " + "If None, edges are inferred by matching @param.output names to " + "downstream parameter names." + ), + ) + layout_spacing = param.NumericTuple( + default=(350, 150), + doc="(horizontal, vertical) spacing between nodes in pixels.", + ) + auto_inputs = param.Boolean( + default=True, + doc=("Auto-generate input widget nodes for stage parameters that have " "no incoming edge from another stage."), + ) + kwargs = param.Dict( + default={}, + doc="Extra keyword arguments forwarded to ReactFlow.", + ) + + def __init__(self, **params): + super().__init__(**params) + self._instances: dict[str, param.Parameterized] = {} + self._outputs: dict[str, dict[str, tuple]] = {} + self._edges: list[tuple[str, str, str]] = [] + self._input_views: dict[str, pn.viewable.Viewable] = {} + self._flow: ReactFlow | None = None + self._build() + + # ------------------------------------------------------------------ + # Build + # ------------------------------------------------------------------ + + def _find_unconnected_params( + self, + stage_names: list[str], + ) -> list[tuple[str, str]]: + """Return ``(stage_name, param_name)`` pairs with no incoming edge.""" + connected: dict[str, set[str]] = defaultdict(set) + for _src, tgt, pname in self._edges: + connected[tgt].add(pname) + + result: list[tuple[str, str]] = [] + for name in stage_names: + instance = self._instances[name] + for pname in _get_input_params(instance): + if pname not in connected.get(name, set()): + result.append((name, pname)) + return result + + def _build(self) -> None: + stage_names: list[str] = [] + + # 1. Instantiate stages + for name, cls_or_instance in self.stages: + stage_names.append(name) + if isinstance(cls_or_instance, type): + instance = cls_or_instance(name=name) + else: + instance = cls_or_instance + self._instances[name] = instance + + # 2. Introspect outputs + for name, instance in self._instances.items(): + self._outputs[name] = _get_outputs(instance) + + # 3. Infer or resolve edges + if self.graph is None: + self._edges = _infer_edges(stage_names, self._instances, self._outputs) + else: + self._edges = _resolve_explicit_graph(self.graph, self._instances, self._outputs) + + # 4. Wire reactivity (stage-to-stage only) + self._wire(self._edges) + + # 5. Auto-generate input nodes for unconnected params + all_names = list(stage_names) + all_edges = list(self._edges) + self._input_views.clear() + + if self.auto_inputs: + unconnected = self._find_unconnected_params(stage_names) + for stage_name, param_name in unconnected: + input_id = f"{stage_name}:{param_name}" + instance = self._instances[stage_name] + widget = pn.panel(instance.param[param_name]) + self._input_views[input_id] = widget + all_names.append(input_id) + all_edges.append((input_id, stage_name, param_name)) + + # 6. Build ReactFlow + self._flow = self._build_reactflow(all_names, all_edges) + + def _wire(self, edges: list[tuple[str, str, str]]) -> None: + """Set up ``param.watch`` watchers so upstream outputs propagate.""" + for src_name, tgt_name, param_name in edges: + src = self._instances[src_name] + tgt = self._instances[tgt_name] + output_info = self._outputs[src_name][param_name] + # output_info = (param_type_instance, bound_method, index) + _ptype, method, index = output_info + + # Find the parameter dependencies of the output method. + method_name = method.__name__ + deps = src.param.method_dependencies(method_name) + dep_names = [dep.name for dep in deps] + + if not dep_names: + # No explicit dependencies — try to fire on any non-private + # param change; or at minimum set the initial value. + self._propagate_output(src, method, index, tgt, param_name) + continue + + # Capture variables for the closure. + def _make_watcher(_src, _method, _index, _tgt, _param_name): + def _watcher(*events): + self._propagate_output(_src, _method, _index, _tgt, _param_name) + + return _watcher + + watcher = _make_watcher(src, method, index, tgt, param_name) + src.param.watch(watcher, dep_names) + + # Set initial value. + self._propagate_output(src, method, index, tgt, param_name) + + @staticmethod + def _propagate_output( + src: param.Parameterized, + method: Any, + index: int | None, + tgt: param.Parameterized, + param_name: str, + ) -> None: + """Call the output method and assign the result to the target.""" + try: + value = method() + except Exception: + return + if index is not None: + try: + value = value[index] + except (IndexError, TypeError, KeyError): + return + setattr(tgt, param_name, value) + + def _build_reactflow( + self, + all_names: list[str], + edges: list[tuple[str, str, str]], + ) -> ReactFlow: + """Construct the ReactFlow component.""" + positions = _compute_positions(all_names, edges, self.layout_spacing) + + nodes: list[dict[str, Any]] = [] + for name in all_names: + pos = positions[name] + if name in self._input_views: + # Auto-generated input widget node + param_name = name.split(":", 1)[1] + label = param_name.replace("_", " ").title() + node_dict = NodeSpec( + id=name, + position=pos, + label=label, + className="rf-auto-input", + ).to_dict() + node_dict["view"] = self._input_views[name] + else: + # Stage node — prefer view methods, fall back to outputs + instance = self._instances[name] + outputs = self._outputs.get(name, {}) + output_method_names = {info[1].__name__ for info in outputs.values()} + view_methods = _get_view_methods(instance, output_method_names) + + node_dict = NodeSpec( + id=name, + position=pos, + label=name, + className="rf-stage", + ).to_dict() + + if view_methods: + # Use non-output @param.depends view methods + if len(view_methods) == 1: + node_dict["view"] = pn.panel(view_methods[0]) + else: + node_dict["view"] = pn.Column(*(pn.panel(m) for m in view_methods)) + elif outputs: + # Fall back to rendering @param.output methods + output_names = [] + output_views = [] + for out_name, out_info in outputs.items(): + _ptype, method, index = out_info + output_names.append(out_name) + output_views.append(_make_output_view(instance, method, index)) + if len(output_views) == 1: + node_dict["view"] = output_views[0] + else: + node_dict["view"] = pn.Accordion( + *zip(output_names, output_views, strict=False), + active=list(range(len(output_views))), + ) + nodes.append(node_dict) + + edge_dicts: list[dict[str, Any]] = [] + seen_edges: set[tuple[str, str, str]] = set() + edge_counter = 0 + for src_name, tgt_name, param_name in edges: + key = (src_name, tgt_name, param_name) + if key in seen_edges: + continue + seen_edges.add(key) + edge_counter += 1 + edge_dicts.append( + { + "id": f"e{edge_counter}", + "source": src_name, + "target": tgt_name, + "label": param_name, + "markerEnd": {"type": "arrowclosed"}, + } + ) + + flow_kwargs: dict[str, Any] = { + "nodes": nodes, + "edges": edge_dicts, + "sizing_mode": "stretch_both", + "min_height": 500, + "editable": False, + } + flow_kwargs.update(self.kwargs) + + # Merge auto-input CSS with any user-provided stylesheets + user_sheets = flow_kwargs.pop("stylesheets", []) + flow_kwargs["stylesheets"] = [_PIPELINE_CSS] + list(user_sheets) + + return ReactFlow(**flow_kwargs) + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def __panel__(self): + return self._flow diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py new file mode 100644 index 0000000..0f6ca72 --- /dev/null +++ b/tests/test_pipeline.py @@ -0,0 +1,497 @@ +"""Tests for the Pipeline class.""" + +import param + +from panel_reactflow import Pipeline, ReactFlow +from panel_reactflow.pipeline import ( + _BASE_PARAMS, + _compute_positions, + _get_input_params, + _get_outputs, + _get_view_methods, + _infer_edges, + _make_output_view, +) + +# --------------------------------------------------------------------------- +# Test stage classes +# --------------------------------------------------------------------------- + + +class Source(param.Parameterized): + text = param.String(default="hello") + + @param.output(param.String) + @param.depends("text") + def text_out(self): + return self.text.upper() + + +class Sink(param.Parameterized): + text_out = param.String() + + @param.depends("text_out") + def display(self): + return self.text_out or "empty" + + +class Transform(param.Parameterized): + text_out = param.String() + + @param.output(param.String) + @param.depends("text_out") + def result(self): + return (self.text_out or "")[::-1] + + +class Display(param.Parameterized): + result = param.String() + + @param.depends("result") + def display(self): + return self.result or "waiting" + + +class FanOutSource(param.Parameterized): + value = param.Integer(default=10) + + @param.output(param.Integer) + @param.depends("value") + def value_out(self): + return self.value * 2 + + +class BranchA(param.Parameterized): + value_out = param.Integer() + + @param.output(param.Integer) + @param.depends("value_out") + def a_result(self): + return (self.value_out or 0) + 1 + + +class BranchB(param.Parameterized): + value_out = param.Integer() + + @param.output(param.Integer) + @param.depends("value_out") + def b_result(self): + return (self.value_out or 0) * 10 + + +class Merger(param.Parameterized): + a_result = param.Integer() + b_result = param.Integer() + + @param.depends("a_result", "b_result") + def display(self): + return f"{self.a_result} / {self.b_result}" + + +# --------------------------------------------------------------------------- +# Helper tests +# --------------------------------------------------------------------------- + + +def test_get_outputs_discovers_param_output(): + src = Source() + outputs = _get_outputs(src) + assert "text_out" in outputs + # Each entry is (param_type_instance, bound_method, index) + assert outputs["text_out"][1].__name__ == "text_out" + + +def test_get_input_params_excludes_base(): + sink = Sink() + inputs = _get_input_params(sink) + assert "text_out" in inputs + # Base Parameterized params like 'name' should not appear + assert "name" not in inputs + + +def test_infer_edges_linear_chain(): + instances = {"Source": Source(name="Source"), "Sink": Sink(name="Sink")} + outputs_map = {name: _get_outputs(inst) for name, inst in instances.items()} + edges = _infer_edges(["Source", "Sink"], instances, outputs_map) + assert len(edges) == 1 + assert edges[0] == ("Source", "Sink", "text_out") + + +def test_infer_edges_three_stage(): + instances = { + "Source": Source(name="Source"), + "Transform": Transform(name="Transform"), + "Display": Display(name="Display"), + } + outputs_map = {name: _get_outputs(inst) for name, inst in instances.items()} + edges = _infer_edges(["Source", "Transform", "Display"], instances, outputs_map) + # Source.text_out -> Transform.text_out, Transform.result -> Display.result + assert ("Source", "Transform", "text_out") in edges + assert ("Transform", "Display", "result") in edges + assert len(edges) == 2 + + +def test_compute_positions_linear(): + names = ["A", "B", "C"] + edges = [("A", "B", "x"), ("B", "C", "x")] + positions = _compute_positions(names, edges, (350, 150)) + # Should be left to right + assert positions["A"]["x"] < positions["B"]["x"] < positions["C"]["x"] + # All on same y since linear + assert positions["A"]["y"] == positions["B"]["y"] == positions["C"]["y"] + + +def test_compute_positions_fan_out(): + names = ["Root", "Left", "Right"] + edges = [("Root", "Left", "x"), ("Root", "Right", "x")] + positions = _compute_positions(names, edges, (350, 150)) + # Root at depth 0, Left and Right at depth 1 + assert positions["Root"]["x"] == 0 + assert positions["Left"]["x"] == 350 + assert positions["Right"]["x"] == 350 + # Left and Right should be vertically offset + assert positions["Left"]["y"] != positions["Right"]["y"] + + +def test_get_view_methods(): + """_get_view_methods finds public @param.depends methods that are not outputs.""" + sink = Sink() + # Sink has no outputs, but has display() which is @param.depends + view_methods = _get_view_methods(sink, set()) + method_names = [m.__name__ for m in view_methods] + assert "display" in method_names + + +def test_get_view_methods_excludes_outputs(): + """_get_view_methods excludes methods whose names are in output_method_names.""" + src = Source() + # text_out is both @param.output and @param.depends — should be excluded + view_methods = _get_view_methods(src, {"text_out"}) + method_names = [m.__name__ for m in view_methods] + assert "text_out" not in method_names + + +def test_make_output_view_single(): + """_make_output_view creates a Panel viewable for a single-output method.""" + from panel.viewable import Viewable + + src = Source() + outputs = _get_outputs(src) + _ptype, method, index = outputs["text_out"] + view = _make_output_view(src, method, index) + assert isinstance(view, Viewable) + + +def test_make_output_view_multi(): + """_make_output_view creates a Panel viewable for a multi-output method.""" + from panel.viewable import Viewable + + class MultiOut(param.Parameterized): + x = param.String("hello") + + @param.output(upper=param.String(), length=param.Integer()) + @param.depends("x") + def outputs(self): + return self.x.upper(), len(self.x) + + inst = MultiOut() + outs = _get_outputs(inst) + for name in ("upper", "length"): + _ptype, method, index = outs[name] + view = _make_output_view(inst, method, index) + assert isinstance(view, Viewable) + + +# --------------------------------------------------------------------------- +# Pipeline integration tests +# --------------------------------------------------------------------------- + + +def test_pipeline_introspects_outputs(): + pipeline = Pipeline(stages=[("Source", Source), ("Sink", Sink)]) + assert "text_out" in pipeline._outputs["Source"] + + +def test_pipeline_infers_edges(): + pipeline = Pipeline(stages=[("Source", Source), ("Sink", Sink)]) + assert len(pipeline._edges) == 1 + assert pipeline._edges[0] == ("Source", "Sink", "text_out") + + +def test_pipeline_explicit_graph(): + pipeline = Pipeline( + stages=[("Source", Source), ("Sink", Sink)], + graph={"Source": "Sink"}, + ) + assert len(pipeline._edges) == 1 + assert pipeline._edges[0] == ("Source", "Sink", "text_out") + + +def test_pipeline_wires_reactivity(): + pipeline = Pipeline(stages=[("Source", Source), ("Sink", Sink)]) + src = pipeline._instances["Source"] + sink = pipeline._instances["Sink"] + # Initial wiring should have propagated + assert sink.text_out == src.text.upper() + # Change source and verify propagation + src.text = "world" + assert sink.text_out == "WORLD" + + +def test_pipeline_creates_reactflow(): + pipeline = Pipeline(stages=[("Source", Source), ("Sink", Sink)]) + flow = pipeline.__panel__() + assert isinstance(flow, ReactFlow) + # Source:text (auto-input) + Source + Sink = 3 nodes (no separate output nodes) + assert len(flow.nodes) == 3 + # Source:text->Source (auto) + Source->Sink = 2 edges + assert len(flow.edges) == 2 + node_ids = {n["id"] for n in flow.nodes} + assert node_ids == {"Source:text", "Source", "Sink"} + + +def test_pipeline_accepts_instances(): + src = Source(name="Source") + sink = Sink(name="Sink") + pipeline = Pipeline(stages=[("Source", src), ("Sink", sink)]) + assert pipeline._instances["Source"] is src + assert pipeline._instances["Sink"] is sink + assert isinstance(pipeline.__panel__(), ReactFlow) + + +def test_pipeline_fan_out(): + pipeline = Pipeline( + stages=[ + ("Root", FanOutSource), + ("A", BranchA), + ("B", BranchB), + ("Merge", Merger), + ], + graph={ + "Root": ("A", "B"), + "A": "Merge", + "B": "Merge", + }, + ) + flow = pipeline.__panel__() + # Root:value (auto) + Root + A + B + Merge = 5 nodes (no separate output nodes) + assert len(flow.nodes) == 5 + # Root:value->Root (auto) + Root->A + Root->B + A->Merge + B->Merge = 5 edges + assert len(flow.edges) == 5 + + # Check reactivity + root = pipeline._instances["Root"] + merge = pipeline._instances["Merge"] + + # Initial: root.value=10 -> value_out=20 -> A: 21, B: 200 + assert merge.a_result == 21 + assert merge.b_result == 200 + + # Update root + root.value = 5 + # value_out=10 -> A: 11, B: 100 + assert merge.a_result == 11 + assert merge.b_result == 100 + + +def test_pipeline_edges_have_labels(): + pipeline = Pipeline(stages=[("Source", Source), ("Sink", Sink)]) + flow = pipeline.__panel__() + labels = {e["label"] for e in flow.edges} + assert "text_out" in labels + + +def test_pipeline_edges_have_arrow_markers(): + pipeline = Pipeline(stages=[("Source", Source), ("Sink", Sink)]) + flow = pipeline.__panel__() + for edge in flow.edges: + assert edge["markerEnd"] == {"type": "arrowclosed"} + + +def test_pipeline_nodes_have_views(): + """All stage nodes now have views (outputs or view methods). + Auto-input nodes also have views.""" + pipeline = Pipeline(stages=[("Source", Source), ("Sink", Sink)]) + flow = pipeline.__panel__() + for node in flow.nodes: + nid = node["id"] + if ":" in nid: + # Auto-input nodes must have views + assert "view" in node, f"Auto-input node {nid} should have a view" + elif nid == "Sink": + # Sink has a display() view method + assert "view" in node, "Sink should have a view from display()" + elif nid == "Source": + # Source has only @param.output → falls back to output as view + assert "view" in node, "Source should have output rendered as view" + + +def test_pipeline_kwargs_forwarded(): + pipeline = Pipeline( + stages=[("Source", Source), ("Sink", Sink)], + kwargs={"show_minimap": True, "min_height": 800}, + ) + flow = pipeline.__panel__() + assert flow.show_minimap is True + assert flow.min_height == 800 + + +def test_pipeline_layout_spacing(): + pipeline = Pipeline( + stages=[("Source", Source), ("Sink", Sink)], + layout_spacing=(500, 200), + ) + flow = pipeline.__panel__() + positions = {n["id"]: n["position"] for n in flow.nodes} + # Direct stage-to-stage: Sink.x - Source.x == spacing + assert positions["Sink"]["x"] - positions["Source"]["x"] == 500 + + +# --------------------------------------------------------------------------- +# Auto-input tests +# --------------------------------------------------------------------------- + + +def test_pipeline_auto_input_creates_nodes(): + """Auto-input nodes are created for unconnected params.""" + pipeline = Pipeline(stages=[("Source", Source), ("Sink", Sink)]) + flow = pipeline.__panel__() + node_ids = {n["id"] for n in flow.nodes} + # Source has 'text' param with no incoming edge → auto-input node + assert "Source:text" in node_ids + # Sink.text_out IS connected from Source → no auto-input + assert "Sink:text_out" not in node_ids + + +def test_pipeline_auto_input_edge_count(): + """Verify total edge count with auto-inputs (no output node expansion).""" + pipeline = Pipeline( + stages=[ + ("Source", Source), + ("Transform", Transform), + ("Display", Display), + ], + ) + flow = pipeline.__panel__() + # stage-to-stage edges: Source->Transform(text_out), Transform->Display(result) = 2 + # auto-input: Source:text -> Source = 1 + # total = 3 + assert len(flow.edges) == 3 + # stage nodes: Source, Transform, Display = 3 + # auto-input: Source:text = 1 + # total = 4 + assert len(flow.nodes) == 4 + + +def test_pipeline_auto_input_wiring(): + """Changing an auto-input widget's underlying param propagates through the pipeline.""" + pipeline = Pipeline(stages=[("Source", Source), ("Sink", Sink)]) + src = pipeline._instances["Source"] + sink = pipeline._instances["Sink"] + + # The auto-input widget is bound to src.text via pn.panel(src.param.text), + # so changing src.text directly should propagate. + src.text = "test input" + assert sink.text_out == "TEST INPUT" + + +def test_pipeline_auto_inputs_false(): + """auto_inputs=False suppresses input node generation.""" + pipeline = Pipeline( + stages=[("Source", Source), ("Sink", Sink)], + auto_inputs=False, + ) + flow = pipeline.__panel__() + # Source + Sink = 2 nodes (no auto-inputs, no separate output nodes) + assert len(flow.nodes) == 2 + # Source->Sink = 1 edge + assert len(flow.edges) == 1 + node_ids = {n["id"] for n in flow.nodes} + assert "Source:text" not in node_ids + + +def test_find_unconnected_params(): + """Unit test for _find_unconnected_params helper.""" + pipeline = Pipeline( + stages=[("Source", Source), ("Sink", Sink)], + auto_inputs=False, # don't auto-build, just check helper + ) + unconnected = pipeline._find_unconnected_params(["Source", "Sink"]) + stage_param_pairs = [(s, p) for s, p in unconnected] + # Source.text has no incoming edge + assert ("Source", "text") in stage_param_pairs + # Sink.text_out IS connected from Source + assert ("Sink", "text_out") not in stage_param_pairs + + +def test_pipeline_auto_input_node_labels(): + """Auto-input nodes get human-readable labels.""" + pipeline = Pipeline(stages=[("Source", Source), ("Sink", Sink)]) + flow = pipeline.__panel__() + input_node = next(n for n in flow.nodes if n["id"] == "Source:text") + assert input_node["label"] == "Text" + + +def test_pipeline_auto_input_views_are_viewable(): + """Auto-input views should be Panel viewable objects.""" + from panel.viewable import Viewable + + pipeline = Pipeline(stages=[("Source", Source), ("Sink", Sink)]) + assert "Source:text" in pipeline._input_views + widget = pipeline._input_views["Source:text"] + assert isinstance(widget, Viewable) + + +def test_base_params_excludes_viewable(): + """_BASE_PARAMS should contain Viewable params like 'width', 'height'.""" + assert "width" in _BASE_PARAMS + assert "height" in _BASE_PARAMS + assert "sizing_mode" in _BASE_PARAMS + + +# --------------------------------------------------------------------------- +# Stage view tests +# --------------------------------------------------------------------------- + + +def test_pipeline_stage_view_methods(): + """Terminal stages with @param.depends view methods get a view in the stage node.""" + pipeline = Pipeline(stages=[("Source", Source), ("Sink", Sink)]) + flow = pipeline.__panel__() + sink_node = next(n for n in flow.nodes if n["id"] == "Sink") + assert "view" in sink_node + + +def test_pipeline_stage_output_as_view(): + """Stages with only @param.output methods (no view methods) render output as view.""" + pipeline = Pipeline(stages=[("Source", Source), ("Sink", Sink)]) + flow = pipeline.__panel__() + source_node = next(n for n in flow.nodes if n["id"] == "Source") + # Source has no non-output view methods, so output is rendered as view + assert "view" in source_node + + +def test_pipeline_stage_view_methods_preferred(): + """View methods are preferred over output methods for stage node views.""" + + class StageWithBoth(param.Parameterized): + x = param.String("hello") + + @param.output(param.String) + @param.depends("x") + def x_out(self): + return self.x.upper() + + @param.depends("x") + def custom_view(self): + return f"Custom: {self.x}" + + pipeline = Pipeline(stages=[("Stage", StageWithBoth)]) + flow = pipeline.__panel__() + stage_node = next(n for n in flow.nodes if n["id"] == "Stage") + assert "view" in stage_node + # The view should be from custom_view, not from the output + # We can verify by checking it's a Panel object wrapping the view method + from panel.viewable import Viewable + + assert isinstance(stage_node["view"], Viewable) diff --git a/tmp/data_explorer.py b/tmp/data_explorer.py new file mode 100644 index 0000000..8b058da --- /dev/null +++ b/tmp/data_explorer.py @@ -0,0 +1,82 @@ +"""Example 2: Data Explorer Pipeline — DataFrames flowing between nodes with an hvPlot scatter chart.""" + +import hvplot.pandas # noqa: F401 +import numpy as np +import pandas as pd +import panel as pn +import param + +from panel_reactflow import Pipeline + +pn.extension("jsoneditor") + +# Generate sample datasets (no external dependencies needed) +DATASETS = { + "iris": pd.DataFrame( + { + "sepal_length": np.random.normal(5.8, 0.8, 150), + "sepal_width": np.random.normal(3.0, 0.4, 150), + "petal_length": np.random.normal(3.7, 1.8, 150), + "petal_width": np.random.normal(1.2, 0.8, 150), + } + ), + "random": pd.DataFrame( + { + "x": np.random.randn(200), + "y": np.random.randn(200), + "size": np.random.uniform(1, 10, 200), + "value": np.random.uniform(0, 100, 200), + } + ), +} + + +class DataLoaderNode(param.Parameterized): + dataset = param.Selector(default="iris", objects=list(DATASETS.keys())) + + @param.output(param.DataFrame) + @param.depends("dataset") + def data(self): + return DATASETS[self.dataset] + + @param.depends("dataset") + def table(self): + return pn.pane.DataFrame(DATASETS[self.dataset], max_height=300) + + +class ChartNode(param.Parameterized): + data = param.DataFrame() + x_col = param.Selector(default="", objects=[""]) + y_col = param.Selector(default="", objects=[""]) + + def __init__(self, **params): + super().__init__(**params) + self._update_col_options() + + @param.depends("data", watch=True) + def _update_col_options(self): + if self.data is not None and len(self.data.columns): + cols = list(self.data.columns) + self.param.x_col.objects = cols + self.param.y_col.objects = cols + if self.x_col not in cols: + self.x_col = cols[0] + if self.y_col not in cols: + self.y_col = cols[1] if len(cols) > 1 else cols[0] + else: + self.param.x_col.objects = [""] + self.param.y_col.objects = [""] + self.x_col = "" + self.y_col = "" + + @param.output() + @param.depends("data", "x_col", "y_col") + def plot(self): + if self.data is None or not self.x_col or not self.y_col: + return pn.pane.Markdown("*Waiting for data...*") + return self.data.hvplot.scatter(x=self.x_col, y=self.y_col, height=500, width=500) + + +Pipeline( + stages=[("Data", DataLoaderNode), ("Chart", ChartNode)], +).servable() diff --git a/tmp/hello_world.py b/tmp/hello_world.py new file mode 100644 index 0000000..bbed633 --- /dev/null +++ b/tmp/hello_world.py @@ -0,0 +1,23 @@ +"""Example 1: Text Processing Pipeline — single node with auto-inputs.""" + +import panel as pn +import param + +from panel_reactflow import Pipeline + +pn.extension("jsoneditor") + + +class StrTransformNode(param.Parameterized): + text = param.String("Hello World") + mode = param.Selector(default="upper", objects=["upper", "lower", "title", "swapcase"]) + + @param.output(param.String) + @param.depends("text", "mode") + def result(self): + if not self.text: + return "" + return getattr(self.text, self.mode)() + + +Pipeline(stages=[("Transform", StrTransformNode)]).servable() diff --git a/tmp/multi_output.py b/tmp/multi_output.py new file mode 100644 index 0000000..629289c --- /dev/null +++ b/tmp/multi_output.py @@ -0,0 +1,51 @@ +"""Example: Multi-Output Pipeline — one method producing three outputs, each in its own node.""" + +import panel as pn +import param + +from panel_reactflow import Pipeline + +pn.extension("jsoneditor") + + +class Source(param.Parameterized): + text = param.String(default="Hello World") + + @param.output(upper=param.String(), lower=param.String(), length=param.Integer()) + @param.depends("text") + def split(self): + return self.text.upper(), self.text.lower(), len(self.text) + + +class UpperDisplay(param.Parameterized): + upper = param.String() + + @param.depends("upper") + def view(self): + return pn.pane.Markdown(f"**{self.upper}**") + + +class LowerDisplay(param.Parameterized): + lower = param.String() + + @param.depends("lower") + def view(self): + return pn.pane.Markdown(f"*{self.lower}*") + + +class LengthDisplay(param.Parameterized): + length = param.Integer() + + @param.depends("length") + def view(self): + return pn.pane.Alert(f"Length: {self.length}", alert_type="info") + + +Pipeline( + stages=[ + ("Source", Source), + ("Upper", UpperDisplay), + ("Lower", LowerDisplay), + ("Length", LengthDisplay), + ], +).servable() diff --git a/tmp/stock_dag.py b/tmp/stock_dag.py new file mode 100644 index 0000000..36417d0 --- /dev/null +++ b/tmp/stock_dag.py @@ -0,0 +1,96 @@ +"""Example 3: Stock Analysis DAG — fan-out and fan-in with a diamond topology.""" + +import hvplot.pandas # noqa: F401 +import numpy as np +import pandas as pd +import panel as pn +import param + +from panel_reactflow import Pipeline + +pn.extension("jsoneditor", "tabulator") + + +class StockData(param.Parameterized): + symbol = param.String(default="AAPL") + days = param.Integer(default=252, bounds=(30, 1000)) + + @param.output(param.DataFrame) + @param.depends("symbol", "days") + def prices(self): + np.random.seed(hash(self.symbol) % 2**32) + dates = pd.date_range(end=pd.Timestamp.now(), periods=self.days) + price = 100 + np.cumsum(np.random.randn(self.days) * 1.5) + return pd.DataFrame({"date": dates, "price": price}).set_index("date") + + +class MANode(param.Parameterized): + prices = param.DataFrame() + window = param.Integer(default=20, bounds=(5, 100)) + + @param.output(param.DataFrame) + @param.depends("prices", "window") + def ma_data(self): + if self.prices is None: + return None + df = self.prices.copy() + df["ma"] = df["price"].rolling(self.window).mean() + return df + + +class RSINode(param.Parameterized): + prices = param.DataFrame() + period = param.Integer(default=14, bounds=(2, 50)) + + @param.output(param.DataFrame) + @param.depends("prices", "period") + def rsi_data(self): + if self.prices is None: + return None + delta = self.prices["price"].diff() + gain = delta.clip(lower=0).rolling(self.period).mean() + loss = (-delta.clip(upper=0)).rolling(self.period).mean() + rs = gain / loss + df = self.prices.copy() + df["rsi"] = 100 - (100 / (1 + rs)) + return df + + +class ChartsNode(param.Parameterized): + ma_data = param.DataFrame() + rsi_data = param.DataFrame() + + @param.output("price_plot", "rsi_plot") + @param.depends("ma_data", "rsi_data") + def plot(self): + if self.ma_data is None or self.rsi_data is None: + return pn.pane.Markdown("*Waiting for data from both branches...*") + price_plot = self.ma_data.hvplot.line( + y=["price", "ma"], + ylabel="Price", + title="Price & Moving Average", + legend="top_left", + width=600, + height=400, + ) + rsi_plot = self.rsi_data.hvplot.line( + y="rsi", + ylabel="RSI", + title="RSI", + color="orange", + width=600, + height=400, + ) + return price_plot, rsi_plot + + +Pipeline( + stages=[ + ("Stock Data", StockData), + ("MA", MANode), + ("RSI", RSINode), + ("Charts", ChartsNode), + ], + graph={"Stock Data": ("MA", "RSI"), "MA": "Charts", "RSI": "Charts"}, + kwargs={"min_height": 600}, +).servable() diff --git a/tmp/text_pipeline.py b/tmp/text_pipeline.py new file mode 100644 index 0000000..877788b --- /dev/null +++ b/tmp/text_pipeline.py @@ -0,0 +1,40 @@ +"""Example 1: Text Processing Pipeline — two stages with auto-inputs.""" + +import panel as pn +import param + +from panel_reactflow import Pipeline + +pn.extension("jsoneditor") + + +class TransformNode(param.Parameterized): + text = param.String(default="hello world") + mode = param.Selector(default="upper", objects=["upper", "lower", "title", "swapcase"]) + + @param.output(param.String) + @param.depends("text", "mode") + def result(self): + if not self.text: + return "" + return getattr(self.text, self.mode)() + + @param.depends("text", "mode") + def preview(self): + if not self.text: + return pn.pane.Markdown("*No input yet*") + result = getattr(self.text, self.mode)() + return pn.pane.Markdown(f"**{result}**") + + +class DisplayNode(param.Parameterized): + result = param.String() + + @param.depends("result") + def view(self): + return pn.pane.Alert(self.result or "Waiting...", alert_type="success") + + +Pipeline( + stages=[("Transform", TransformNode), ("Display", DisplayNode)], +).servable() diff --git a/todo.md b/todo.md new file mode 100644 index 0000000..d8430b6 --- /dev/null +++ b/todo.md @@ -0,0 +1,521 @@ +# Panel Pipelines: Visual Data Flow with Parameterized Nodes + +*A next-generation replacement for `panel.pipeline.Pipeline`, built on ReactFlow.* + +## Why Replace Panel Pipeline? + +[Panel Pipeline](https://panel.holoviz.org/how_to/pipeline/simple_pipeline.html) +was a pioneering attempt at visual workflows, but its limitations are well-known: +linear-first design, no interactive canvas (no drag/zoom/pan), navigation-centric +rather than data-flow-centric, no embedded previews inside nodes, and +[long-standing unresolved bugs](https://github.com/holoviz/panel/issues?q=is%3Aissue+pipeline+is%3Aopen). + +Tools like [Daggr](https://github.com/gradio-app/daggr), +[ComfyUI](https://github.com/comfyanonymous/ComfyUI), +[LangFlow](https://github.com/langflow-ai/langflow), and +[n8n](https://github.com/n8n-io/n8n) have proven that visual node editors +are the natural interface for composable workflows. Panel deserves the same. + +--- + +## Core Concept: A Parameterized Class IS the Node + +A pipeline stage is any `param.Parameterized` subclass. No special base class needed. + +| Parameterized Concept | Pipeline Role | +|---|---| +| `param.Parameter` declarations | **Inputs** (data the node consumes) | +| `@param.output` decorated methods | **Outputs** (data the node produces) | +| `@param.depends` view methods | **Display** (live preview inside the node) | +| Parameter types & constraints | Auto-wiring and auto-input widgets | + +```python +import param + +class TransformNode(param.Parameterized): + # Inputs + text = param.String(default="hello world") + mode = param.Selector(default="upper", objects=["upper", "lower", "title", "swapcase"]) + + # Output: Should we continue to use param.output. Or just param.depends methods? + @param.output(param.String) + @param.depends("text", "mode") + def result(self): + if not self.text: + return "" + return getattr(self.text, self.mode)() + + # Display (view method — rendered inside the node) + # Should param.depends methods be recognized as outputs? + @param.depends("text", "mode") + def preview(self): + result = getattr(self.text, self.mode)() + return pn.pane.Markdown(f"**{result}**") +``` + +The `Pipeline` class introspects this and automatically: + +1. Creates auto-input widget nodes for `text` and `mode` (unconnected params) +2. Wires the `result` output to any downstream stage with a matching `result` parameter +3. Renders the `preview()` view method inside the node body +4. Sets up reactive updates so changes propagate through the graph + +--- + +## Pipeline API + +```python +from panel_reactflow import Pipeline + +Pipeline( + stages=[("Name", ClassOrInstance), ...], + graph=None, # None = auto-infer edges; dict = explicit topology + layout_spacing=(350, 150), # (horizontal, vertical) pixels between nodes + auto_inputs=True, # auto-generate widget nodes for unconnected params + kwargs={}, # extra kwargs forwarded to ReactFlow +).servable() +``` + +Question: Should this be a `Pipeline` class or just a `def create_pipeline(...)->ReactFlow` function? + +### Parameters + +- **`stages`** — List of `(name, class_or_instance)` tuples. Classes are instantiated automatically. +- **`graph`** — Explicit topology: `{source_name: target_name | (t1, t2, ...)}`. When `None`, edges are inferred by matching `@param.output` names to downstream parameter names. +- **`layout_spacing`** — `(horizontal, vertical)` spacing in pixels. +- **`auto_inputs`** — When `True`, unconnected parameters get auto-generated widget nodes with an "INPUT" pill badge. +- **`kwargs`** — Extra keyword arguments forwarded to `ReactFlow` (e.g., `min_height`, `show_minimap`). + +### View resolution + +Pipeline resolves what to display inside each stage node: + +1. **View methods** (preferred) — Public `@param.depends` methods that are *not* `@param.output` methods. If a stage has `preview()` or `view()` decorated with `@param.depends`, it's rendered in the node. +2. **Output fallback** — If no view methods exist, `@param.output` methods are rendered. A single output is shown directly; multiple outputs are displayed in a `pn.Accordion` with named sections (all expanded). + +### Node styling + +Pipeline nodes are visually distinguished: + +- **Auto-input nodes** — Indigo border + gradient + "INPUT" pill badge (CSS class `rf-auto-input`) +- **Stage nodes** — Emerald border + gradient + "OUTPUT" pill badge (CSS class `rf-stage`) + +Question: What would the right styling here be? + +--- + +## Examples + +### 1. Hello World — Single Stage + +*`tmp/hello_world.py` — The simplest pipeline: one stage, auto-input widgets.* + +```python +import panel as pn +import param + +from panel_reactflow import Pipeline + +pn.extension("jsoneditor") + + +class StrTransformNode(param.Parameterized): + text = param.String("Hello World") + mode = param.Selector(default="upper", objects=["upper", "lower", "title", "swapcase"]) + + @param.output(param.String) + @param.depends("text", "mode") + def result(self): + if not self.text: + return "" + return getattr(self.text, self.mode)() + + +Pipeline(stages=[("Transform", StrTransformNode)]).servable() +``` + +`text` and `mode` have no upstream connection, so Pipeline auto-generates input widget nodes for them. The `result()` output is rendered as the node's fallback view. + +### 2. Text Pipeline — Two Stages with View Methods + +*`tmp/text_pipeline.py` — Auto-inferred edges, view methods in both stages.* + +```python +import panel as pn +import param + +from panel_reactflow import Pipeline + +pn.extension("jsoneditor") + + +class TransformNode(param.Parameterized): + text = param.String(default="hello world") + mode = param.Selector(default="upper", objects=["upper", "lower", "title", "swapcase"]) + + @param.output(param.String) + @param.depends("text", "mode") + def result(self): + if not self.text: + return "" + return getattr(self.text, self.mode)() + + @param.depends("text", "mode") + def preview(self): + if not self.text: + return pn.pane.Markdown("*No input yet*") + result = getattr(self.text, self.mode)() + return pn.pane.Markdown(f"**{result}**") + + +class DisplayNode(param.Parameterized): + result = param.String() + + @param.depends("result") + def view(self): + return pn.pane.Alert(self.result or "Waiting...", alert_type="success") + + +Pipeline( + stages=[("Transform", TransformNode), ("Display", DisplayNode)], +).servable() +``` + +Pipeline auto-infers the edge `Transform.result -> Display.result` by name matching. Transform shows `preview()`, Display shows `view()`. Auto-input widgets are created for `text` and `mode`. + +### 3. Data Explorer — DataFrames with Dynamic Selectors + +*`tmp/data_explorer.py` — DataFrames flowing between nodes, dynamic column selectors, hvPlot chart.* + +```python +import hvplot.pandas # noqa: F401 +import numpy as np +import pandas as pd +import panel as pn +import param + +from panel_reactflow import Pipeline + +pn.extension("jsoneditor") + +DATASETS = { + "iris": pd.DataFrame({ + "sepal_length": np.random.normal(5.8, 0.8, 150), + "sepal_width": np.random.normal(3.0, 0.4, 150), + "petal_length": np.random.normal(3.7, 1.8, 150), + "petal_width": np.random.normal(1.2, 0.8, 150), + }), + "random": pd.DataFrame({ + "x": np.random.randn(200), + "y": np.random.randn(200), + "size": np.random.uniform(1, 10, 200), + "value": np.random.uniform(0, 100, 200), + }), +} + + +class DataLoaderNode(param.Parameterized): + dataset = param.Selector(default="iris", objects=list(DATASETS.keys())) + + @param.output(param.DataFrame) + @param.depends("dataset") + def data(self): + return DATASETS[self.dataset] + + @param.depends("dataset") + def table(self): + return pn.pane.DataFrame(DATASETS[self.dataset], max_height=300) + + +class ChartNode(param.Parameterized): + data = param.DataFrame() + x_col = param.Selector(default="", objects=[""]) + y_col = param.Selector(default="", objects=[""]) + + def __init__(self, **params): + super().__init__(**params) + self._update_col_options() + + @param.depends("data", watch=True) + def _update_col_options(self): + if self.data is not None and len(self.data.columns): + cols = list(self.data.columns) + self.param.x_col.objects = cols + self.param.y_col.objects = cols + if self.x_col not in cols: + self.x_col = cols[0] + if self.y_col not in cols: + self.y_col = cols[1] if len(cols) > 1 else cols[0] + else: + self.param.x_col.objects = [""] + self.param.y_col.objects = [""] + self.x_col = "" + self.y_col = "" + + @param.output() + @param.depends("data", "x_col", "y_col") + def plot(self): + if self.data is None or not self.x_col or not self.y_col: + return pn.pane.Markdown("*Waiting for data...*") + return self.data.hvplot.scatter(x=self.x_col, y=self.y_col, height=500, width=500) + + +Pipeline( + stages=[("Data", DataLoaderNode), ("Chart", ChartNode)], +).servable() +``` + +`DataLoaderNode.data` auto-connects to `ChartNode.data`. ChartNode dynamically updates its `x_col`/`y_col` selector options when data arrives. `table()` is DataLoader's view method; `plot()` is Chart's output fallback. + +### 4. Stock Analysis DAG — Diamond Topology + +*`tmp/stock_dag.py` — Fan-out and fan-in with explicit graph.* + +```python +import hvplot.pandas # noqa: F401 +import numpy as np +import pandas as pd +import panel as pn +import param + +from panel_reactflow import Pipeline + +pn.extension("jsoneditor") + + +class StockData(param.Parameterized): + symbol = param.String(default="AAPL") + days = param.Integer(default=252, bounds=(30, 1000)) + + @param.output(param.DataFrame) + @param.depends("symbol", "days") + def prices(self): + np.random.seed(hash(self.symbol) % 2**32) + dates = pd.date_range(end=pd.Timestamp.now(), periods=self.days) + price = 100 + np.cumsum(np.random.randn(self.days) * 1.5) + return pd.DataFrame({"date": dates, "price": price}).set_index("date") + + +class MANode(param.Parameterized): + prices = param.DataFrame() + window = param.Integer(default=20, bounds=(5, 100)) + + @param.output(param.DataFrame) + @param.depends("prices", "window") + def ma_data(self): + if self.prices is None: + return None + df = self.prices.copy() + df["ma"] = df["price"].rolling(self.window).mean() + return df + + +class RSINode(param.Parameterized): + prices = param.DataFrame() + period = param.Integer(default=14, bounds=(2, 50)) + + @param.output(param.DataFrame) + @param.depends("prices", "period") + def rsi_data(self): + if self.prices is None: + return None + delta = self.prices["price"].diff() + gain = delta.clip(lower=0).rolling(self.period).mean() + loss = (-delta.clip(upper=0)).rolling(self.period).mean() + rs = gain / loss + df = self.prices.copy() + df["rsi"] = 100 - (100 / (1 + rs)) + return df + + +class ChartNode(param.Parameterized): + ma_data = param.DataFrame() + rsi_data = param.DataFrame() + + @param.output() + @param.depends("ma_data", "rsi_data") + def plot(self): + if self.ma_data is None or self.rsi_data is None: + return pn.pane.Markdown("*Waiting for data from both branches...*") + price_plot = self.ma_data.hvplot.line( + y=["price", "ma"], ylabel="Price", title="Price & Moving Average", + legend="top_left", height=200, + ) + rsi_plot = self.rsi_data.hvplot.line( + y="rsi", ylabel="RSI", title="RSI", color="orange", height=150, + ) + return pn.Column(price_plot, rsi_plot, sizing_mode="stretch_width") + + +Pipeline( + stages=[ + ("Stock Data", StockData), + ("MA", MANode), + ("RSI", RSINode), + ("Chart", ChartNode), + ], + graph={"Stock Data": ("MA", "RSI"), "MA": "Chart", "RSI": "Chart"}, + kwargs={"min_height": 600}, +).servable() +``` + +The explicit `graph` defines the diamond: Stock Data fans out to MA and RSI, which fan in to Chart. Auto-inputs are created for `symbol`, `days`, `window`, and `period`. + +### 5. Multi-Output — One Method, Three Outputs + +*`tmp/multi_output.py` — A single `@param.output` method producing three named outputs, each flowing to its own downstream node.* + +```python +import panel as pn +import param + +from panel_reactflow import Pipeline + +pn.extension("jsoneditor") + + +class Source(param.Parameterized): + text = param.String(default="Hello World") + + @param.output(upper=param.String(), lower=param.String(), length=param.Integer()) + @param.depends("text") + def split(self): + return self.text.upper(), self.text.lower(), len(self.text) + + +class UpperDisplay(param.Parameterized): + upper = param.String() + + @param.depends("upper") + def view(self): + return pn.pane.Markdown(f"**{self.upper}**") + + +class LowerDisplay(param.Parameterized): + lower = param.String() + + @param.depends("lower") + def view(self): + return pn.pane.Markdown(f"*{self.lower}*") + + +class LengthDisplay(param.Parameterized): + length = param.Integer() + + @param.depends("length") + def view(self): + return pn.pane.Alert(f"Length: {self.length}", alert_type="info") + + +Pipeline( + stages=[ + ("Source", Source), + ("Upper", UpperDisplay), + ("Lower", LowerDisplay), + ("Length", LengthDisplay), + ], +).servable() +``` + +`Source.split()` returns a tuple of 3 values. Pipeline auto-infers edges: `upper -> Upper.upper`, `lower -> Lower.lower`, `length -> Length.length`. The Source node displays all 3 outputs in an Accordion (expanded). Each downstream node shows its view method. + +--- + +## Implemented Features + +- [x] `Pipeline` class (`src/panel_reactflow/pipeline.py`) +- [x] Auto-inferred edges by matching `@param.output` names to downstream parameter names +- [x] Explicit `graph` dict for non-linear topologies (fan-out, fan-in, diamond) +- [x] Auto-input widget nodes for unconnected parameters +- [x] Reactive wiring via `param.watch` (upstream output changes propagate downstream) +- [x] View method resolution (public `@param.depends` methods rendered in nodes) +- [x] Output fallback rendering (single output direct, multi-output in Accordion) +- [x] Multi-output support (tuple-indexed extraction from `@param.output(a=..., b=..., c=...)`) +- [x] Topological layout algorithm (BFS depth assignment, vertical stacking for fan-out) +- [x] Configurable layout spacing +- [x] Visual pill badges: "INPUT" (indigo) on auto-input nodes, "OUTPUT" (emerald) on stage nodes +- [x] Forward `kwargs` to ReactFlow (min_height, show_minimap, etc.) +- [x] Accepts both classes and instances as stages +- [x] Unit tests (`tests/test_pipeline.py` — 34 tests) + +--- + +## Open Issues + +- https://github.com/panel-extensions/panel-reactflow/issues/27 +- https://github.com/panel-extensions/panel-reactflow/issues/26 +- https://github.com/panel-extensions/panel-reactflow/issues/25 +- https://github.com/panel-extensions/panel-reactflow/issues/24 +- https://github.com/panel-extensions/panel-reactflow/issues/23 +- https://github.com/panel-extensions/panel-reactflow/issues/22 +- https://github.com/panel-extensions/panel-reactflow/issues/21 +- https://github.com/panel-extensions/panel-reactflow/issues/20 +- https://github.com/panel-extensions/panel-reactflow/issues/19 +- https://github.com/panel-extensions/panel-reactflow/issues/18 +- https://github.com/panel-extensions/panel-reactflow/issues/17 +- https://github.com/panel-extensions/panel-reactflow/issues/16 +- https://github.com/panel-extensions/panel-reactflow/issues/15 +- https://github.com/panel-extensions/panel-reactflow/issues/14 +- https://github.com/panel-extensions/panel-reactflow/issues/13 +- https://github.com/panel-extensions/panel-reactflow/issues/12 + +## Requirements + +### Customization + +- **Custom input widgets** — Developers must be able to customize auto-input widgets in the same way `pn.Param` allows (e.g., specifying widget types, formatting, bounds overrides per parameter). +- **Custom output views** — Developers must be able to customize how individual outputs are rendered in stage nodes, by providing panes, custom functions, or `Viewer` subclasses (analogous to `pn.Param`'s `widgets` dict). +- **Customizable node styling** — The default input/output pill badges and colors should look polished out of the box, but developers must be able to override them (custom CSS classes, colors, or disable badges entirely). + +### Execution control + +- **Output caching** — Expensive output computations should be cacheable. The recommended pattern is `@pn.cache` on the output method; this should be documented with examples. +- **Manual vs. automatic execution** — Developers must be able to choose between automatic reactive updates (current default: outputs recompute on any input change) and manual trigger mode (outputs recompute only on button click). +- **Startup computation** — Developers must be able to control whether outputs are computed on initialization (similar to `on_init=True` in `param.depends`). +- **Background execution** — Long-running output computations should run in the background (e.g., via `pn.state.execute` or threading) to keep the UI responsive. When possible, independent branches should compute in parallel. + +### Visual feedback + +- **Stale/invalidated state** — When an input changes but the downstream output has not yet recomputed, the affected nodes should be visually marked as stale (e.g., dimmed border, "stale" badge). +- **Computing indicator** — While an output is being recomputed, the node should show a spinner or loading overlay so the user knows work is in progress. + +### Documentation and examples + +- **Convert legacy Panel Pipeline examples** — All examples from the [Panel Pipeline How-To guides](https://panel.holoviz.org/how_to/pipeline/index.html) should be ported to this API and tested. +- **Convert Gradio Daggr examples** — All examples from [Daggr](https://github.com/gradio-app/daggr) should be ported to this API and tested. +- **ML and GenAI examples** — Create showcase examples for machine learning workflows (training pipelines, inference chains) and generative AI (LLM chains, RAG pipelines). +- **Function-to-node guide** — Document how to wrap an existing function as a Pipeline stage with minimal boilerplate (input params from function signature, output from return value). +- **Testing guide** — Document how to unit-test individual stages in isolation and how to integration-test a full pipeline. + +### API design + +- **Helper node classes** — Evaluate whether to provide ready-made base classes like `FnNode` (wraps a plain function), `PanelNode` (wraps a Panel viewable), and `InferenceNode` (wraps an ML model), similar to [Daggr's node types](https://github.com/gradio-app/daggr). +- **LLM-friendly error messages** — All Pipeline errors (missing outputs, unresolved edges, type mismatches) should produce clear, actionable messages that an LLM coding assistant can interpret and fix without ambiguity. +- **Resolve open issues above** — Content-aware layout, Viewer resolution, node overflow, and type-based wiring should all be addressed. + +--- + +## References + +### Panel Pipeline + +- [Simple Pipeline How-To](https://panel.holoviz.org/how_to/pipeline/simple_pipeline.html) +- [Complex (Non-Linear) Pipeline How-To](https://panel.holoviz.org/how_to/pipeline/complex_pipeline.html) + +### param.output + +- [Outputs User Guide](https://param.holoviz.org/user_guide/Outputs.html) + +### Prior Art + +- [Daggr](https://github.com/gradio-app/daggr) — Visual DAG builder for Gradio apps +- [ComfyUI](https://github.com/comfyanonymous/ComfyUI) — Node-based Stable Diffusion workflow editor +- [LangFlow](https://github.com/langflow-ai/langflow) — Visual LLM agent workflow builder +- [Flowise](https://github.com/FlowiseAI/Flowise) — Drag-and-drop LLM flow builder +- [n8n](https://github.com/n8n-io/n8n) — Workflow automation platform + +### panel-reactflow + +- [Repository](https://github.com/panel-extensions/panel-reactflow) +- [Documentation](https://panel-extensions.github.io/panel-reactflow/) +- [React Flow (xyflow)](https://xyflow.com/) diff --git a/zensical.toml b/zensical.toml index d27dfd5..63e143a 100644 --- a/zensical.toml +++ b/zensical.toml @@ -21,7 +21,8 @@ nav = [ {"React to Events" = "how-to/react-to-events.md"} ]}, {"Examples" = "examples.md"}, - {"Reference" = "reference/panel_reactflow.md"} + {"Reference" = "reference/panel_reactflow.md"}, + {"Releases" = "releases.md"} ] # Theme configuration