diff --git a/python/src/server/services/threading_service.py b/python/src/server/services/threading_service.py index a79180cf..8ee95ac4 100644 --- a/python/src/server/services/threading_service.py +++ b/python/src/server/services/threading_service.py @@ -97,9 +97,8 @@ class RateLimiter: wait_time = self._calculate_wait_time(estimated_tokens) if wait_time > 0: logfire_logger.info( - f"Rate limiting: waiting {wait_time:.1f}s", - tokens=estimated_tokens, - current_usage=self._get_current_usage(), + f"Rate limiting: waiting {wait_time:.1f}s - " + f"tokens: {estimated_tokens}, current usage: {self._get_current_usage()}" ) await asyncio.sleep(wait_time) return await self.acquire(estimated_tokens) @@ -198,17 +197,15 @@ class MemoryAdaptiveDispatcher: # Reduce workers when memory is high workers = max(1, base // 2) logfire_logger.warning( - "High memory usage detected, reducing workers", - memory_percent=metrics.memory_percent, - workers=workers, + f"High memory usage detected, reducing workers - " + f"memory: {metrics.memory_percent:.1f}%, workers: {workers}" ) elif metrics.cpu_percent > self.config.cpu_threshold * 100: # Reduce workers when CPU is high workers = max(1, base // 2) logfire_logger.warning( - "High CPU usage detected, reducing workers", - cpu_percent=metrics.cpu_percent, - workers=workers, + f"High CPU usage detected, reducing workers - " + f"cpu: {metrics.cpu_percent:.1f}%, workers: {workers}" ) elif metrics.memory_percent < 50 and metrics.cpu_percent < 50: # Increase workers when resources are available @@ -238,12 +235,10 @@ class MemoryAdaptiveDispatcher: semaphore = asyncio.Semaphore(optimal_workers) logfire_logger.info( - "Starting adaptive processing", - items_count=len(items), - workers=optimal_workers, - mode=mode, - memory_percent=self.last_metrics.memory_percent, - cpu_percent=self.last_metrics.cpu_percent, + f"Starting adaptive processing - items: {len(items)}, " + f"workers: {optimal_workers}, mode: {mode}, " + f"memory: {self.last_metrics.memory_percent:.1f}%, " + f"cpu: {self.last_metrics.cpu_percent:.1f}%" ) # Track active workers @@ -474,7 +469,7 @@ class ThreadingService: self._running = True self._health_check_task = asyncio.create_task(self._health_check_loop()) - logfire_logger.info("Threading service started", config=self.config.__dict__) + logfire_logger.info(f"Threading service started with config: {self.config.__dict__}") async def stop(self): """Stop the threading service""" @@ -510,7 +505,7 @@ class ThreadingService: finally: duration = time.time() - start_time logfire_logger.debug( - "Rate limited operation completed", duration=duration, tokens=estimated_tokens + f"Rate limited operation completed - duration: {duration:.2f}s, tokens: {estimated_tokens}" ) async def run_cpu_intensive(self, func: Callable, *args, **kwargs) -> Any: @@ -562,37 +557,35 @@ class ThreadingService: # Log system metrics logfire_logger.info( - "System health check", - memory_percent=metrics.memory_percent, - cpu_percent=metrics.cpu_percent, - available_memory_gb=metrics.available_memory_gb, - active_threads=metrics.active_threads, - active_websockets=len(self.websocket_processor.active_connections), + f"System health check - memory: {metrics.memory_percent:.1f}%, " + f"cpu: {metrics.cpu_percent:.1f}%, " + f"available memory: {metrics.available_memory_gb:.2f}GB, " + f"threads: {metrics.active_threads}, " + f"websockets: {len(self.websocket_processor.active_connections)}" ) # Alert on critical thresholds if metrics.memory_percent > 90: logfire_logger.warning( - "Critical memory usage", memory_percent=metrics.memory_percent + f"Critical memory usage: {metrics.memory_percent:.1f}%" ) # Force garbage collection gc.collect() if metrics.cpu_percent > 95: - logfire_logger.warning("Critical CPU usage", cpu_percent=metrics.cpu_percent) + logfire_logger.warning(f"Critical CPU usage: {metrics.cpu_percent:.1f}%") # Check for memory leaks (too many threads) if metrics.active_threads > self.config.max_workers * 3: logfire_logger.warning( - "High thread count detected", - active_threads=metrics.active_threads, - max_expected=self.config.max_workers * 3, + f"High thread count detected - active: {metrics.active_threads}, " + f"max expected: {self.config.max_workers * 3}" ) await asyncio.sleep(self.config.health_check_interval) except Exception as e: - logfire_logger.error("Health check failed", error=str(e)) + logfire_logger.error(f"Health check failed: {str(e)}") await asyncio.sleep(self.config.health_check_interval)