From 468463997dc7a194769b59a39fe7b2fb78fbf5b0 Mon Sep 17 00:00:00 2001 From: Rasmus Widing Date: Wed, 20 Aug 2025 12:10:59 +0300 Subject: [PATCH] Complete logging fixes for all statements in threading service Applied the extra parameter pattern to all remaining logging statements (11 more) to ensure consistency and prevent runtime errors when any code path is executed. This completes the fix for the entire file. --- .../src/server/services/threading_service.py | 70 ++++++++++++------- 1 file changed, 44 insertions(+), 26 deletions(-) diff --git a/python/src/server/services/threading_service.py b/python/src/server/services/threading_service.py index 5db8ae41..b3a0053e 100644 --- a/python/src/server/services/threading_service.py +++ b/python/src/server/services/threading_service.py @@ -98,8 +98,10 @@ class RateLimiter: if wait_time > 0: logfire_logger.info( f"Rate limiting: waiting {wait_time:.1f}s", - tokens=estimated_tokens, - current_usage=self._get_current_usage(), + extra={ + "tokens": estimated_tokens, + "current_usage": self._get_current_usage(), + } ) await asyncio.sleep(wait_time) return await self.acquire(estimated_tokens) @@ -199,16 +201,20 @@ class MemoryAdaptiveDispatcher: workers = max(1, base // 2) logfire_logger.warning( "High memory usage detected, reducing workers", - memory_percent=metrics.memory_percent, - workers=workers, + extra={ + "memory_percent": metrics.memory_percent, + "workers": workers, + } ) elif metrics.cpu_percent > self.config.cpu_threshold * 100: # Reduce workers when CPU is high workers = max(1, base // 2) logfire_logger.warning( "High CPU usage detected, reducing workers", - cpu_percent=metrics.cpu_percent, - workers=workers, + extra={ + "cpu_percent": metrics.cpu_percent, + "workers": workers, + } ) elif metrics.memory_percent < 50 and metrics.cpu_percent < 50: # Increase workers when resources are available @@ -239,11 +245,13 @@ class MemoryAdaptiveDispatcher: logfire_logger.info( "Starting adaptive processing", - items_count=len(items), - workers=optimal_workers, - mode=mode, - memory_percent=self.last_metrics.memory_percent, - cpu_percent=self.last_metrics.cpu_percent, + extra={ + "items_count": len(items), + "workers": optimal_workers, + "mode": mode, + "memory_percent": self.last_metrics.memory_percent, + "cpu_percent": self.last_metrics.cpu_percent, + } ) # Track active workers @@ -318,7 +326,8 @@ class MemoryAdaptiveDispatcher: del active_workers[worker_id] logfire_logger.error( - f"Processing failed for item {index}", error=str(e), item_index=index + f"Processing failed for item {index}", + extra={"error": str(e), "item_index": index} ) return None @@ -334,10 +343,12 @@ class MemoryAdaptiveDispatcher: success_rate = len(successful_results) / len(items) * 100 logfire_logger.info( "Adaptive processing completed", - total_items=len(items), - successful=len(successful_results), - success_rate=f"{success_rate:.1f}%", - workers_used=optimal_workers, + extra={ + "total_items": len(items), + "successful": len(successful_results), + "success_rate": f"{success_rate:.1f}%", + "workers_used": optimal_workers, + } ) return successful_results @@ -355,7 +366,8 @@ class WebSocketSafeProcessor: await websocket.accept() self.active_connections.append(websocket) logfire_logger.info( - "WebSocket client connected", total_connections=len(self.active_connections) + "WebSocket client connected", + extra={"total_connections": len(self.active_connections)} ) def disconnect(self, websocket: WebSocket): @@ -363,7 +375,8 @@ class WebSocketSafeProcessor: if websocket in self.active_connections: self.active_connections.remove(websocket) logfire_logger.info( - "WebSocket client disconnected", remaining_connections=len(self.active_connections) + "WebSocket client disconnected", + extra={"remaining_connections": len(self.active_connections)} ) async def broadcast_progress(self, message: dict[str, Any]): @@ -564,17 +577,20 @@ class ThreadingService: # Log system metrics logfire_logger.info( "System health check", - memory_percent=metrics.memory_percent, - cpu_percent=metrics.cpu_percent, - available_memory_gb=metrics.available_memory_gb, - active_threads=metrics.active_threads, - active_websockets=len(self.websocket_processor.active_connections), + extra={ + "memory_percent": metrics.memory_percent, + "cpu_percent": metrics.cpu_percent, + "available_memory_gb": metrics.available_memory_gb, + "active_threads": metrics.active_threads, + "active_websockets": len(self.websocket_processor.active_connections), + } ) # Alert on critical thresholds if metrics.memory_percent > 90: logfire_logger.warning( - "Critical memory usage", memory_percent=metrics.memory_percent + "Critical memory usage", + extra={"memory_percent": metrics.memory_percent} ) # Force garbage collection gc.collect() @@ -588,8 +604,10 @@ class ThreadingService: if metrics.active_threads > self.config.max_workers * 3: logfire_logger.warning( "High thread count detected", - active_threads=metrics.active_threads, - max_expected=self.config.max_workers * 3, + extra={ + "active_threads": metrics.active_threads, + "max_expected": self.config.max_workers * 3, + } ) await asyncio.sleep(self.config.health_check_interval)