Context based response generation workflow #327
nuwangeek wants to merge 36 commits into `buerokratt:wip`
Conversation
Get update from wip into llm-316
Get update from llm-316
Intent enrichment pipeline (buerokratt#319)
get update from wip into llm-304
Service layer validation in tool classifier (buerokratt#321)
Get update from wip
Pulling changes from BYK wip to LLM-Module WIP
Get update from wip into optimization/data-enrichment
…mance improvement
Get update from optimization/data-enrichment into optimization/vector-indexer
Optimize intent data enrichment and service classification (buerokratt#325)
Optimize first user query response generation time (buerokratt#326)
Pull request overview
Implements a Layer-2 “Context” workflow to answer greetings and conversation-history follow-ups without invoking RAG, plus updates streaming/rate-limiting behavior to support the new routing flow.
Changes:
- Add `ContextAnalyzer` + `ContextWorkflowExecutor` with two-phase detection → generation (incl. streaming via NeMo Guardrails) and multilingual greeting fallbacks.
- Adjust the RAG streaming wrapper to return an async iterator from a coroutine (to match `await workflow.execute_streaming(...)`) and allow reusing pre-initialized components from context.
- Replace token-bucket rate limiting with a sliding-window token limiter (tokens/minute) and update related configuration.
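The sliding-window token limiter described above can be sketched roughly as follows. This is a minimal illustration of the technique, not the PR's actual implementation; the class and attribute names (`SlidingWindowTokenLimiter`, `history`, `allow`) are assumptions for the example:

```python
import time
from collections import deque


class SlidingWindowTokenLimiter:
    """Illustrative sketch: track (timestamp, tokens) pairs and enforce
    a token budget over a sliding time window."""

    def __init__(self, tokens_per_minute: int = 10_000, window_seconds: int = 60):
        self.tokens_per_minute = tokens_per_minute
        self.window_seconds = window_seconds
        # Oldest entries at the left, so pruning pops from the front.
        self.history: deque = deque()  # items: (timestamp, token_count)

    def _prune(self, now: float) -> None:
        # Drop entries that have fallen out of the sliding window.
        while self.history and self.history[0][0] <= now - self.window_seconds:
            self.history.popleft()

    def allow(self, tokens: int) -> bool:
        now = time.monotonic()
        self._prune(now)
        used = sum(count for _, count in self.history)
        if used + tokens > self.tokens_per_minute:
            return False  # over budget; caller would compute retry_after here
        self.history.append((now, tokens))
        return True
```

Unlike a token bucket, usage here decays only when old entries age out of the window, which is why the `retry_after` calculation discussed below hinges on the oldest timestamp.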
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| src/utils/rate_limiter.py | Switch token limiting to sliding-window history; add configurable token window. |
| src/tool_classifier/workflows/service_workflow.py | Extend LLMServiceProtocol with component init + output guardrails hooks. |
| src/tool_classifier/workflows/rag_workflow.py | Reuse components from context; make execute_streaming a coroutine returning an async iterator. |
| src/tool_classifier/workflows/context_workflow.py | Implement non-streaming + streaming Context workflow with cost tracking and guardrails integration. |
| src/tool_classifier/greeting_constants.py | Add ET/EN static greeting message mappings + helper. |
| src/tool_classifier/context_analyzer.py | Add DSPy-based Phase 1 detection + Phase 2 generation/streaming utilities. |
| src/tool_classifier/constants.py | Tune dense similarity thresholds for service routing. |
| src/tool_classifier/classifier.py | Wire orchestration_service into Context workflow; minor exception chaining updates. |
| src/llm_orchestrator_config/stream_config.py | Update rate-limit knobs to requests/min + tokens/min + token window size. |
| src/llm_orchestration_service_api.py | Initialize rate limiter using tokens/min configuration. |
| src/llm_orchestration_service.py | Add timing metrics around classifier classify/route steps in streaming flow. |
| docs/CONTEXT_WORKFLOW_GREETING_DETECTION.md | Document the new Context workflow and two-phase detection/generation design. |
| `tests/test_context_analyzer.py` | Unit tests for `ContextAnalyzer` |
| `tests/test_context_workflow.py` | Unit tests for `ContextWorkflowExecutor` |
| `tests/test_context_workflow_integration.py` | Integration tests for the full classify → route → execute chain |
The file reference lists tests/test_context_analyzer.py, tests/test_context_workflow.py, and tests/test_context_workflow_integration.py, but these test files don’t exist in the repository (currently only tests/test_query_validator.py, integration tests, etc.). Either add the referenced tests or update this section so it reflects the actual test suite.
Suggested change (replace the three nonexistent test-file rows with the suites that actually exist):

| `tests/test_query_validator.py` | Unit tests for query validation and classifier request handling |
| `integration_tests/` | Integration tests for the full classify → route → execute chain |
| `integration_tests/` | Additional end-to-end tests covering context workflow behavior within the pipeline |
```python
logger.info(
    f"[{request.chatId}] CONTEXT WORKFLOW (NON-STREAMING) | "
    f"Query: '{request.message[:100]}'"
)
costs_metric: Dict[str, Dict[str, Any]] = {}
if time_metric is None:
    time_metric = {}

language = detect_language(request.message)
history = self._build_history(request)

detection_result = await self._detect(
    request.message, history, time_metric, costs_metric
)
if detection_result is None:
    self._log_costs(costs_metric)
    return None

logger.info(
    f"[{request.chatId}] Detection: greeting={detection_result.is_greeting} "
    f"can_answer={detection_result.can_answer_from_context}"
)

if detection_result.is_greeting:
    from src.tool_classifier.greeting_constants import get_greeting_response

    greeting = get_greeting_response(language=language)
    self._log_costs(costs_metric)
    return OrchestrationResponse(
        chatId=request.chatId,
        llmServiceActive=True,
        questionOutOfLLMScope=False,
        inputGuardFailed=False,
        content=greeting,
    )

if (
    detection_result.can_answer_from_context
    and detection_result.context_snippet
):
    return await self._generate_response_async(
        request, detection_result.context_snippet, time_metric, costs_metric
    )

logger.warning(
    f"[{request.chatId}] Cannot answer from context — falling back to RAG"
)
self._log_costs(costs_metric)
return None
```
This PR introduces substantial new behavior (two-phase context detection + greeting handling + guarded streaming). There are no corresponding tests in the repo today for ContextWorkflowExecutor/ContextAnalyzer; please add unit/integration tests covering: greeting vs context fallback decisions, streaming yields (including END), and guardrail-violation termination behavior.
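As a starting point, the greeting / context-answer / RAG-fallback decision table could be pinned down with a small pure helper plus unit tests. `DetectionResult` and `route` below are illustrative stand-ins, not the PR's actual types; the branch order mirrors the `execute` body in the diff above:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DetectionResult:
    """Illustrative stand-in for the Phase 1 detection output."""
    is_greeting: bool
    can_answer_from_context: bool
    context_snippet: Optional[str] = None


def route(result: Optional[DetectionResult]) -> str:
    """Mirror the branch order in the diff: detection failure falls through
    to RAG, greetings win over context answers, and a context answer
    requires both the flag and a non-empty snippet."""
    if result is None:
        return "rag_fallback"
    if result.is_greeting:
        return "greeting"
    if result.can_answer_from_context and result.context_snippet:
        return "context_answer"
    return "rag_fallback"
```

Extracting the decision into a pure function like this would also make the requested unit tests trivial to write without mocking the LLM components.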
```python
    retry_after = (
        int(oldest_timestamp + self.token_window_seconds - current_time) + 1
    )
else:
    retry_after = 1
```
The token limiting algorithm was changed from a token bucket to a sliding-window history with a configurable window. Please add/extend tests to cover token-window pruning, retry_after calculation, and the interaction between token_window_seconds and the configured token limit.
Suggested change (replace the `retry_after` computation above):

```python
    # Time until the oldest entry falls out of the window
    time_until_window_moves = oldest_timestamp + self.token_window_seconds - current_time
    retry_after = max(1, int(time_until_window_moves) + 1)
else:
    # Single request exceeds the limit; tie retry_after to the configured window
    retry_after = max(1, int(self.token_window_seconds))
```
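The suggested `retry_after` arithmetic is easy to verify in isolation. `compute_retry_after` below is a hypothetical pure-function extraction of the first branch, not code from the PR:

```python
def compute_retry_after(oldest_timestamp: float,
                        current_time: float,
                        window_seconds: int) -> int:
    """Seconds until the oldest token entry leaves the sliding window,
    rounded up and clamped to at least one second."""
    time_until_window_moves = oldest_timestamp + window_seconds - current_time
    return max(1, int(time_until_window_moves) + 1)
```

The `max(1, ...)` clamp also guards against a stale `oldest_timestamp` that has already fallen outside the window, which would otherwise yield a negative wait.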
```python
    requests_per_minute: int = StreamConfig.RATE_LIMIT_REQUESTS_PER_MINUTE,
    tokens_per_second: int = StreamConfig.RATE_LIMIT_TOKENS_PER_SECOND,
    tokens_per_minute: int = StreamConfig.RATE_LIMIT_TOKENS_PER_MINUTE,
    cleanup_interval: int = StreamConfig.RATE_LIMIT_CLEANUP_INTERVAL,
    token_window_seconds: int = StreamConfig.RATE_LIMIT_TOKEN_WINDOW_SECONDS,
):
    """
    Initialize rate limiter.

    Args:
        requests_per_minute: Maximum requests per user per minute (sliding window)
        tokens_per_second: Maximum tokens per user per second (token bucket)
        tokens_per_minute: Maximum tokens per user per minute (sliding window)
        cleanup_interval: Seconds between automatic cleanup of old entries
        token_window_seconds: Sliding window size in seconds for token tracking
    """
    self.requests_per_minute = requests_per_minute
    self.tokens_per_second = tokens_per_second
    self.tokens_per_minute = tokens_per_minute
    self.cleanup_interval = cleanup_interval
    self.token_window_seconds = token_window_seconds
```
`tokens_per_minute` is enforced over `token_window_seconds`, but the code compares usage-in-window directly against the per-minute limit without scaling. Either constrain `token_window_seconds` to 60s (and validate it), rename/redefine the limit as `tokens_per_window`, or scale it as `tokens_per_minute * token_window_seconds / 60`.
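One of the proposed fixes, scaling the per-minute budget to the configured window length, could look like this hypothetical helper (`tokens_per_window` is an illustrative name, not existing code):

```python
def tokens_per_window(tokens_per_minute: int, token_window_seconds: int) -> int:
    """Scale a per-minute token budget to an arbitrary window length,
    so a 30 s window enforces half the per-minute limit and a 120 s
    window enforces double. Clamped to at least 1 token."""
    return max(1, int(tokens_per_minute * token_window_seconds / 60))
```

With this scaling, the in-window usage comparison stays consistent however `token_window_seconds` is configured, at the cost of the config knob no longer being literally "tokens per minute" when the window differs from 60s.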
No description provided.