From da8d9401fe3ba7d02147f0966777ecc1df6976c0 Mon Sep 17 00:00:00 2001 From: Charith Nuwan Bimsara <59943919+nuwangeek@users.noreply.github.com> Date: Tue, 3 Mar 2026 14:51:45 +0530 Subject: [PATCH] Optimize intent data enrichment and service classification (#325) --- docker-compose.yml | 2 + docs/HYBRID_SEARCH_CLASSIFICATION.md | 393 +++++++++++++++ src/contextual_retrieval/constants.py | 5 +- src/intent_data_enrichment/constants.py | 4 + src/intent_data_enrichment/main_enrichment.py | 152 ++++-- src/intent_data_enrichment/models.py | 21 +- src/intent_data_enrichment/qdrant_manager.py | 266 +++++++--- src/tool_classifier/classifier.py | 476 ++++++++++++++++-- src/tool_classifier/constants.py | 25 + src/tool_classifier/sparse_encoder.py | 85 ++++ .../workflows/service_workflow.py | 152 +++++- src/vector_indexer/constants.py | 9 +- 12 files changed, 1440 insertions(+), 150 deletions(-) create mode 100644 docs/HYBRID_SEARCH_CLASSIFICATION.md create mode 100644 src/tool_classifier/sparse_encoder.py diff --git a/docker-compose.yml b/docker-compose.yml index 1fec54b..0a82508 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -178,6 +178,7 @@ services: - ./DSL/CronManager/DSL:/DSL - ./DSL/CronManager/script:/app/scripts - ./src/vector_indexer:/app/src/vector_indexer + - ./src/tool_classifier:/app/src/tool_classifier - ./src/intent_data_enrichment:/app/src/intent_data_enrichment - ./src/utils/decrypt_vault_secrets.py:/app/src/utils/decrypt_vault_secrets.py:ro # Decryption utility (read-only) - cron_data:/app/data @@ -577,6 +578,7 @@ services: - ./src/llm_config_module/config:/app/src/llm_config_module/config:ro - ./src/optimization/optimized_modules:/app/src/optimization/optimized_modules - llm_orchestration_logs:/app/logs + - ./tests:/app/tests # mount tests directory (excluded from image via .dockerignore) networks: - bykstack depends_on: diff --git a/docs/HYBRID_SEARCH_CLASSIFICATION.md b/docs/HYBRID_SEARCH_CLASSIFICATION.md new file mode 100644 index 
0000000..18c512a --- /dev/null +++ b/docs/HYBRID_SEARCH_CLASSIFICATION.md @@ -0,0 +1,393 @@ +# Hybrid Search Classification & Intent Data Enrichment + +> Updated architecture for the Tool Classifier using hybrid search (dense + sparse + RRF) with per-example indexing. +> Replaces the single-embedding approach documented in `TOOL_CLASSIFIER_AND_SERVICE_WORKFLOW.md`. + +--- + +## Table of Contents + +1. [Architecture Overview](#architecture-overview) +2. [Intent Data Enrichment (Indexing)](#intent-data-enrichment-indexing) +3. [Classification Flow (Query Time)](#classification-flow-query-time) +4. [Intent Detection & Entity Extraction](#intent-detection--entity-extraction) +5. [Thresholds & Configuration](#thresholds--configuration) + +--- + +## Architecture Overview + +The system has two phases: + +1. **Indexing (offline):** For each service, create multiple Qdrant points with dense + sparse vectors +2. **Classification (query time):** Two-step search to route queries — dense for relevance, hybrid for service identification + +``` +┌─────────────────────────────────────────────────────────────────────┐ +│ INDEXING (Offline) │ +│ │ +│ service_enrichment.sh → main_enrichment.py │ +│ ├─ LLM context generation │ +│ ├─ Per-example: dense embedding + sparse BM25 vector │ +│ ├─ Summary: dense embedding + sparse BM25 vector │ +│ └─ Qdrant upsert (N examples + 1 summary = N+1 points) │ +├─────────────────────────────────────────────────────────────────────┤ +│ CLASSIFICATION (Query Time) │ +│ │ +│ User Query │ +│ ├─ Step 1: Dense search → cosine similarity (relevance check) │ +│ ├─ Step 2: Hybrid search → RRF fusion (service identification) │ +│ └─ Route: HIGH-CONFIDENCE / AMBIGUOUS / CONTEXT-RAG │ +└─────────────────────────────────────────────────────────────────────┘ +``` + +--- + +## Intent Data Enrichment (Indexing) + +### Source Files + +| File | Role | +|------|------| +| `DSL/CronManager/script/service_enrichment.sh` | Entry point — sets environment, runs Python 
script | +| `src/intent_data_enrichment/main_enrichment.py` | Orchestrates per-example and summary point creation | +| `src/intent_data_enrichment/qdrant_manager.py` | Qdrant collection management, upsert, and deletion | +| `src/intent_data_enrichment/api_client.py` | LLM API calls (context generation, embeddings) | +| `src/intent_data_enrichment/models.py` | `EnrichedService` data model | +| `src/tool_classifier/sparse_encoder.py` | BM25-style sparse vector computation | + +### What Changed: Single Embedding → Per-Example Indexing + +**Before (old):** One point per service from concatenated text. + +**After (new):** N+1 points per service — one per example query, plus one summary. + +Example for a service with 3 examples: +``` +Service "Valuutakursid" → 4 Qdrant points + + Point 0 (example): "Mis suhe on euro ja usd vahel" + dense: 3072-dim embedding of this exact text + sparse: BM25 vector → {euro: 1.0, usd: 1.0, suhe: 1.0, ...} + + Point 1 (example): "Mis on euro ja btc vahetuskurss?" + dense: 3072-dim embedding of this exact text + sparse: BM25 vector → {euro: 1.0, btc: 1.0, vahetuskurss: 1.0, ...} + + Point 2 (example): "euro ja gbp vaheline kurss" + dense: 3072-dim embedding of this exact text + sparse: BM25 vector → {euro: 1.0, gbp: 1.0, kurss: 1.0, ...} + + Point 3 (summary): "Valuutakursid - Kasutaja soovib infot..." + dense: 3072-dim embedding of name + description + LLM context + sparse: BM25 vector of combined text +``` + +### Why Per-Example Indexing? 
+ +- Each example gets its own embedding, matching diverse user phrasings better +- Short example queries aren't diluted by long descriptions +- More examples = wider coverage "net" for query matching +- Sparse vectors enable keyword matching ("EUR", "USD") alongside semantic search + +### Dense vs Sparse Vectors + +| Type | Generation | Strength | +|------|-----------|----------| +| **Dense** (3072-dim) | `text-embedding-3-large` via Azure OpenAI | Semantic similarity — matches paraphrases, cross-language | +| **Sparse** (BM25) | Term frequency hashing (`sparse_encoder.py`) | Keyword overlap — exact token matching ("EUR", "USD", "THB") | + +### Sparse Vector Generation + +```python +# sparse_encoder.py +text = "Mis suhe on euro ja usd vahel" +tokens = re.findall(r"\w+", text.lower()) # ["mis", "suhe", "on", "euro", ...] +# Each token → hashed to index in [0, VOCAB_SIZE), value = term frequency +# Output: SparseVector(indices=[hash("mis"), hash("euro"), ...], values=[1.0, 1.0, ...]) +``` + +### Qdrant Collection Schema + +```python +# Collection: "intent_collections" +vectors_config = { + "dense": VectorParams(size=3072, distance=Distance.COSINE) +} +sparse_vectors_config = { + "sparse": SparseVectorParams(index=SparseIndexParams(on_disk=False)) +} +``` + +Each point payload: +```json +{ + "service_id": "common_service_exchange_rate", + "name": "Valuutakursid", + "description": "Kasutaja soovib infot valuutade kohta", + "examples": ["Mis suhe on euro ja usd vahel", "..."], + "entities": ["currency_from", "currency_to"], + "context": "LLM-generated enriched context...", + "point_type": "example", + "example_text": "Mis suhe on euro ja usd vahel" +} +``` + +### Enrichment Pipeline Flow + +``` +service_enrichment.sh + │ + ├─ Parse args: service_id, name, description, examples, entities + │ + ├─ Step 1: LLM context generation (enriched description) + │ + ├─ Step 2: For each example query: + │ ├─ Generate dense embedding (text-embedding-3-large) + │ └─ Generate sparse 
vector (BM25 term hashing) + │ + ├─ Step 3: Summary point (name + description + LLM context): + │ ├─ Generate dense embedding + │ └─ Generate sparse vector + │ + ├─ Step 4: Delete existing points for this service (idempotent) + │ + └─ Step 5: Bulk upsert N+1 points to Qdrant +``` + +### Service Deletion + +When a service is deactivated, all its points are removed: +```python +qdrant_manager.delete_service_points(service_id) +# Uses payload filter: {"service_id": service_id} +``` + +--- + +## Classification Flow (Query Time) + +### Source Files + +| File | Role | +|------|------| +| `src/tool_classifier/classifier.py` | Two-step search + routing decisions | +| `src/tool_classifier/constants.py` | All thresholds and configuration | +| `src/tool_classifier/sparse_encoder.py` | Query sparse vector generation | +| `src/tool_classifier/workflows/service_workflow.py` | Service execution with 3 routing paths | + +### Step 1: Dense Search — "Is This a Service Query?" + +Queries Qdrant using only the dense vector to get **actual cosine similarity scores** (0.0 – 1.0). + +```python +# classifier.py → _dense_search() +POST /collections/intent_collections/points/query +{ + "query": [0.023, -0.041, ...], # 3072-dim dense vector + "using": "dense", + "limit": 6, + "with_payload": true +} +``` + +Results are deduplicated by `service_id` (best score per service). + +**Why not use RRF scores?** +Qdrant's RRF uses `1/(1+rank)`, producing fixed scores (0.50, 0.33, 0.25) regardless of actual relevance. A perfect match and a random query both get 0.50 for rank 1. Cosine similarity reflects true semantic closeness. + +### Step 2: Hybrid Search — "Which Service?" + +Only runs if cosine ≥ `DENSE_MIN_THRESHOLD`. Combines dense + sparse search with RRF fusion. +Sparse prefetch is only included if the query produces a non-empty sparse vector. 
+ +```python +# classifier.py → _hybrid_search() +POST /collections/intent_collections/points/query +{ + "prefetch": [ + {"query": dense_vector, "using": "dense", "limit": 10}, + {"query": {"indices": [...], "values": [...]}, "using": "sparse", "limit": 10} + ], + "query": {"fusion": "rrf"}, + "limit": 5, + "with_payload": true +} +``` + +### Routing Decision + +``` +Dense cosine score + gap + │ + ├─ cosine < 0.38 → PATH 1: Skip SERVICE → CONTEXT/RAG + │ + ├─ cosine ≥ 0.40 AND → PATH 2: HIGH-CONFIDENCE SERVICE + │ gap ≥ 0.05 (skip discovery, intent detection on matched service only) + │ + └─ else (0.38 ≤ cosine < 0.40 → PATH 3: AMBIGUOUS SERVICE + OR gap < 0.05) (LLM intent detection on candidates) +``` + +### Path 1: Non-Service Query → CONTEXT/RAG + +Top cosine score below minimum threshold. The query has no meaningful similarity to any indexed service. + +``` +Query: "Miks ID-kaart ei tööta e-teenustes?" +Dense: top cosine=0.29 → below 0.38 → skip SERVICE +→ Routes directly to CONTEXT → RAG (saves ~50-300ms by skipping hybrid search) +``` + +### Path 2: HIGH-CONFIDENCE Service Match + +One service clearly stands out with high cosine and large gap to second result. + +``` +Query: "Palju saan 1 EUR eest THBdes?" +Dense: Valuutakursid (cosine=0.5511), gap=0.2371 +→ 0.5511 ≥ 0.40 AND 0.2371 ≥ 0.05 → HIGH-CONFIDENCE +→ Skips service discovery +→ Runs intent detection + entity extraction on matched service only +→ Entities: {currency_from: EUR, currency_to: THB} +→ Validation: PASSED ✓ +``` + +### Path 3: AMBIGUOUS Service Match → LLM Confirmation + +Multiple services score similarly or cosine is in the medium range. + +``` +Query: "Mis on täna ilm?" +Dense: Ilmapäring (cosine=0.39), gap=0.03 +→ 0.39 ≥ 0.38 but 0.39 < 0.40 → AMBIGUOUS +→ Runs LLM Intent Detection on top 3 candidates +→ LLM confirms or rejects → falls back to RAG if rejected +``` + +> **Note:** With the current threshold (0.38), the AMBIGUOUS zone (0.38–0.40) is intentionally narrow. 
+> Most queries resolve cleanly to either NON-SERVICE (<0.38) or HIGH-CONFIDENCE (≥0.40 with gap). + +### Fallback Chain + +Each workflow returns a response or `None` (fallback to next): + +``` +SERVICE (Layer 1) → CONTEXT (Layer 2) → RAG (Layer 3) → OOD (Layer 4) +``` + +--- + +## Intent Detection & Entity Extraction + +### When Does It Run? + +| Path | Intent Detection | Entity Extraction | +|------|-----------------|-------------------| +| HIGH-CONFIDENCE | On 1 service (matched) | Yes — from LLM output | +| AMBIGUOUS | On 2-3 candidates | Yes — if LLM matches | +| Non-service | Not run | Not run | + +### Intent Detection Module (DSPy) + +**File:** `src/tool_classifier/intent_detector.py` + +The DSPy `IntentDetectionModule` receives: +- User query +- Candidate services (formatted as JSON) +- Conversation history (last 3 turns) + +It returns: +```json +{ + "matched_service_id": "common_service_exchange_rate", + "confidence": 0.92, + "entities": { + "currency_from": "EUR", + "currency_to": "THB" + }, + "reasoning": "User wants EUR to THB exchange rate" +} +``` + +### Entity Validation + +**File:** `src/tool_classifier/workflows/service_workflow.py` → `_validate_entities()` + +Extracted entities are validated against the service's schema: + +``` +Schema: ["currency_from", "currency_to"] +Extracted: {"currency_from": "EUR", "currency_to": "THB"} +Result: PASSED ✓ +``` + +- **Missing entities** → sent as empty strings (service validates) +- **Extra entities** → ignored +- **Validation is lenient** — always proceeds, lets the service endpoint validate + +### Entity Transformation + +Entities dict → ordered array matching service schema: + +```python +# Schema: ["currency_from", "currency_to"] +# Dict: {"currency_from": "EUR", "currency_to": "THB"} +# Array: ["EUR", "THB"] +``` + +--- + +## Thresholds & Configuration + +All defined in `src/tool_classifier/constants.py`. 
+ +### Classification Thresholds + +| Constant | Value | Description | +|----------|-------|-------------| +| `DENSE_MIN_THRESHOLD` | `0.38` | Minimum cosine to consider any service match. Below → skip SERVICE entirely. Empirically tuned: SERVICE queries score ≥ 0.49, RAG queries ≤ 0.35 — threshold sits in the 0.134 natural gap between the two distributions. | +| `DENSE_HIGH_CONFIDENCE_THRESHOLD` | `0.40` | Cosine for HIGH-CONFIDENCE path. Service queries with correct match score ≥ 0.49 (observed range: 0.49–1.00). Non-service score 0.27–0.35. | +| `DENSE_SCORE_GAP_THRESHOLD` | `0.05` | Required gap between top two services. Prevents false positives when multiple services score similarly. Service gaps: 0.15–0.75, non-service gaps: 0.001–0.029. | + +### Search Configuration + +| Constant | Value | Description | +|----------|-------|-------------| +| `DENSE_SEARCH_TOP_K` | `3` | Unique services from dense search | +| `HYBRID_SEARCH_TOP_K` | `5` | Results from hybrid RRF search | + +### Observed Score Distributions + +Based on empirical testing with 42 Estonian queries (20 SERVICE, 22 RAG): + +| Metric | Service Query (n=20) | Non-Service / RAG Query (n=22) | +|--------|:--------------------:|:------------------------------:| +| Top cosine range | **0.49 – 1.00** | 0.27 – 0.35 | +| Top cosine mean | **0.77** | 0.30 | +| Cosine gap range | **0.15 – 0.75** | 0.001 – 0.029 | +| Cosine gap mean | **0.31** | 0.010 | +| Decision | HIGH-CONFIDENCE (100%) | NON-SERVICE (100%) | + +> **Separation gap:** The lowest SERVICE cosine (0.49) and highest RAG cosine (0.35) are separated by **0.134** — a clean margin with no overlap. The threshold at 0.38 sits inside this gap, nearer its lower (RAG) edge: about 0.03 above the highest RAG score and 0.11 below the lowest SERVICE score.
+ +### Performance by Path + +| Path | Latency | LLM Calls | Cost | +|------|:-------:|:---------:|:----:| +| Non-service (below threshold) | ~50ms | 0 | $0 | +| HIGH-CONFIDENCE service | ~100ms | 1 | ~$0.002 | +| AMBIGUOUS service | ~3.5s | 1-2 | ~$0.002–0.004 | +| Legacy (no classifier) | ~4.0s | 2+ | ~$0.004+ | + +> **Note:** Latencies above are classification time only (embedding + Qdrant search), excluding the downstream service call or RAG pipeline. + +### Tuning Recommendations + +- **Adding more services:** Score distributions improve naturally — service queries score higher, non-service score lower. +- **Adding more examples per service:** Diverse phrasings expand the embedding coverage. Aim for 5-8 examples per service covering formal + informal + different word orders. +- **Adjusting thresholds:** Monitor the logs (`Dense search: top=... cosine=...`) and adjust if real-world scores differ from test data. + +### Current Limitations + +- **Step 7 (Ruuter service call) is not yet implemented.** The service workflow currently returns a debug response with service metadata (endpoint URL, HTTP method, extracted entities) instead of calling the actual Ruuter service endpoint. See the `TODO: STEP 7` comments in `src/tool_classifier/workflows/service_workflow.py`. diff --git a/src/contextual_retrieval/constants.py b/src/contextual_retrieval/constants.py index 7ca58cb..cb7c49c 100644 --- a/src/contextual_retrieval/constants.py +++ b/src/contextual_retrieval/constants.py @@ -5,6 +5,8 @@ and other configurable values across the contextual retrieval system. """ +from vector_indexer.constants import ResponseGenerationConstants + class HttpClientConstants: """HTTP client configuration constants.""" @@ -41,7 +43,8 @@ class SearchConstants: # Default search parameters DEFAULT_TOPK_SEMANTIC = 40 DEFAULT_TOPK_BM25 = 40 - DEFAULT_FINAL_TOP_N = 12 + # Final top-N chunks returned after RRF fusion. 
+ DEFAULT_FINAL_TOP_N = ResponseGenerationConstants.DEFAULT_MAX_BLOCKS DEFAULT_SEARCH_TIMEOUT = 2 # Score and quality thresholds diff --git a/src/intent_data_enrichment/constants.py b/src/intent_data_enrichment/constants.py index f1f35f3..f506880 100644 --- a/src/intent_data_enrichment/constants.py +++ b/src/intent_data_enrichment/constants.py @@ -24,6 +24,10 @@ class EnrichmentConstants: VECTOR_SIZE = 3072 # Azure text-embedding-3-large dimension DISTANCE_METRIC = "Cosine" + # Named Vector Configuration (for hybrid search) + DENSE_VECTOR_NAME = "dense" + SPARSE_VECTOR_NAME = "sparse" + # Context Generation CONTEXT_TEMPLATE = """ {full_service_info} diff --git a/src/intent_data_enrichment/main_enrichment.py b/src/intent_data_enrichment/main_enrichment.py index d718678..16db8c6 100644 --- a/src/intent_data_enrichment/main_enrichment.py +++ b/src/intent_data_enrichment/main_enrichment.py @@ -3,19 +3,61 @@ Service Data Enrichment Script This script receives service data, enriches it with LLM-generated context, -creates embeddings, and stores in Qdrant intent_collections. +creates embeddings (dense + sparse per example), and stores in Qdrant intent_collections. 
+ +Indexing strategy: +- One 'example' point per example query (dense + sparse vectors of the example text) +- One 'summary' point per service (dense + sparse vectors of name + description + context) """ import sys import json import argparse import asyncio +from typing import List from loguru import logger from intent_data_enrichment.models import ServiceData, EnrichedService, EnrichmentResult from intent_data_enrichment.api_client import LLMAPIClient from intent_data_enrichment.qdrant_manager import QdrantManager +# Import sparse encoder from tool_classifier (shared module) +sys.path.insert(0, "/app/src") +try: + from tool_classifier.sparse_encoder import compute_sparse_vector +except ImportError: + # Fallback for local development + try: + from src.tool_classifier.sparse_encoder import compute_sparse_vector + except ImportError: + logger.warning( + "Could not import sparse_encoder from tool_classifier, " + "attempting direct import" + ) + import importlib.util + import os + + # Try to find the module relative to this file + module_path = os.path.join( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))), + "tool_classifier", + "sparse_encoder.py", + ) + if os.path.exists(module_path): + spec = importlib.util.spec_from_file_location("sparse_encoder", module_path) + if spec is not None and spec.loader is not None: + sparse_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(sparse_module) + compute_sparse_vector = sparse_module.compute_sparse_vector + else: + raise ImportError( + f"Cannot load spec or loader for sparse_encoder.py at {module_path}" + ) from None + else: + raise ImportError( + f"Cannot find sparse_encoder.py at {module_path}" + ) from None + def parse_arguments() -> ServiceData: """Parse command line arguments into ServiceData model.""" @@ -76,7 +118,8 @@ def parse_arguments() -> ServiceData: async def enrich_service(service_data: ServiceData) -> EnrichmentResult: """ - Main enrichment pipeline: generate context, create 
embedding, store in Qdrant. + Main enrichment pipeline: generate context, create per-example embeddings, + store in Qdrant with hybrid vectors (dense + sparse). Args: service_data: Service data to enrich @@ -85,14 +128,52 @@ async def enrich_service(service_data: ServiceData) -> EnrichmentResult: EnrichmentResult with success/failure information """ try: - # Step 1: Generate rich context using LLM + # Step 1: Generate rich context using LLM (unchanged from original) logger.info("Step 1: Generating rich context with LLM") async with LLMAPIClient() as api_client: context = await api_client.generate_context(service_data) logger.success(f"Context generated: {len(context)} characters") - # Step 2: Combine generated context with original metadata for embedding - logger.info("Step 2: Combining context with original service metadata") + # Step 2: Create per-example points (dense + sparse vectors) + logger.info( + f"Step 2: Creating per-example embeddings for " + f"{len(service_data.examples)} examples" + ) + enriched_points: List[EnrichedService] = [] + + for i, example in enumerate(service_data.examples): + logger.info( + f" Creating embeddings for example {i + 1}/{len(service_data.examples)}: " + f"'{example[:80]}...'" + if len(example) > 80 + else f" Creating embeddings for example {i + 1}/{len(service_data.examples)}: " + f"'{example}'" + ) + + # Dense: embed the individual example + dense_embedding = await api_client.create_embedding(example) + + # Sparse: BM25-style term frequencies for the example + sparse_vec = compute_sparse_vector(example) + + enriched_points.append( + EnrichedService( + id=service_data.service_id, + name=service_data.name, + description=service_data.description, + examples=service_data.examples, + entities=service_data.entities, + context=context, + embedding=dense_embedding, + sparse_indices=sparse_vec.indices, + sparse_values=sparse_vec.values, + example_text=example, + point_type="example", + ) + ) + + # Step 3: Create summary point (combined 
name + description + context) + logger.info("Step 3: Creating summary embedding") combined_text_parts = [ f"Service Name: {service_data.name}", f"Description: {service_data.description}", @@ -108,35 +189,44 @@ async def enrich_service(service_data: ServiceData) -> EnrichmentResult: f"Required Entities: {', '.join(service_data.entities)}" ) - # Add generated context last (enriched understanding) combined_text_parts.append(f"Enriched Context: {context}") - combined_text = "\n".join(combined_text_parts) - logger.info(f"Combined text length: {len(combined_text)} characters") - - # Step 3: Create embedding for combined text - logger.info("Step 3: Creating embedding vector for combined text") - embedding = await api_client.create_embedding(combined_text) - logger.success(f"Embedding created: {len(embedding)}-dimensional vector") - - # Step 4: Prepare enriched service - enriched_service = EnrichedService( - id=service_data.service_id, - name=service_data.name, - description=service_data.description, - examples=service_data.examples, - entities=service_data.entities, - context=context, - embedding=embedding, - ) - # Step 5: Store in Qdrant - logger.info("Step 5: Storing in Qdrant") + summary_embedding = await api_client.create_embedding(combined_text) + summary_sparse = compute_sparse_vector(combined_text) + + enriched_points.append( + EnrichedService( + id=service_data.service_id, + name=service_data.name, + description=service_data.description, + examples=service_data.examples, + entities=service_data.entities, + context=context, + embedding=summary_embedding, + sparse_indices=summary_sparse.indices, + sparse_values=summary_sparse.values, + example_text=None, + point_type="summary", + ) + ) + + # Step 4: Delete existing points for this service (idempotent update) + logger.info("Step 4: Removing existing points for idempotent update") qdrant = QdrantManager() try: qdrant.connect() qdrant.ensure_collection() - success = qdrant.upsert_service(enriched_service) + + # Delete 
old points before inserting new ones + qdrant.delete_service_points(service_data.service_id) + + # Step 5: Bulk upsert all points (examples + summary) + logger.info( + f"Step 5: Storing {len(enriched_points)} points in Qdrant " + f"({len(service_data.examples)} examples + 1 summary)" + ) + success = qdrant.upsert_service_points(enriched_points) finally: qdrant.close() @@ -144,9 +234,13 @@ async def enrich_service(service_data: ServiceData) -> EnrichmentResult: return EnrichmentResult( success=True, service_id=service_data.service_id, - message=f"Service '{service_data.name}' enriched and indexed successfully", + message=( + f"Service '{service_data.name}' enriched and indexed successfully " + f"({len(enriched_points)} points: " + f"{len(service_data.examples)} examples + 1 summary)" + ), context_length=len(context), - embedding_dimension=len(embedding), + embedding_dimension=len(summary_embedding), error=None, ) else: diff --git a/src/intent_data_enrichment/models.py b/src/intent_data_enrichment/models.py index eb0ef64..9390e73 100644 --- a/src/intent_data_enrichment/models.py +++ b/src/intent_data_enrichment/models.py @@ -20,7 +20,12 @@ class ServiceData(BaseModel): class EnrichedService(BaseModel): - """Enriched service data ready for storage.""" + """Enriched service data ready for storage. 
+ + Each service produces multiple points in Qdrant: + - One 'example' point per example query (for precise matching) + - One 'summary' point for the combined service description + context + """ id: str = Field(..., description="Service ID (maps to service_id)") name: str = Field(..., description="Service name") @@ -28,7 +33,19 @@ class EnrichedService(BaseModel): examples: List[str] = Field(..., description="Example queries") entities: List[str] = Field(..., description="Expected entity names") context: str = Field(..., description="Generated rich context") - embedding: List[float] = Field(..., description="Context embedding vector") + embedding: List[float] = Field(..., description="Dense embedding vector") + sparse_indices: List[int] = Field( + default_factory=list, description="Sparse vector indices" + ) + sparse_values: List[float] = Field( + default_factory=list, description="Sparse vector values" + ) + example_text: Optional[str] = Field( + default=None, description="The specific example this point represents" + ) + point_type: str = Field( + default="summary", description="Point type: 'example' or 'summary'" + ) class EnrichmentResult(BaseModel): diff --git a/src/intent_data_enrichment/qdrant_manager.py b/src/intent_data_enrichment/qdrant_manager.py index 5024e23..2e2ef2e 100644 --- a/src/intent_data_enrichment/qdrant_manager.py +++ b/src/intent_data_enrichment/qdrant_manager.py @@ -1,10 +1,21 @@ -"""Qdrant manager for intent collections.""" +"""Qdrant manager for intent collections with hybrid search support.""" import uuid -from typing import Optional +from typing import Optional, List from loguru import logger from qdrant_client import QdrantClient -from qdrant_client.models import Distance, VectorParams, PointStruct +from qdrant_client.models import ( + Distance, + VectorParams, + PointStruct, + SparseVectorParams, + SparseIndexParams, + SparseVector, + Filter, + FieldCondition, + MatchValue, + FilterSelector, +) from intent_data_enrichment.constants 
import EnrichmentConstants from intent_data_enrichment.models import EnrichedService @@ -14,7 +25,7 @@ class QdrantManager: - """Manages Qdrant operations for intent collections.""" + """Manages Qdrant operations for intent collections with hybrid search.""" def __init__( self, @@ -44,7 +55,12 @@ def connect(self) -> None: raise def ensure_collection(self) -> None: - """Ensure the intent_collections collection exists with correct vector size.""" + """Ensure the intent_collections collection exists with hybrid vector config. + + The collection uses named vectors: + - 'dense': 3072-dim cosine similarity vectors for semantic matching + - 'sparse': BM25-style sparse vectors for keyword matching + """ try: if not self.client: raise RuntimeError(_CLIENT_NOT_INITIALIZED) @@ -53,48 +69,60 @@ def ensure_collection(self) -> None: collection_names = [col.name for col in collections] if self.collection_name in collection_names: - # Check if existing collection has correct vector size collection_info = self.client.get_collection(self.collection_name) - - # Qdrant vectors config is a dict - get the default vector config vectors_config = collection_info.config.params.vectors - existing_vector_size: Optional[int] = None + # Check if collection has the expected named vector configuration if isinstance(vectors_config, dict): - # Get first vector config (usually the default/unnamed one) - if vectors_config: - vector_params = next(iter(vectors_config.values())) - existing_vector_size = vector_params.size + if EnrichmentConstants.DENSE_VECTOR_NAME in vectors_config: + existing_vector_size = vectors_config[ + EnrichmentConstants.DENSE_VECTOR_NAME + ].size + if existing_vector_size != EnrichmentConstants.VECTOR_SIZE: + logger.error( + f"Collection '{self.collection_name}' has incompatible vector size: " + f"{existing_vector_size} (expected {EnrichmentConstants.VECTOR_SIZE})" + ) + raise RuntimeError( + f"Collection '{self.collection_name}' has incompatible vector size " + 
f"({existing_vector_size} vs expected {EnrichmentConstants.VECTOR_SIZE}). " + "To recreate the collection, manually delete it first using: " + f"qdrant.client.delete_collection('{self.collection_name}') or via Qdrant UI/API." + ) + logger.info( + f"Collection '{self.collection_name}' already exists " + f"with correct hybrid vector config (dense: {existing_vector_size}d + sparse)" + ) + else: + # Old collection format (unnamed/single vector) — needs migration + logger.error( + f"Collection '{self.collection_name}' exists but uses old single-vector format. " + "Migration to named vectors (dense + sparse) required." + ) + raise RuntimeError( + f"Collection '{self.collection_name}' uses old single-vector format. " + "Please delete the collection and re-index all services. " + f"Delete with: qdrant.client.delete_collection('{self.collection_name}') " + "or via Qdrant UI/API." + ) elif vectors_config is not None: - # Direct VectorParams object (older API) - existing_vector_size = vectors_config.size - - if existing_vector_size is None: + # Direct VectorParams object (old single-vector format) logger.error( - f"Collection '{self.collection_name}' exists but vector size cannot be determined" + f"Collection '{self.collection_name}' exists but uses old single-vector format." ) raise RuntimeError( - f"Collection '{self.collection_name}' exists but vector size cannot be determined. " - "This may indicate a Qdrant API issue or unexpected collection configuration. " - "Manual intervention required: verify Qdrant health, inspect collection config, " - "or manually delete the collection if recreating is intended." + f"Collection '{self.collection_name}' uses old single-vector format. " + "Please delete the collection and re-index all services. " + f"Delete with: qdrant.client.delete_collection('{self.collection_name}') " + "or via Qdrant UI/API." 
) - elif existing_vector_size != EnrichmentConstants.VECTOR_SIZE: + else: logger.error( - f"Collection '{self.collection_name}' has incompatible vector size: " - f"{existing_vector_size} (expected {EnrichmentConstants.VECTOR_SIZE})" + f"Collection '{self.collection_name}' exists but vector config cannot be determined" ) raise RuntimeError( - f"Collection '{self.collection_name}' has incompatible vector size " - f"({existing_vector_size} vs expected {EnrichmentConstants.VECTOR_SIZE}). " - "This prevents automatic deletion to avoid accidental data loss. " - "To recreate the collection, manually delete it first using: " - f"qdrant.client.delete_collection('{self.collection_name}') or via Qdrant UI/API." - ) - else: - logger.info( - f"Collection '{self.collection_name}' already exists " - f"with correct vector size ({existing_vector_size})" + f"Collection '{self.collection_name}' exists but vector config cannot be determined. " + "Manual intervention required." ) else: self._create_collection() @@ -104,77 +132,167 @@ def ensure_collection(self) -> None: raise def _create_collection(self) -> None: - """Create the collection with correct vector configuration.""" + """Create the collection with hybrid vector configuration (dense + sparse).""" if not self.client: raise RuntimeError(_CLIENT_NOT_INITIALIZED) logger.info( f"Creating collection '{self.collection_name}' " - f"with vector size {EnrichmentConstants.VECTOR_SIZE}" + f"with hybrid vectors (dense: {EnrichmentConstants.VECTOR_SIZE}d + sparse)" ) self.client.create_collection( collection_name=self.collection_name, - vectors_config=VectorParams( - size=EnrichmentConstants.VECTOR_SIZE, - distance=Distance.COSINE, - ), + vectors_config={ + EnrichmentConstants.DENSE_VECTOR_NAME: VectorParams( + size=EnrichmentConstants.VECTOR_SIZE, + distance=Distance.COSINE, + ), + }, + sparse_vectors_config={ + EnrichmentConstants.SPARSE_VECTOR_NAME: SparseVectorParams( + index=SparseIndexParams(on_disk=False), + ), + }, ) 
logger.success(f"Collection '{self.collection_name}' created successfully") - def upsert_service(self, enriched_service: EnrichedService) -> bool: - """ - Upsert enriched service to Qdrant (update if exists, insert if new). + def delete_service_points(self, service_id: str) -> bool: + """Delete all points belonging to a service. + + Used before re-indexing to ensure idempotent updates, and when + a service is deactivated. Args: - enriched_service: EnrichedService instance containing the embedding and - associated metadata to upsert into Qdrant. + service_id: Service identifier to delete all points for Returns: True if successful, False otherwise """ try: if not self.client: - raise RuntimeError("Qdrant client not initialized") - - logger.info(f"Upserting service '{enriched_service.id}' to Qdrant") - - # Convert service_id to UUID for Qdrant compatibility - # Qdrant requires point IDs to be either integers or UUIDs - point_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, enriched_service.id)) - - # Prepare payload (all metadata except embedding) - payload = { - "service_id": enriched_service.id, # Store original ID in payload - "name": enriched_service.name, - "description": enriched_service.description, - "examples": enriched_service.examples, - "entities": enriched_service.entities, - "context": enriched_service.context, - } - - # Create point with UUID - point = PointStruct( - id=point_id, # ✓ Now using UUID string - vector=enriched_service.embedding, - payload=payload, + raise RuntimeError(_CLIENT_NOT_INITIALIZED) + + logger.info( + f"Deleting existing points for service '{service_id}' from Qdrant" + ) + + self.client.delete( + collection_name=self.collection_name, + points_selector=FilterSelector( + filter=Filter( + must=[ + FieldCondition( + key="service_id", + match=MatchValue(value=service_id), + ) + ] + ) + ), ) - # Upsert to Qdrant + logger.success(f"Successfully deleted points for service '{service_id}'") + return True + + except Exception as e: + 
logger.error(f"Failed to delete points for service '{service_id}': {e}") + return False + + def upsert_service_points(self, enriched_points: List[EnrichedService]) -> bool: + """Upsert multiple enriched service points to Qdrant. + + Each point contains both dense and sparse vectors for hybrid search. + Points are identified by a deterministic UUID based on service_id + point_index. + + Args: + enriched_points: List of EnrichedService instances (examples + summary) + + Returns: + True if all points upserted successfully, False otherwise + """ + try: + if not self.client: + raise RuntimeError(_CLIENT_NOT_INITIALIZED) + + if not enriched_points: + logger.warning("No points to upsert") + return True + + service_id = enriched_points[0].id + logger.info( + f"Upserting {len(enriched_points)} points for service '{service_id}'" + ) + + from typing import Any, Dict + + points: List[PointStruct] = [] + for idx, enriched_service in enumerate(enriched_points): + # Deterministic UUID based on service_id + index + point_id_source = f"{enriched_service.id}_{idx}" + point_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, point_id_source)) + + # Prepare payload + payload = { + "service_id": enriched_service.id, + "name": enriched_service.name, + "description": enriched_service.description, + "examples": enriched_service.examples, + "entities": enriched_service.entities, + "context": enriched_service.context, + "point_type": enriched_service.point_type, + } + + # Add example_text for example points + if enriched_service.example_text: + payload["example_text"] = enriched_service.example_text + + # Build named vectors (dense always, sparse if present) + vectors: Dict[str, Any] = { + EnrichmentConstants.DENSE_VECTOR_NAME: enriched_service.embedding, + } + if enriched_service.sparse_indices: + vectors[EnrichmentConstants.SPARSE_VECTOR_NAME] = SparseVector( + indices=enriched_service.sparse_indices, + values=enriched_service.sparse_values, + ) + + point = PointStruct( + id=point_id, + 
vector=vectors, + payload=payload, + ) + + points.append(point) + + # Bulk upsert self.client.upsert( collection_name=self.collection_name, - points=[point], + points=points, ) logger.success( - f"Successfully upserted service '{enriched_service.id}' " - f"({len(enriched_service.embedding)}-dim vector)" + f"Successfully upserted {len(points)} points for service '{service_id}' " + f"({sum(1 for p in enriched_points if p.point_type == 'example')} examples + " + f"{sum(1 for p in enriched_points if p.point_type == 'summary')} summary)" ) return True except Exception as e: - logger.error(f"Failed to upsert service '{enriched_service.id}': {e}") + logger.error(f"Failed to upsert service points: {e}") return False + def upsert_service(self, enriched_service: EnrichedService) -> bool: + """Upsert a single enriched service to Qdrant. + + Backward-compatible wrapper that delegates to upsert_service_points. + + Args: + enriched_service: EnrichedService instance + + Returns: + True if successful, False otherwise + """ + return self.upsert_service_points([enriched_service]) + def close(self) -> None: """Close Qdrant connection.""" if self.client: diff --git a/src/tool_classifier/classifier.py b/src/tool_classifier/classifier.py index 4455f8c..0d4644d 100644 --- a/src/tool_classifier/classifier.py +++ b/src/tool_classifier/classifier.py @@ -1,6 +1,7 @@ -"""Main tool classifier for workflow routing.""" +"""Main tool classifier for workflow routing with hybrid search classification.""" from typing import Any, AsyncIterator, Dict, List, Literal, Optional, Union, overload +import httpx from loguru import logger from models.request_models import ( @@ -8,8 +9,24 @@ OrchestrationRequest, OrchestrationResponse, ) -from tool_classifier.enums import WorkflowType, WORKFLOW_DISPLAY_NAMES +from tool_classifier.enums import ( + WorkflowType, + WORKFLOW_DISPLAY_NAMES, + WORKFLOW_LAYER_ORDER, +) from tool_classifier.models import ClassificationResult +from tool_classifier.constants import ( + 
QDRANT_HOST, + QDRANT_PORT, + QDRANT_COLLECTION, + QDRANT_TIMEOUT, + HYBRID_SEARCH_TOP_K, + DENSE_SEARCH_TOP_K, + DENSE_MIN_THRESHOLD, + DENSE_HIGH_CONFIDENCE_THRESHOLD, + DENSE_SCORE_GAP_THRESHOLD, +) +from tool_classifier.sparse_encoder import compute_sparse_vector from tool_classifier.workflows import ( ServiceWorkflowExecutor, ContextWorkflowExecutor, @@ -22,19 +39,20 @@ class ToolClassifier: """ Main classifier that determines which workflow should handle user queries. + Uses a two-step search approach for classification: + 1. Dense-only search → real cosine similarity scores for relevance check + 2. Hybrid search (dense + sparse + RRF) → best service identification + + Routing decisions: + - High-confidence service match → SERVICE workflow (skip discovery + intent detection) + - Ambiguous match → SERVICE workflow with LLM confirmation + - No match → CONTEXT/RAG workflow (skip SERVICE entirely) + Implements a layer-wise filtering approach: Layer 1: Service Workflow → External API calls Layer 2: Context Workflow → Conversation history/greetings Layer 3: RAG Workflow → Knowledge base retrieval Layer 4: OOD Workflow → Out-of-domain fallback - - Each layer is tried in sequence. If a layer cannot handle the query - (returns None), the classifier falls back to the next layer. 
- - Architecture: - - Strategy Pattern: Each workflow is a pluggable strategy - - Chain of Responsibility: Layers form a fallback chain - - Dependency Injection: LLM manager and connections injected from main service """ def __init__( @@ -52,6 +70,17 @@ def __init__( self.llm_manager = llm_manager self.orchestration_service = orchestration_service + # Shared httpx client for Qdrant queries (connection pooling) + self._qdrant_base_url = f"http://{QDRANT_HOST}:{QDRANT_PORT}" + self._qdrant_client = httpx.AsyncClient( + base_url=self._qdrant_base_url, + timeout=QDRANT_TIMEOUT, + limits=httpx.Limits( + max_connections=20, + max_keepalive_connections=10, + ), + ) + # Initialize workflow executors self.service_workflow = ServiceWorkflowExecutor( llm_manager=llm_manager, @@ -65,7 +94,10 @@ def __init__( ) self.ood_workflow = OODWorkflowExecutor() - logger.info("Tool classifier initialized with all workflow executors") + logger.info( + "Tool classifier initialized with hybrid search classification " + f"(Qdrant: {self._qdrant_base_url})" + ) async def classify( self, @@ -74,13 +106,15 @@ async def classify( language: str, ) -> ClassificationResult: """ - Classify a user query to determine which workflow should handle it. + Classify a user query using a two-step search approach. - Implements layer-wise classification logic with fallback chain: - 1. SERVICE workflow (external API calls) - 2. CONTEXT workflow (greetings/conversation history) - 3. RAG workflow (knowledge base retrieval) - 4. 
OOD workflow (out-of-domain) + Step 1: Dense-only search → cosine similarity for relevance check + Step 2: Hybrid search (dense + sparse + RRF) → service identification + + Routing: + - cosine < DENSE_MIN_THRESHOLD → CONTEXT/RAG (skip SERVICE) + - cosine ≥ HIGH_CONFIDENCE + large gap → SERVICE (no LLM needed) + - else → SERVICE with LLM confirmation Args: query: User's query string @@ -92,13 +126,403 @@ async def classify( """ logger.info(f"Classifying query: {query[:100]}...") - logger.info("Starting layer-wise fallback: ") - return ClassificationResult( - workflow=WorkflowType.SERVICE, - confidence=1.0, - metadata={}, - reasoning="Start with Service workflow - will cascade through layers", - ) + try: + # Step 1: Generate dense embedding for query + query_embedding = self._get_query_embedding(query) + if query_embedding is None: + logger.warning( + "Failed to generate query embedding, falling back to CONTEXT/RAG" + ) + return ClassificationResult( + workflow=WorkflowType.CONTEXT, + confidence=1.0, + metadata={"reason": "embedding_generation_failed"}, + reasoning="Could not generate embedding - skip to Context/RAG", + ) + + # Step 2: Dense-only search → get actual cosine similarity scores + dense_results = await self._dense_search( + dense_vector=query_embedding, + top_k=DENSE_SEARCH_TOP_K, + ) + + if not dense_results: + logger.info("No dense search results - routing to CONTEXT/RAG") + return ClassificationResult( + workflow=WorkflowType.CONTEXT, + confidence=1.0, + metadata={"reason": "no_service_match"}, + reasoning="No services matched the query (dense search empty)", + ) + + top_cosine = dense_results[0].get("cosine_score", 0.0) + top_service_name = dense_results[0].get("name", "unknown") + second_cosine = ( + dense_results[1].get("cosine_score", 0.0) + if len(dense_results) > 1 + else 0.0 + ) + cosine_gap = top_cosine - second_cosine + + logger.info( + f"Dense search: top={top_service_name} " + f"(cosine={top_cosine:.4f}), " + 
f"second={dense_results[1].get('name', 'none') if len(dense_results) > 1 else 'none'} " + f"(cosine={second_cosine:.4f}), " + f"gap={cosine_gap:.4f}" + ) + + # Decision: Is this a service query at all? + if top_cosine < DENSE_MIN_THRESHOLD: + logger.info( + f"Low relevance (cosine={top_cosine:.4f} < {DENSE_MIN_THRESHOLD}) " + f"- routing to CONTEXT/RAG, skipping SERVICE" + ) + return ClassificationResult( + workflow=WorkflowType.CONTEXT, + confidence=1.0, + metadata={ + "reason": "below_dense_threshold", + "top_cosine": top_cosine, + "top_service": top_service_name, + }, + reasoning=( + f"Dense cosine {top_cosine:.4f} below threshold " + f"{DENSE_MIN_THRESHOLD} - skip to Context/RAG" + ), + ) + + # Step 3: Hybrid search → identify best service using RRF + query_sparse = compute_sparse_vector(query) + hybrid_results = await self._hybrid_search( + dense_vector=query_embedding, + sparse_vector=query_sparse, + top_k=HYBRID_SEARCH_TOP_K, + ) + + # Use hybrid results for service identification, dense scores for confidence + if not hybrid_results: + # Dense matched but hybrid didn't — use dense results + hybrid_results = dense_results + + top_result = hybrid_results[0] + top_service_id = top_result.get("service_id", "unknown") + top_service_name_hybrid = top_result.get("name", "unknown") + + logger.info( + f"Hybrid search: best service={top_service_name_hybrid} " + f"(service_id={top_service_id})" + ) + + # High confidence: cosine is high AND clear gap to second result + if ( + top_cosine >= DENSE_HIGH_CONFIDENCE_THRESHOLD + and cosine_gap >= DENSE_SCORE_GAP_THRESHOLD + ): + logger.info( + f"HIGH-CONFIDENCE match: {top_service_name_hybrid} " + f"(cosine={top_cosine:.4f}, gap={cosine_gap:.4f})" + ) + return ClassificationResult( + workflow=WorkflowType.SERVICE, + confidence=min(top_cosine, 1.0), + metadata={ + "matched_service_id": top_service_id, + "matched_service_name": top_service_name_hybrid, + "cosine_score": top_cosine, + "cosine_gap": cosine_gap, + 
"needs_llm_confirmation": False, + "top_results": hybrid_results[:3], + }, + reasoning=( + f"High-confidence match: {top_service_name_hybrid} " + f"(cosine={top_cosine:.4f}, gap={cosine_gap:.4f})" + ), + ) + + # Medium confidence: above min threshold but ambiguous + logger.info( + f"AMBIGUOUS match: {top_service_name_hybrid} " + f"(cosine={top_cosine:.4f}, gap={cosine_gap:.4f}) - needs LLM confirmation" + ) + return ClassificationResult( + workflow=WorkflowType.SERVICE, + confidence=0.5, + metadata={ + "matched_service_id": top_service_id, + "matched_service_name": top_service_name_hybrid, + "cosine_score": top_cosine, + "cosine_gap": cosine_gap, + "needs_llm_confirmation": True, + "top_results": hybrid_results[:3], + }, + reasoning=( + f"Ambiguous match: {top_service_name_hybrid} " + f"(cosine={top_cosine:.4f}) - LLM confirmation needed" + ), + ) + + except Exception as e: + logger.error(f"Hybrid classification failed: {e}", exc_info=True) + return ClassificationResult( + workflow=WorkflowType.CONTEXT, + confidence=1.0, + metadata={"reason": "classification_error", "error": str(e)}, + reasoning=f"Classification error - falling back to Context/RAG: {e}", + ) + + def _get_query_embedding(self, query: str) -> Optional[List[float]]: + """Generate dense embedding for a query using the orchestration service. 
+ + Args: + query: Query text to embed + + Returns: + List of floats representing the dense embedding, or None on failure + """ + try: + if not self.orchestration_service: + logger.error("Orchestration service not available for embedding") + return None + + result = self.orchestration_service.create_embeddings_for_indexer( + texts=[query], + environment="production", + batch_size=1, + ) + + embeddings = result.get("embeddings", []) + if embeddings and len(embeddings) > 0: + return embeddings[0] + + logger.error("No embedding returned for query") + return None + + except Exception as e: + logger.error(f"Failed to generate query embedding: {e}") + return None + + async def _dense_search( + self, + dense_vector: List[float], + top_k: int = DENSE_SEARCH_TOP_K, + ) -> List[Dict[str, Any]]: + """Execute dense-only search on Qdrant to get actual cosine similarity scores. + + This is used as a pre-filter: the cosine scores tell us HOW RELEVANT + the top results actually are, unlike RRF scores which are purely rank-based. 
+ + Args: + dense_vector: Dense embedding vector (3072-dim) + top_k: Number of results to return + + Returns: + List of result dicts with service metadata and cosine_score, + deduplicated by service_id (best score per service) + """ + try: + search_payload = { + "query": dense_vector, + "using": "dense", + "limit": top_k * 2, # Get more to allow dedup by service + "with_payload": True, + } + + response = await self._qdrant_client.post( + f"/collections/{QDRANT_COLLECTION}/points/query", + json=search_payload, + ) + + if response.status_code != 200: + logger.error( + f"Qdrant dense search failed: HTTP {response.status_code} - " + f"{response.text}" + ) + return [] + + search_results = response.json() + points = search_results.get("result", {}).get("points", []) + + if not points: + logger.info("No results from dense search") + return [] + + # Deduplicate by service_id (keep best cosine score per service) + service_results: Dict[str, Dict[str, Any]] = {} + for point in points: + payload = point.get("payload", {}) + score = float(point.get("score", 0)) + service_id = payload.get("service_id", "unknown") + + if service_id not in service_results or score > service_results[ + service_id + ].get("cosine_score", 0): + service_results[service_id] = { + "service_id": service_id, + "name": payload.get("name", ""), + "description": payload.get("description", ""), + "examples": payload.get("examples", []), + "entities": payload.get("entities", []), + "context": payload.get("context", ""), + "point_type": payload.get("point_type", "unknown"), + "example_text": payload.get("example_text"), + "cosine_score": score, + } + + # Sort by cosine score descending + sorted_results = sorted( + service_results.values(), + key=lambda x: x["cosine_score"], + reverse=True, + ) + + logger.info( + f"Dense search found {len(sorted_results)} unique services " + f"(top cosine: {sorted_results[0]['cosine_score']:.4f})" + ) + + return sorted_results + + except httpx.TimeoutException: + 
logger.error(f"Qdrant dense search timeout after {QDRANT_TIMEOUT}s") + return [] + except Exception as e: + logger.error(f"Dense search failed: {e}", exc_info=True) + return [] + + async def _hybrid_search( + self, + dense_vector: List[float], + sparse_vector: Any, + top_k: int = HYBRID_SEARCH_TOP_K, + ) -> List[Dict[str, Any]]: + """Execute hybrid search on Qdrant using prefetch + RRF fusion. + + Sends both dense and sparse vectors in a single Qdrant query, + using the prefetch API for parallel retrieval and RRF for fusion. + + Args: + dense_vector: Dense embedding vector (3072-dim) + sparse_vector: SparseVector with indices and values + top_k: Number of results to return + + Returns: + List of result dicts with service metadata and rrf_score + """ + try: + # Check if collection exists and has data + try: + collection_info = await self._qdrant_client.get( + f"/collections/{QDRANT_COLLECTION}" + ) + if collection_info.status_code == 200: + info = collection_info.json() + points_count = info.get("result", {}).get("points_count", 0) + if points_count == 0: + logger.info("Intent collection is empty - no services indexed") + return [] + else: + logger.warning( + f"Could not verify collection: HTTP {collection_info.status_code}" + ) + return [] + except Exception as e: + logger.warning(f"Could not verify intent collection: {e}") + return [] + + # Build hybrid search payload with prefetch + RRF + search_payload: Dict[str, Any] = { + "prefetch": [ + { + "query": dense_vector, + "using": "dense", + "limit": top_k * 2, + }, + ], + "query": {"fusion": "rrf"}, + "limit": top_k, + "with_payload": True, + } + + # Add sparse prefetch only if sparse vector is non-empty + if not sparse_vector.is_empty(): + search_payload["prefetch"].append( + { + "query": sparse_vector.to_dict(), + "using": "sparse", + "limit": top_k * 2, + } + ) + + response = await self._qdrant_client.post( + f"/collections/{QDRANT_COLLECTION}/points/query", + json=search_payload, + ) + + if response.status_code 
!= 200: + logger.error( + f"Qdrant hybrid search failed: HTTP {response.status_code} - " + f"{response.text}" + ) + return [] + + search_results = response.json() + points = search_results.get("result", {}).get("points", []) + + if not points: + logger.info("No results from hybrid search") + return [] + + # Parse and deduplicate results (group by service_id, keep best score) + service_results: Dict[str, Dict[str, Any]] = {} + for point in points: + payload = point.get("payload", {}) + score = float(point.get("score", 0)) + service_id = payload.get("service_id", "unknown") + + if service_id not in service_results or score > service_results[ + service_id + ].get("rrf_score", 0): + service_results[service_id] = { + "service_id": service_id, + "name": payload.get("name", ""), + "description": payload.get("description", ""), + "examples": payload.get("examples", []), + "entities": payload.get("entities", []), + "context": payload.get("context", ""), + "point_type": payload.get("point_type", "unknown"), + "example_text": payload.get("example_text"), + "rrf_score": score, + } + + # Sort by RRF score descending + sorted_results = sorted( + service_results.values(), + key=lambda x: x["rrf_score"], + reverse=True, + ) + + logger.info( + f"Hybrid search found {len(sorted_results)} unique services " + f"from {len(points)} points" + ) + + for i, r in enumerate(sorted_results[:3]): + logger.debug( + f" Rank {i + 1}: {r['name']} " + f"(service_id={r['service_id']}, " + f"rrf_score={r['rrf_score']:.6f}, " + f"type={r['point_type']})" + ) + + return sorted_results + + except httpx.TimeoutException: + logger.error(f"Qdrant hybrid search timeout after {QDRANT_TIMEOUT}s") + return [] + except Exception as e: + logger.error(f"Hybrid search failed: {e}", exc_info=True) + return [] @overload async def route_to_workflow( @@ -231,7 +655,6 @@ async def _execute_with_fallback_async( ) # Get the layer order starting from current layer - from tool_classifier.enums import WORKFLOW_LAYER_ORDER 
current_index = WORKFLOW_LAYER_ORDER.index(start_layer) remaining_layers = WORKFLOW_LAYER_ORDER[current_index + 1 :] @@ -253,7 +676,6 @@ async def _execute_with_fallback_async( return result logger.info(f"[{chat_id}] {next_name} returned None, continuing...") - current_index += 1 # This should never happen since RAG/OOD should always return result raise RuntimeError("All workflows returned None (unexpected)") @@ -313,7 +735,6 @@ async def _execute_with_fallback_streaming( ) # Get the layer order starting from current layer - from tool_classifier.enums import WORKFLOW_LAYER_ORDER current_index = WORKFLOW_LAYER_ORDER.index(start_layer) remaining_layers = WORKFLOW_LAYER_ORDER[current_index + 1 :] @@ -338,7 +759,6 @@ async def _execute_with_fallback_streaming( return logger.info(f"[{chat_id}] {next_name} returned None, continuing...") - current_index += 1 # This should never happen raise RuntimeError("All workflows returned None in streaming (unexpected)") diff --git a/src/tool_classifier/constants.py b/src/tool_classifier/constants.py index c885b52..65f3033 100644 --- a/src/tool_classifier/constants.py +++ b/src/tool_classifier/constants.py @@ -58,3 +58,28 @@ SERVICE_COUNT_THRESHOLD = 10 """Threshold for triggering semantic search. If service count > this value, semantic search is used instead of sending all services to LLM.""" + + +# ============================================================================ +# Hybrid Search Classification Thresholds +# ============================================================================ + +HYBRID_SEARCH_TOP_K = 5 +"""Number of top results from hybrid search for service identification.""" + +DENSE_SEARCH_TOP_K = 3 +"""Number of top results from dense-only search for relevance scoring.""" + +DENSE_MIN_THRESHOLD = 0.38 +"""Minimum dense cosine similarity to consider a result as a potential match. +Below this → skip SERVICE entirely, go to CONTEXT/RAG. 
+Note: Multilingual embeddings (Estonian/short queries) typically yield +lower cosine scores (0.25-0.40) than English. Tune based on observed scores.""" + +DENSE_HIGH_CONFIDENCE_THRESHOLD = 0.40 +"""Dense cosine similarity for high-confidence service classification. +Above this AND score gap is large → SERVICE without LLM confirmation.""" + +DENSE_SCORE_GAP_THRESHOLD = 0.05 +"""Cosine score gap (top - second) for high-confidence classification. +Ensures the top result is significantly better than the runner-up.""" diff --git a/src/tool_classifier/sparse_encoder.py b/src/tool_classifier/sparse_encoder.py new file mode 100644 index 0000000..06f38a8 --- /dev/null +++ b/src/tool_classifier/sparse_encoder.py @@ -0,0 +1,85 @@ +""" +Sparse vector encoder for BM25-style term frequency vectors. + +Shared module used by both: +- intent_data_enrichment (indexing time) — to create sparse vectors for service examples +- tool_classifier (query time) — to create sparse vectors for user queries + +Uses hash-based indexing compatible with Qdrant's sparse vector format. +""" + +import hashlib +import re +from collections import Counter +from dataclasses import dataclass, field +from typing import List + + +# Hash space for sparse vector indices +# Larger = fewer collisions but more memory; 50K is a good balance for intent classification +SPARSE_VOCAB_SIZE = 50_000 + +# Simple word tokenizer matching the pattern used in contextual_retrieval/bm25_search.py +TOKENIZER_PATTERN = re.compile(r"\w+") + + +@dataclass +class SparseVector: + """Sparse vector representation for Qdrant. 
+ + Attributes: + indices: Sorted list of non-zero dimension indices + values: Corresponding values for each index + """ + + indices: List[int] = field(default_factory=list) + values: List[float] = field(default_factory=list) + + def to_dict(self) -> dict: + """Convert to Qdrant API format.""" + return {"indices": self.indices, "values": self.values} + + def is_empty(self) -> bool: + """Check if the sparse vector has no entries.""" + return len(self.indices) == 0 + + +def compute_sparse_vector(text: str) -> SparseVector: + """Convert text to a sparse vector using term-frequency hashing. + + Tokenizes the input text, counts term frequencies, and maps each token + to a hash-based index in the sparse vector space. This creates a + BM25-compatible representation that Qdrant can use for sparse search. + + Args: + text: Input text to vectorize + + Returns: + SparseVector with hash-based indices and term frequency values + """ + if not text or not text.strip(): + return SparseVector() + + # Tokenize: lowercase and extract word tokens + tokens = TOKENIZER_PATTERN.findall(text.lower()) + if not tokens: + return SparseVector() + + # Count term frequencies + token_counts = Counter(tokens) + + # Hash-based indexing: map each token to an index in [0, SPARSE_VOCAB_SIZE) + # Uses MD5 (first 4 bytes) for deterministic cross-process indices. + # Collisions are handled by summing values at the same index. 
+ hash_counts: dict[int, float] = {} + for token, count in token_counts.items(): + digest = hashlib.md5(token.encode(), usedforsecurity=False).digest() # noqa: S324 + idx = int.from_bytes(digest[:4], "little") % SPARSE_VOCAB_SIZE + # Handle hash collisions by accumulating + hash_counts[idx] = hash_counts.get(idx, 0) + float(count) + + # Sort indices for consistent representation (Qdrant requirement) + sorted_indices = sorted(hash_counts.keys()) + sorted_values = [hash_counts[i] for i in sorted_indices] + + return SparseVector(indices=sorted_indices, values=sorted_values) diff --git a/src/tool_classifier/workflows/service_workflow.py b/src/tool_classifier/workflows/service_workflow.py index b432c62..dbb5211 100644 --- a/src/tool_classifier/workflows/service_workflow.py +++ b/src/tool_classifier/workflows/service_workflow.py @@ -554,6 +554,11 @@ async def execute_async( ) -> Optional[OrchestrationResponse]: """Execute service workflow in non-streaming mode. + Uses classification metadata from hybrid search: + - needs_llm_confirmation=False: Skip discovery + intent detection, use matched service + - needs_llm_confirmation=True: Run LLM intent detection on candidate services only + - No metadata: Fall back to original discovery flow + Args: request: Orchestration request context: Workflow context @@ -568,12 +573,77 @@ async def execute_async( if time_metric is None: time_metric = {} - # Service discovery with timing - start_time = time.time() - await self._log_request_details( - request, context, mode="non-streaming", costs_metric=costs_metric - ) - time_metric["service.discovery"] = time.time() - start_time + # Check if classifier provided hybrid search metadata + needs_llm_confirmation = context.get("needs_llm_confirmation") + + if needs_llm_confirmation is False: + # HIGH CONFIDENCE PATH: Classifier matched a service with high confidence + # Skip service discovery — use hybrid search match directly + matched_service_id = context.get("matched_service_id") + 
matched_service_name = context.get("matched_service_name") + rrf_score = context.get("rrf_score", 0) + + logger.info( + f"[{chat_id}] HIGH-CONFIDENCE SERVICE MATCH (non-streaming): " + f"{matched_service_name} (rrf_score={rrf_score:.6f}) - " + f"skipping discovery" + ) + + # Get service details from top_results (already retrieved by classifier) + top_results = context.get("top_results", []) + if top_results: + matched = top_results[0] + + # Run entity extraction via LLM (DSPy) for this single service + start_time = time.time() + await self._process_intent_detection( + services=[matched], + request=request, + chat_id=chat_id, + context=context, + costs_metric=costs_metric, + ) + time_metric["service.intent_detection"] = time.time() - start_time + + # Ensure service_data is populated from hybrid match + # _process_intent_detection may not set it if DSPy returns + # a different service_id format, so we populate it explicitly + if not context.get("service_data"): + context["service_id"] = matched.get("service_id") + context["service_data"] = matched + logger.info( + f"[{chat_id}] Populated service_data from hybrid match: " + f"{matched.get('name')}" + ) + + elif needs_llm_confirmation is True: + # AMBIGUOUS PATH: Multiple services scored similarly + # Run LLM intent detection only on candidate services (not all services) + top_results = context.get("top_results", []) + logger.info( + f"[{chat_id}] AMBIGUOUS SERVICE MATCH (non-streaming): " + f"running LLM intent detection on {len(top_results)} candidates" + ) + + start_time = time.time() + if top_results: + await self._process_intent_detection( + services=top_results, + request=request, + chat_id=chat_id, + context=context, + costs_metric=costs_metric, + ) + time_metric["service.discovery"] = time.time() - start_time + + else: + # LEGACY PATH: No hybrid search metadata (classifier disabled or error) + # Full service discovery + intent detection (original behavior) + start_time = time.time() + await 
self._log_request_details( + request, context, mode="non-streaming", costs_metric=costs_metric + ) + time_metric["service.discovery"] = time.time() - start_time # Check if service was detected and validated if not context.get("service_id"): @@ -692,6 +762,8 @@ async def execute_streaming( ) -> Optional[AsyncIterator[str]]: """Execute service workflow in streaming mode. + Uses classification metadata from hybrid search (same as execute_async). + Args: request: Orchestration request context: Workflow context @@ -706,12 +778,68 @@ async def execute_streaming( if time_metric is None: time_metric = {} - # Service discovery with timing - start_time = time.time() - await self._log_request_details( - request, context, mode="streaming", costs_metric=costs_metric - ) - time_metric["service.discovery"] = time.time() - start_time + # Check if classifier provided hybrid search metadata + needs_llm_confirmation = context.get("needs_llm_confirmation") + + if needs_llm_confirmation is False: + # HIGH CONFIDENCE PATH: Skip discovery, use matched service + matched_service_name = context.get("matched_service_name") + rrf_score = context.get("rrf_score", 0) + + logger.info( + f"[{chat_id}] HIGH-CONFIDENCE SERVICE MATCH (streaming): " + f"{matched_service_name} (rrf_score={rrf_score:.6f})" + ) + + top_results = context.get("top_results", []) + if top_results: + matched = top_results[0] + + start_time = time.time() + await self._process_intent_detection( + services=[matched], + request=request, + chat_id=chat_id, + context=context, + costs_metric=costs_metric, + ) + time_metric["service.intent_detection"] = time.time() - start_time + + # Ensure service_data is populated from hybrid match + if not context.get("service_data"): + context["service_id"] = matched.get("service_id") + context["service_data"] = matched + logger.info( + f"[{chat_id}] Populated service_data from hybrid match: " + f"{matched.get('name')}" + ) + + elif needs_llm_confirmation is True: + # AMBIGUOUS PATH: Run LLM 
intent detection on candidates + top_results = context.get("top_results", []) + logger.info( + f"[{chat_id}] AMBIGUOUS SERVICE MATCH (streaming): " + f"{len(top_results)} candidates" + ) + + start_time = time.time() + if top_results: + await self._process_intent_detection( + services=top_results, + request=request, + chat_id=chat_id, + context=context, + costs_metric=costs_metric, + ) + time_metric["service.discovery"] = time.time() - start_time + + else: + # LEGACY PATH: Full service discovery (original behavior) + start_time = time.time() + await self._log_request_details( + request, context, mode="streaming", costs_metric=costs_metric + ) + time_metric["service.discovery"] = time.time() - start_time # Check if service was detected and validated if not context.get("service_id"): diff --git a/src/vector_indexer/constants.py b/src/vector_indexer/constants.py index c4f3810..b685428 100644 --- a/src/vector_indexer/constants.py +++ b/src/vector_indexer/constants.py @@ -100,10 +100,11 @@ class ProcessingConstants: class ResponseGenerationConstants: """Constants for response generation and context retrieval.""" - # Top-K blocks for response generation - # This controls how many of the retrieved chunks are used - # for generating the final response - DEFAULT_MAX_BLOCKS = 5 # Maximum context blocks to use in response generation + # Controls both: + # 1. How many chunks the contextual retriever returns after RRF fusion + # 2. How many context blocks the response generator feeds to the LLM + # Change this value to adjust both retrieval and generation together. + DEFAULT_MAX_BLOCKS = 5 MIN_BLOCKS_REQUIRED = 3 # Minimum blocks required for valid response