Python: (core): Add functional workflow API#4238
Python: (core): Add functional workflow API#4238moonbox3 wants to merge 11 commits intomicrosoft:mainfrom
Conversation
Python Test Coverage Report •
Python Unit Test Overview
|
|||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Pull request overview
This PR introduces a functional workflow API as an alternative to the existing graph-based workflow API. The functional approach allows users to write workflows as plain async functions decorated with @workflow, using native Python control flow (if/else, loops, asyncio.gather) instead of explicit graph construction with executors and edges. The @step decorator is optional and provides per-step checkpointing, caching, and observability.
Changes:
- Added core implementation (
_functional.py) with@workflow,@stepdecorators,RunContext,FunctionalWorkflow, andFunctionalWorkflowAgentclasses - Added comprehensive test suite (40+ test cases covering basic execution, HITL, checkpointing, streaming, error handling, edge cases)
- Added 6 sample files demonstrating functional workflows (basic pipeline, streaming, parallel execution, checkpointing, HITL, agent integration)
- Restructured getting-started samples to introduce functional workflows before graph workflows
- Updated exports in
__init__.pyto expose new functional API symbols
Reviewed changes
Copilot reviewed 13 out of 14 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
python/packages/core/agent_framework/_workflows/_functional.py |
Core implementation of functional workflow API with RunContext, StepWrapper, FunctionalWorkflow, and FunctionalWorkflowAgent classes (1105 lines) |
python/packages/core/agent_framework/__init__.py |
Added exports for FunctionalWorkflow, FunctionalWorkflowAgent, RunContext, StepWrapper, step, and workflow |
python/packages/core/tests/workflow/test_functional_workflow.py |
Comprehensive test suite covering basic execution, events, parallelism, HITL, errors, streaming, state, checkpointing, control flow, and edge cases (1031 lines) |
python/samples/01-get-started/05_first_functional_workflow.py |
Getting started sample demonstrating basic functional workflow with plain async functions |
python/samples/01-get-started/06_first_graph_workflow.py |
Renamed and updated graph workflow sample (previously 05_first_workflow.py) |
python/samples/01-get-started/07_host_your_agent.py |
Renamed agent hosting sample (previously 06_host_your_agent.py) |
python/samples/01-get-started/README.md |
Updated sample listing to include both functional and graph workflow samples |
python/samples/03-workflows/functional/basic_pipeline.py |
Sample showing simplest sequential pipeline with @workflow decorator |
python/samples/03-workflows/functional/basic_streaming_pipeline.py |
Sample demonstrating streaming workflow events with run(stream=True) |
python/samples/03-workflows/functional/parallel_pipeline.py |
Sample showing fan-out/fan-in with asyncio.gather |
python/samples/03-workflows/functional/steps_and_checkpointing.py |
Sample explaining @step decorator for per-step checkpointing and observability |
python/samples/03-workflows/functional/hitl_review.py |
Sample demonstrating HITL with ctx.request_info() and resume |
python/samples/03-workflows/functional/agent_integration.py |
Sample showing agent calls inside workflows and .as_agent() wrapper |
python/samples/03-workflows/README.md |
Added functional workflow section to samples overview |
| @workflow | ||
| async def hitl_pipeline(data: str, ctx: RunContext) -> str: | ||
| feedback = await ctx.request_info({"draft": data}, response_type=str) | ||
| return feedback |
There was a problem hiding this comment.
My brain maps @workflow to the graph-based Workflow and @step to Executor. I can see the benefit of allowing request_info at the workflow level. It's kind of like an executor whose sole purpose is to get user feedback. But should we also allow request_info inside a @step?
There was a problem hiding this comment.
By design, request_info lives at the workflow level — the workflow is the orchestrator that decides when to pause for input. Steps are meant to be self-contained units of work. If a step needs human input, the workflow calls request_info() first and passes the result to the step. This keeps steps simple and testable in isolation (no framework dependency).
There was a problem hiding this comment.
How will a step let the workflow know that it needs human input?
|
Btw, @eavanvalkenburg and @TaoChenOSU I think it would be best to stick this functional API in to its own package. We want to get some more signal around the APIs and use of it before we deem it "GA worthy," IMO. |
python/samples/01-get-started/06_functional_workflow_with_agents.py
Outdated
Show resolved
Hide resolved
python/samples/01-get-started/06_functional_workflow_with_agents.py
Outdated
Show resolved
Hide resolved
python/samples/01-get-started/06_functional_workflow_with_agents.py
Outdated
Show resolved
Hide resolved
- Swap 05/06 get-started samples: agent workflow first (motivates why workflows exist), simple text workflow second - Rename text_pipeline → text_workflow, poem_pipeline → poem_workflow - Add @step to agent workflow sample (05) to demonstrate caching - Switch agent samples to AzureOpenAIResponsesClient with Foundry - Remove .as_agent() from agent_integration.py to focus on the key difference between inline agent calls vs @step-cached calls - Add commented-out Agent.run example in hitl_review.py - Add clarifying comment in _functional.py that event streaming is buffered (not true per-token streaming) - Add naive_group_chat.py functional sample: round-robin group chat as a plain Python loop - Update READMEs to reflect new file names and group chat sample Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
We can use the experimental flag now :) |
Motivation and Context
The functional API is a stepping stone between single-agent use and the full graph API. Users write workflows as plain async functions -- no executor classes, no edges, no builder patterns.
HITL resume or crash recovery
A very basic example of the functional workflow API:
Note:
@stepis opt-in for functions where per-step checkpointing matters (for example, agent calls). Without@step, workflows still support HITL and checkpointing — functions just re-execute on resume.ctx: RunContextis only needed when you use HITL (request_info), custom events (add_event), or state (get_state/set_state). Otherwise, omit it for a cleaner signature.Description
Contribution Checklist