From adae7901ef9b23456330ec338d3d3b6410a39bd5 Mon Sep 17 00:00:00 2001 From: Charith Nuwan Bimsara <59943919+nuwangeek@users.noreply.github.com> Date: Tue, 3 Mar 2026 15:36:09 +0530 Subject: [PATCH] Optimize first user query response generation time (#326) --- src/contextual_retrieval/bm25_search.py | 53 ++++++++++++- .../contextual_retriever.py | 30 ++++++-- src/guardrails/nemo_rails_adapter.py | 41 ++++------ src/intent_data_enrichment/main_enrichment.py | 22 ++++-- src/llm_orchestration_service.py | 66 +++++++++++++++- src/llm_orchestration_service_api.py | 75 +++++-------------- .../vault/vault_client.py | 25 +------ src/tool_classifier/classifier.py | 8 ++ .../workflows/service_workflow.py | 12 +-- src/utils/prompt_config_loader.py | 10 +-- 10 files changed, 207 insertions(+), 135 deletions(-) diff --git a/src/contextual_retrieval/bm25_search.py b/src/contextual_retrieval/bm25_search.py index 5bde02d0..2be66e4c 100644 --- a/src/contextual_retrieval/bm25_search.py +++ b/src/contextual_retrieval/bm25_search.py @@ -5,10 +5,11 @@ when collection data changes. 
""" -from typing import List, Dict, Any, Optional +from typing import List, Dict, Any, Optional, Set from loguru import logger from rank_bm25 import BM25Okapi import re +import asyncio from contextual_retrieval.contextual_retrieval_api_client import get_http_client_manager from contextual_retrieval.error_handler import SecureErrorHandler from contextual_retrieval.constants import ( @@ -33,6 +34,11 @@ def __init__( self.chunk_mapping: Dict[int, Dict[str, Any]] = {} self.last_collection_stats: Dict[str, Any] = {} self.tokenizer_pattern = re.compile(r"\w+") # Simple word tokenizer + # Background refresh state - prevents blocking queries during index rebuild + self._refresh_in_progress: bool = False + self._refresh_lock: asyncio.Lock = asyncio.Lock() + # Strong references to background tasks to prevent premature GC + self._background_tasks: Set[asyncio.Task[None]] = set() async def _get_http_client_manager(self): """Get the HTTP client manager instance.""" @@ -103,10 +109,24 @@ async def search_bm25( limit = self._config.search.topk_bm25 try: - # Check if index needs refresh + # Check if index needs refresh (non-blocking: schedule background rebuild, + # current query continues with the existing index to avoid latency). if await self._should_refresh_index(): - logger.info("Collection data changed - refreshing BM25 index") - await self.initialize_index() + # Avoid scheduling multiple concurrent refresh tasks; coalesce while a + # refresh is already in progress. 
+ if not self._refresh_in_progress: + logger.info( + "Collection data changed - scheduling background BM25 refresh " + "(current query uses existing index)" + ) + task = asyncio.create_task(self._background_refresh_index()) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) + else: + logger.debug( + "BM25 refresh already in progress; skipping scheduling of a " + "new background refresh task" + ) if not self.bm25_index: logger.error("BM25 index not initialized") @@ -162,6 +182,31 @@ async def search_bm25( logger.error(f"BM25 search failed: {e}") return [] + async def _background_refresh_index(self) -> None: + """ + Rebuild the BM25 index in the background without blocking in-flight queries. + + Uses a lock to ensure only one rebuild runs at a time. If a rebuild is + already in progress when a second collection-change is detected, the + duplicate request is silently discarded — the in-progress rebuild will + capture the latest data anyway. + """ + if self._refresh_in_progress: + logger.debug("BM25 background refresh already running - skipping duplicate") + return + async with self._refresh_lock: + if self._refresh_in_progress: + return + self._refresh_in_progress = True + try: + logger.info("Starting background BM25 index refresh...") + await self.initialize_index() + logger.info("Background BM25 index refresh complete") + except Exception as e: + logger.error(f"Background BM25 refresh failed: {e}") + finally: + self._refresh_in_progress = False + async def _fetch_all_contextual_chunks(self) -> List[Dict[str, Any]]: """Fetch all chunks from contextual collections.""" all_chunks: List[Dict[str, Any]] = [] diff --git a/src/contextual_retrieval/contextual_retriever.py b/src/contextual_retrieval/contextual_retriever.py index b6d4699b..048c131b 100644 --- a/src/contextual_retrieval/contextual_retriever.py +++ b/src/contextual_retrieval/contextual_retriever.py @@ -42,6 +42,7 @@ def __init__( connection_id: Optional[str] = None, 
config_path: Optional[str] = None, llm_service: Optional["LLMOrchestrationService"] = None, + shared_bm25: Optional[SmartBM25Search] = None, ): """ Initialize contextual retriever. @@ -52,6 +53,10 @@ def __init__( connection_id: Optional connection ID config_path: Optional config file path llm_service: Optional LLM service instance (prevents circular dependency) + shared_bm25: Optional pre-warmed SmartBM25Search singleton. When + provided the retriever skips the expensive index-build step during + initialize() and reuses the already-ready index, eliminating the + cold-start latency on the first query. """ self.qdrant_url = qdrant_url self.environment = environment @@ -70,7 +75,14 @@ def __init__( # Initialize components with configuration self.provider_detection = DynamicProviderDetection(qdrant_url, self.config) self.qdrant_search = QdrantContextualSearch(qdrant_url, self.config) - self.bm25_search = SmartBM25Search(qdrant_url, self.config) + # Use the injected pre-warmed singleton when available; create a fresh + # instance only as a fallback (avoids duplicate Qdrant scroll on startup). + self.bm25_search: SmartBM25Search = ( + shared_bm25 + if shared_bm25 is not None + else SmartBM25Search(qdrant_url, self.config) + ) + self._bm25_is_shared: bool = shared_bm25 is not None self.rank_fusion = DynamicRankFusion(self.config) # State @@ -87,10 +99,18 @@ async def initialize(self) -> bool: try: logger.info("Initializing Contextual Retriever...") - # Initialize BM25 index - bm25_success = await self.bm25_search.initialize_index() - if not bm25_success: - logger.warning("BM25 initialization failed - will skip BM25 search") + # If a pre-warmed shared BM25 index was injected, reuse it directly. + # This is the normal startup path and adds zero latency to the first query. 
+ if self._bm25_is_shared and self.bm25_search.bm25_index is not None: + logger.info( + "Using pre-warmed shared BM25 index - skipping BM25 build " + f"({len(self.bm25_search.chunk_mapping)} chunks ready)" + ) + else: + # No shared index available - build it now (fallback path). + bm25_success = await self.bm25_search.initialize_index() + if not bm25_success: + logger.warning("BM25 initialization failed - will skip BM25 search") self.initialized = True logger.info("Contextual Retriever initialized successfully") diff --git a/src/guardrails/nemo_rails_adapter.py b/src/guardrails/nemo_rails_adapter.py index 1ae38982..17f6585e 100644 --- a/src/guardrails/nemo_rails_adapter.py +++ b/src/guardrails/nemo_rails_adapter.py @@ -57,14 +57,14 @@ def __init__( self._rails: Optional[LLMRails] = None self._initialized = False - logger.info(f"Initializing NeMoRailsAdapter for environment: {environment}") + logger.debug(f"NeMoRailsAdapter created for environment: {environment}") def _register_custom_provider(self) -> None: """Register DSPy custom LLM provider with NeMo Guardrails.""" try: from src.guardrails.dspy_nemo_adapter import DSPyLLMProviderFactory - logger.info("Registering DSPy custom LLM provider with NeMo Guardrails") + logger.debug("Registering DSPy custom LLM provider with NeMo Guardrails") # NeMo Guardrails' register_llm_provider accepts callable factories at runtime. # We instantiate DSPyLLMProviderFactory first, then register the instance. @@ -74,7 +74,7 @@ def _register_custom_provider(self) -> None: # We use cast to satisfy the type checker while maintaining runtime correctness. 
factory = DSPyLLMProviderFactory() register_llm_provider("dspy-custom", cast(Type[BaseLLM], factory)) - logger.info("DSPy custom LLM provider registered successfully") + logger.debug("DSPy custom LLM provider registered successfully") except Exception as e: logger.error(f"Failed to register DSPy custom provider: {str(e)}") @@ -86,8 +86,8 @@ def _ensure_initialized(self) -> None: return try: - logger.info( - "Initializing NeMo Guardrails with DSPy LLM and streaming support" + logger.debug( + f"Initializing NeMo Guardrails with DSPy LLM (env={self.environment})" ) from llm_orchestrator_config.llm_manager import LLMManager @@ -106,33 +106,24 @@ def _ensure_initialized(self) -> None: guardrails_loader = get_guardrails_loader() config_path, metadata = guardrails_loader.get_optimized_config_path() - logger.info(f"Loading guardrails config from: {config_path}") + logger.debug(f"Loading guardrails config from: {config_path}") rails_config = RailsConfig.from_path(str(config_path.parent)) rails_config.streaming = True - logger.info("Streaming configuration:") - logger.info(f" Global streaming: {rails_config.streaming}") - - if hasattr(rails_config, "rails") and hasattr(rails_config.rails, "output"): + if metadata.get("optimized", False): + version = metadata.get("version", "unknown") + metrics = metadata.get("metrics", {}) + accuracy = metrics.get("weighted_accuracy", "N/A") if metrics else "N/A" logger.info( - f" Output rails config exists: {rails_config.rails.output}" + f"Guardrails ready: OPTIMIZED config v={version}, " + f"weighted_accuracy={accuracy}, env={self.environment}" ) else: - logger.info(" Output rails config will be loaded from YAML") - - if metadata.get("optimized", False): logger.info( - f"Loaded OPTIMIZED guardrails config (version: {metadata.get('version', 'unknown')})" + f"Guardrails ready: BASE config (no optimization), env={self.environment}" ) - metrics = metadata.get("metrics", {}) - if metrics: - logger.info( - f" Optimization metrics: 
weighted_accuracy={metrics.get('weighted_accuracy', 'N/A')}" - ) - else: - logger.info("Loaded BASE guardrails config (no optimization)") from src.guardrails.dspy_nemo_adapter import DSPyNeMoLLM @@ -144,18 +135,16 @@ def _ensure_initialized(self) -> None: verbose=False, ) - if ( + if not ( hasattr(self._rails.config, "streaming") and self._rails.config.streaming ): - logger.info("✓ Streaming enabled in NeMo Guardrails configuration") - else: logger.warning( "Streaming not enabled in configuration - this may cause issues" ) self._initialized = True - logger.info("NeMo Guardrails initialized successfully with DSPy LLM") + logger.debug("NeMo Guardrails initialized successfully with DSPy LLM") except Exception as e: logger.error(f"Failed to initialize NeMo Guardrails: {str(e)}") diff --git a/src/intent_data_enrichment/main_enrichment.py b/src/intent_data_enrichment/main_enrichment.py index 16db8c66..9724683a 100644 --- a/src/intent_data_enrichment/main_enrichment.py +++ b/src/intent_data_enrichment/main_enrichment.py @@ -219,14 +219,20 @@ async def enrich_service(service_data: ServiceData) -> EnrichmentResult: qdrant.ensure_collection() # Delete old points before inserting new ones - qdrant.delete_service_points(service_data.service_id) - - # Step 5: Bulk upsert all points (examples + summary) - logger.info( - f"Step 5: Storing {len(enriched_points)} points in Qdrant " - f"({len(service_data.examples)} examples + 1 summary)" - ) - success = qdrant.upsert_service_points(enriched_points) + deleted = qdrant.delete_service_points(service_data.service_id) + if not deleted: + logger.error( + f"Failed to delete existing points for service_id={service_data.service_id}; " + "aborting upsert to avoid stale data." 
+ ) + success = False + else: + # Step 5: Bulk upsert all points (examples + summary) + logger.info( + f"Step 5: Storing {len(enriched_points)} points in Qdrant " + f"({len(service_data.examples)} examples + 1 summary)" + ) + success = qdrant.upsert_service_points(enriched_points) finally: qdrant.close() diff --git a/src/llm_orchestration_service.py b/src/llm_orchestration_service.py index 0d32941d..7f7432fc 100644 --- a/src/llm_orchestration_service.py +++ b/src/llm_orchestration_service.py @@ -55,6 +55,7 @@ from src.utils.query_validator import validate_query_basic from src.guardrails import NeMoRailsAdapter, GuardrailCheckResult from src.contextual_retrieval import ContextualRetriever +from src.contextual_retrieval.bm25_search import SmartBM25Search from src.llm_orchestrator_config.exceptions import ( ContextualRetrieverInitializationError, ContextualRetrievalFailureError, @@ -133,6 +134,13 @@ def __init__(self) -> None: # This allows components to be initialized per-request with proper context self.tool_classifier = None + # Shared BM25 search index pre-warmed at startup. + # Populated by _prewarm_shared_bm25() which is called from the FastAPI + # lifespan so it runs inside the async event loop. Until then it is None + # and each ContextualRetriever will build the index on first query (graceful + # degradation path). + self.shared_bm25_search: Optional[SmartBM25Search] = None + # Initialize shared guardrails adapters at startup (production and testing) self.shared_guardrails_adapters = ( self._initialize_shared_guardrails_at_startup() @@ -168,10 +176,17 @@ def _initialize_shared_guardrails_at_startup(self) -> Dict[str, NeMoRailsAdapter connection_id=None, # Shared configuration, not user-specific ) + # Eagerly trigger the full internal initialization (NeMo config + # loading, LLMRails creation, embedding model download) so that + # the first user query is not penalised by the cold-start cost. 
+ # Without this, _ensure_initialized() runs lazily on the first user query. + guardrails_adapter._ensure_initialized() + elapsed_time = time.time() - start_time adapters[env] = guardrails_adapter logger.info( - f" Guardrails for '{env}' initialized successfully in {elapsed_time:.3f}s" + f" Guardrails for '{env}' fully initialized in {elapsed_time:.3f}s " + f"(NeMo Rails + embedding model loaded)" ) except Exception as e: @@ -197,6 +212,53 @@ def _initialize_shared_guardrails_at_startup(self) -> Dict[str, NeMoRailsAdapter return adapters + async def _prewarm_shared_bm25(self) -> None: + """ + Pre-warm the shared BM25 index at application startup. + + Must be called from an async context (e.g. FastAPI lifespan) so that + asyncio is available for the HTTP calls to Qdrant. Absorbs the + cold-start latency (fetching all chunks + building BM25Okapi corpus) + at deploy time so that the first real user query is not penalised. + + On any failure the method logs a warning and leaves + self.shared_bm25_search as None — the ContextualRetriever will then + fall back to building the index on the first query (graceful degradation). + """ + qdrant_url = os.getenv("QDRANT_URL", "http://qdrant:6333") + logger.info("Pre-warming shared BM25 index at startup...") + prewarm_start = time.time() + try: + bm25 = SmartBM25Search(qdrant_url=qdrant_url) + success = await bm25.initialize_index() + if success: + self.shared_bm25_search = bm25 + elapsed = time.time() - prewarm_start + logger.info( + f"Shared BM25 index pre-warmed in {elapsed:.2f}s " + f"({len(bm25.chunk_mapping)} chunks indexed)" + ) + else: + logger.warning( + "BM25 pre-warming produced an empty index - " + "index will be built on first query instead" + ) + except Exception as e: + logger.warning( + f"BM25 pre-warming failed: {e} - " + f"index will be built on first query (graceful degradation)" + ) + + async def aclose(self) -> None: + """Release all long-lived async resources held by the service. 
+ + Must be awaited during application shutdown (FastAPI lifespan teardown) + to avoid connection leaks from the ToolClassifier's httpx client. + """ + if self.tool_classifier is not None: + await self.tool_classifier.aclose() + logger.debug("LLMOrchestrationService async resources closed") + @observe(name="orchestration_request", as_type="agent") async def process_orchestration_request( self, request: OrchestrationRequest @@ -1786,7 +1848,6 @@ def _initialize_guardrails( environment=environment, connection_id=connection_id ) - logger.info("Guardrails adapter initialized successfully") return guardrails_adapter except Exception as e: @@ -2322,6 +2383,7 @@ def _initialize_contextual_retriever( environment=environment, connection_id=connection_id, llm_service=self, # Inject self to eliminate circular dependency + shared_bm25=self.shared_bm25_search, # Inject pre-warmed BM25 index ) logger.info("Contextual retriever initialized successfully") diff --git a/src/llm_orchestration_service_api.py b/src/llm_orchestration_service_api.py index 2a929db0..0e9b1273 100644 --- a/src/llm_orchestration_service_api.py +++ b/src/llm_orchestration_service_api.py @@ -1,5 +1,6 @@ """LLM Orchestration Service API - FastAPI application.""" +import logging from contextlib import asynccontextmanager from typing import Any, AsyncGenerator, Dict @@ -49,10 +50,23 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: """Application lifespan manager.""" # Startup logger.info("Starting LLM Orchestration Service API") + + # nemoguardrails.actions.action_dispatcher logs every action it registers + logging.getLogger("nemoguardrails.actions.action_dispatcher").setLevel( + logging.WARNING + ) + logging.getLogger("langfuse").setLevel(logging.ERROR) + try: app.state.orchestration_service = LLMOrchestrationService() logger.info("LLM Orchestration Service initialized successfully") + # Pre-warm shared BM25 index so the first query is never penalised by + # the cold-start cost of scrolling all 
Qdrant chunks + building the index. + logger.info("Pre-warming shared BM25 index...") + await app.state.orchestration_service._prewarm_shared_bm25() + logger.info("BM25 pre-warming complete") + # Initialize rate limiter if enabled if StreamConfig.RATE_LIMIT_ENABLED: app.state.rate_limiter = RateLimiter( @@ -71,8 +85,11 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: # Shutdown logger.info("Shutting down LLM Orchestration Service API") - # Clean up resources if needed - if hasattr(app.state, "orchestration_service"): + if ( + hasattr(app.state, "orchestration_service") + and app.state.orchestration_service is not None + ): + await app.state.orchestration_service.aclose() app.state.orchestration_service = None @@ -841,60 +858,6 @@ def refresh_prompt_config(http_request: Request) -> Dict[str, Any]: }, ) from e - try: - success = orchestration_service.prompt_config_loader.force_refresh() - - if success: - # Get prompt metadata without exposing content (security) - custom_instructions = ( - orchestration_service.prompt_config_loader.get_custom_instructions() - ) - prompt_length = len(custom_instructions) - - # Generate hash for verification purposes (without exposing content) - import hashlib - - prompt_hash = hashlib.sha256(custom_instructions.encode()).hexdigest()[:16] - - logger.info( - f"Prompt configuration cache refreshed successfully ({prompt_length} chars)" - ) - - return { - "refreshed": True, - "message": "Prompt configuration refreshed successfully", - "prompt_length": prompt_length, - "content_hash": prompt_hash, # Safe: hash instead of preview - } - else: - # No fresh data loaded - could be fetch failure or truly not found - error_id = generate_error_id() - logger.warning( - f"[{error_id}] Prompt configuration refresh returned empty result" - ) - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail={ - "error": "No prompt configuration found in database", - "error_id": error_id, - }, - ) - - except HTTPException: - # 
Re-raise HTTP exceptions as-is - raise - except Exception as e: - # Unexpected errors during refresh - error_id = generate_error_id() - logger.error(f"[{error_id}] Failed to refresh prompt configuration: {e}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail={ - "error": "Failed to refresh prompt configuration", - "error_id": error_id, - }, - ) from e - if __name__ == "__main__": logger.info("Starting LLM Orchestration Service API server on port 8100") diff --git a/src/llm_orchestrator_config/vault/vault_client.py b/src/llm_orchestrator_config/vault/vault_client.py index b0c3a3d6..241f019e 100644 --- a/src/llm_orchestrator_config/vault/vault_client.py +++ b/src/llm_orchestrator_config/vault/vault_client.py @@ -142,10 +142,7 @@ def is_authenticated(self) -> bool: try: # If using proxy mode, skip token checks if not self.use_token_file: - logger.debug( - "Using vault agent proxy - skipping token authentication check" - ) - # Just verify vault is accessible + # Just verify vault is accessible (no token needed with proxy) return self.is_vault_available() # Check token is available @@ -182,27 +179,10 @@ def is_vault_available(self) -> bool: """ try: response = self.client.sys.read_health_status() - logger.debug(f"Vault health response type: {type(response)}") - logger.debug(f"Vault health response: {response}") # For Vault health endpoint, we primarily check the HTTP status code if hasattr(response, "status_code"): - is_available = response.status_code == 200 - logger.debug( - f"Vault health check: status_code={response.status_code}, available={is_available}" - ) - - # Try to get additional details from response body if available - try: - if hasattr(response, "json") and callable(response.json): - health_data = response.json() - logger.debug(f"Vault health details: {health_data}") - except Exception as e: - logger.debug( - f"Could not parse health response body (this is normal): {e}" - ) - - return is_available + return 
response.status_code == 200 else: # Fallback for non-Response objects (direct dict) if isinstance(response, dict): @@ -291,7 +271,6 @@ def list_secrets(self, path: str) -> Optional[list[str]]: path=path, mount_point=self.mount_point, ) - logger.debug(f"List secrets response: {response}") if response and "data" in response: keys = response["data"].get("keys", []) diff --git a/src/tool_classifier/classifier.py b/src/tool_classifier/classifier.py index 0d4644df..f18ef3ec 100644 --- a/src/tool_classifier/classifier.py +++ b/src/tool_classifier/classifier.py @@ -99,6 +99,14 @@ def __init__( f"(Qdrant: {self._qdrant_base_url})" ) + async def aclose(self) -> None: + """Close the shared httpx client and release connection pool resources. + + Must be awaited during application shutdown to avoid connection leaks. + """ + await self._qdrant_client.aclose() + logger.debug("ToolClassifier Qdrant httpx client closed") + async def classify( self, query: str, diff --git a/src/tool_classifier/workflows/service_workflow.py b/src/tool_classifier/workflows/service_workflow.py index dbb52113..bb72f785 100644 --- a/src/tool_classifier/workflows/service_workflow.py +++ b/src/tool_classifier/workflows/service_workflow.py @@ -581,11 +581,11 @@ async def execute_async( # Skip service discovery — use hybrid search match directly matched_service_id = context.get("matched_service_id") matched_service_name = context.get("matched_service_name") - rrf_score = context.get("rrf_score", 0) + cosine_score = context.get("cosine_score", 0.0) logger.info( f"[{chat_id}] HIGH-CONFIDENCE SERVICE MATCH (non-streaming): " - f"{matched_service_name} (rrf_score={rrf_score:.6f}) - " + f"{matched_service_name} (cosine_score={cosine_score:.4f}) - " f"skipping discovery" ) @@ -634,7 +634,7 @@ async def execute_async( context=context, costs_metric=costs_metric, ) - time_metric["service.discovery"] = time.time() - start_time + time_metric["service.intent_detection"] = time.time() - start_time else: # LEGACY PATH: No 
hybrid search metadata (classifier disabled or error) @@ -784,11 +784,11 @@ async def execute_streaming( if needs_llm_confirmation is False: # HIGH CONFIDENCE PATH: Skip discovery, use matched service matched_service_name = context.get("matched_service_name") - rrf_score = context.get("rrf_score", 0) + cosine_score = context.get("cosine_score", 0.0) logger.info( f"[{chat_id}] HIGH-CONFIDENCE SERVICE MATCH (streaming): " - f"{matched_service_name} (rrf_score={rrf_score:.6f})" + f"{matched_service_name} (cosine_score={cosine_score:.4f})" ) top_results = context.get("top_results", []) @@ -831,7 +831,7 @@ async def execute_streaming( context=context, costs_metric=costs_metric, ) - time_metric["service.discovery"] = time.time() - start_time + time_metric["service.intent_detection"] = time.time() - start_time else: # LEGACY PATH: Full service discovery (original behavior) diff --git a/src/utils/prompt_config_loader.py b/src/utils/prompt_config_loader.py index 8df8945b..01a40c84 100644 --- a/src/utils/prompt_config_loader.py +++ b/src/utils/prompt_config_loader.py @@ -229,7 +229,7 @@ def _load_from_ruuter_with_retry(self) -> Optional[str]: # Unwrap Ruuter's response wrapper if present if isinstance(data, dict) and "response" in data: - logger.info("Unwrapping 'response' key") + logger.debug("Unwrapping 'response' key") data = data["response"] # Now extract prompt from the unwrapped data @@ -238,25 +238,25 @@ def _load_from_ruuter_with_retry(self) -> Optional[str]: first_elem_keys = ( list(data[0].keys()) if isinstance(data[0], dict) else [] ) - logger.info( + logger.debug( f"Extracting from list, first element keys: {first_elem_keys}" ) prompt = data[0].get("prompt", "").strip() elif isinstance(data, dict): # Dict format: {"id": 1, "prompt": "..."} - logger.info(f"Extracting from dict, keys: {list(data.keys())}") + logger.debug(f"Extracting from dict, keys: {list(data.keys())}") prompt = data.get("prompt", "").strip() else: logger.warning( f"Unexpected data type: 
{type(data).__name__}, structure not recognized" ) - logger.info( + logger.debug( f"Extracted prompt length: {len(prompt) if prompt else 0}" ) if prompt: - logger.info( + logger.debug( f"Loaded prompt on attempt {attempt} ({len(prompt)} chars)" ) return prompt