From cb2b3278691620d34a084048898d8b7972e43453 Mon Sep 17 00:00:00 2001 From: "cool.gitter.choco" Date: Tue, 18 Mar 2025 10:07:11 -0600 Subject: [PATCH] unlimitted logging!!!! --- .gitignore | 3 +- app.py | 120 +++++++++++++----- entrypoint.sh | 46 +------ routes/config.py | 70 ++++++++++- routes/utils/celery_config.py | 100 +++++++++------ routes/utils/celery_manager.py | 214 +++++++++++++++++++++++++++++++++ routes/utils/celery_tasks.py | 67 +++++++++-- 7 files changed, 494 insertions(+), 126 deletions(-) create mode 100644 routes/utils/celery_manager.py diff --git a/.gitignore b/.gitignore index c69773a..77f64e1 100755 --- a/.gitignore +++ b/.gitignore @@ -31,4 +31,5 @@ config/state/queue_state.json output.log queue_state.json search_demo.py -celery_worker.log \ No newline at end of file +celery_worker.log +logs/spotizerr.log diff --git a/app.py b/app.py index eb01fde..d575c64 100755 --- a/app.py +++ b/app.py @@ -9,38 +9,72 @@ from routes.prgs import prgs_bp from routes.config import config_bp from routes.artist import artist_bp import logging +import logging.handlers import time from pathlib import Path import os -import argparse +import atexit +import sys -# Import Celery configuration -try: - from routes.utils.celery_tasks import celery_app - has_celery = True -except ImportError: - has_celery = False +# Import Celery configuration and manager +from routes.utils.celery_tasks import celery_app +from routes.utils.celery_manager import celery_manager + +# Configure application-wide logging +def setup_logging(): + """Configure application-wide logging with rotation""" + # Create logs directory if it doesn't exist + logs_dir = Path('logs') + logs_dir.mkdir(exist_ok=True) + + # Set up log file paths + main_log = logs_dir / 'spotizerr.log' + + # Configure root logger + root_logger = logging.getLogger() + root_logger.setLevel(logging.INFO) + + # Log formatting + log_format = logging.Formatter( + '%(asctime)s [%(processName)s:%(threadName)s] [%(name)s] [%(levelname)s] - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + # File handler with rotation (10 MB max, keep 5 backups) + file_handler = logging.handlers.RotatingFileHandler( + main_log, maxBytes=10*1024*1024, backupCount=5, encoding='utf-8' + ) + file_handler.setFormatter(log_format) + file_handler.setLevel(logging.INFO) + + # Console handler for stderr + console_handler = logging.StreamHandler(sys.stderr) + console_handler.setFormatter(log_format) + console_handler.setLevel(logging.INFO) + + # Add handlers to root logger + root_logger.addHandler(file_handler) + root_logger.addHandler(console_handler) + + # Set up specific loggers + for logger_name in ['werkzeug', 'celery', 'routes', 'flask', 'waitress']: + module_logger = logging.getLogger(logger_name) + module_logger.setLevel(logging.INFO) + # Handlers are inherited from root logger + + # Enable propagation for all loggers + logging.getLogger('celery').propagate = True + + # Notify successful setup + root_logger.info("Logging system initialized") + + # Return the main file handler for permissions adjustment + return file_handler def create_app(): app = Flask(__name__) - # Configure basic logging - log_file = 'flask_server.log' - logging.basicConfig( - level=logging.INFO, - format='%(asctime)s %(levelname)s %(name)s %(threadName)s : %(message)s', - handlers=[ - logging.FileHandler(log_file), - logging.StreamHandler() - ] - ) - - os.chmod(log_file, 0o666) - - # Get Flask's logger - logger = logging.getLogger('werkzeug') - logger.setLevel(logging.INFO) - + # Set up CORS CORS(app) # Register blueprints @@ -53,7 +87,6 @@ def create_app(): app.register_blueprint(artist_bp, url_prefix='/api/artist') app.register_blueprint(prgs_bp, url_prefix='/api/prgs') - # Serve frontend @app.route('/') def serve_index(): @@ -98,27 +131,48 @@ def create_app(): @app.before_request def log_request(): request.start_time = time.time() - logger.info(f"Request: {request.method} {request.path}") + app.logger.debug(f"Request: {request.method} {request.path}") @app.after_request def log_response(response): - duration = round((time.time() - request.start_time) * 1000, 2) - logger.info(f"Response: {response.status} | Duration: {duration}ms") + if hasattr(request, 'start_time'): + duration = round((time.time() - request.start_time) * 1000, 2) + app.logger.debug(f"Response: {response.status} | Duration: {duration}ms") return response # Error logging @app.errorhandler(Exception) def handle_exception(e): - logger.error(f"Server error: {str(e)}", exc_info=True) + app.logger.error(f"Server error: {str(e)}", exc_info=True) return "Internal Server Error", 500 return app -if __name__ == '__main__': - # Configure waitress logger - logger = logging.getLogger('waitress') - logger.setLevel(logging.INFO) +def start_celery_workers(): + """Start Celery workers with dynamic configuration""" + logging.info("Starting Celery workers with dynamic configuration") + celery_manager.start() + # Register shutdown handler + atexit.register(celery_manager.stop) + +if __name__ == '__main__': + # Configure application logging + log_handler = setup_logging() + + # Set file permissions for log files if needed + try: + os.chmod(log_handler.baseFilename, 0o666) + except: + logging.warning("Could not set permissions on log file") + + # Log application startup + logging.info("=== Spotizerr Application Starting ===") + + # Start Celery workers + start_celery_workers() + + # Create and start Flask app app = create_app() logging.info("Starting Flask server on port 7171") from waitress import serve diff --git a/entrypoint.sh b/entrypoint.sh index 62c0453..e7dcc74 100755 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -6,40 +6,10 @@ if [ -n "${UMASK}" ]; then umask "${UMASK}" fi -# Function to start the application -start_application() { - # Start Flask app in the background - echo "Starting Flask application..." - python app.py & - - # Wait a moment for Flask to initialize - sleep 2 - - # Start Celery worker - echo "Starting Celery worker..." - celery -A routes.utils.celery_tasks.celery_app worker --loglevel=info --concurrency=${MAX_CONCURRENT_DL:-3} -Q downloads & - - # Keep the script running - wait -} - -# Check if custom command was provided -if [ $# -gt 0 ]; then - # Custom command provided, use it instead of default app startup - RUN_COMMAND="$@" -else - # No custom command, use our default application startup - RUN_COMMAND="start_application" -fi - # Check if both PUID and PGID are not set if [ -z "${PUID}" ] && [ -z "${PGID}" ]; then # Run as root directly - if [ $# -gt 0 ]; then - exec "$@" - else - start_application - fi + exec "$@" else # Verify both PUID and PGID are set if [ -z "${PUID}" ] || [ -z "${PGID}" ]; then @@ -49,11 +19,7 @@ else # Check for root user request if [ "${PUID}" -eq 0 ] && [ "${PGID}" -eq 0 ]; then - if [ $# -gt 0 ]; then - exec "$@" - else - start_application - fi + exec "$@" else # Check if the group with the specified GID already exists if getent group "${PGID}" >/dev/null; then @@ -79,10 +45,6 @@ else chown -R "${USER_NAME}:${GROUP_NAME}" /app || true # Run as specified user - if [ $# -gt 0 ]; then - exec gosu "${USER_NAME}" "$@" - else - exec gosu "${USER_NAME}" bash -c "$(declare -f start_application); start_application" - fi + exec gosu "${USER_NAME}" "$@" fi -fi +fi \ No newline at end of file diff --git a/routes/config.py b/routes/config.py index 5a0b494..fb46c07 100644 --- a/routes/config.py +++ b/routes/config.py @@ -2,10 +2,25 @@ from flask import Blueprint, jsonify, request import json from pathlib import Path import logging +import threading +import time config_bp = Blueprint('config_bp', __name__) CONFIG_PATH = Path('./config/main.json') +# Flag for config change notifications +config_changed = False +last_config = {} + +# Define parameters that should trigger notification when changed +NOTIFY_PARAMETERS = [ + 'maxConcurrentDownloads', + 'service', + 'fallback', + 'spotifyQuality', + 'deezerQuality' +] + def get_config(): try: if not CONFIG_PATH.exists(): @@ -19,6 +34,34 @@ def get_config(): logging.error(f"Error reading config: {str(e)}") return None +def save_config(config_data): + """Save config and track changes to important parameters""" + global config_changed, last_config + + try: + # Load current config for comparison + current_config = get_config() or {} + + # Check if any notify parameters changed + for param in NOTIFY_PARAMETERS: + if param in config_data: + if param not in current_config or config_data[param] != current_config.get(param): + config_changed = True + logging.info(f"Config parameter '{param}' changed from '{current_config.get(param)}' to '{config_data[param]}'") + + # Save last known config + last_config = config_data.copy() + + # Write the config file + CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True) + with open(CONFIG_PATH, 'w') as f: + json.dump(config_data, f, indent=2) + + return True + except Exception as e: + logging.error(f"Error saving config: {str(e)}") + return False + @config_bp.route('/config', methods=['GET']) def handle_config(): config = get_config() @@ -58,13 +101,32 @@ def update_config(): if not isinstance(new_config, dict): return jsonify({"error": "Invalid config format"}), 400 - CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True) - with open(CONFIG_PATH, 'w') as f: - json.dump(new_config, f, indent=2) + if not save_config(new_config): + return jsonify({"error": "Failed to save config"}), 500 return jsonify({"message": "Config updated successfully"}) except json.JSONDecodeError: return jsonify({"error": "Invalid JSON data"}), 400 except Exception as e: logging.error(f"Error updating config: {str(e)}") - return jsonify({"error": "Failed to update config"}), 500 \ No newline at end of file + return jsonify({"error": "Failed to update config"}), 500 + +@config_bp.route('/config/check', methods=['GET']) +def check_config_changes(): + """ + Check if config has changed since last check + Returns: Status of config changes + """ + global config_changed + + # Get current state + has_changed = config_changed + + # Reset flag after checking + if has_changed: + config_changed = False + + return jsonify({ + "changed": has_changed, + "last_config": last_config + }) \ No newline at end of file diff --git a/routes/utils/celery_config.py b/routes/utils/celery_config.py index 60ce9fd..b9bb4ad 100644 --- a/routes/utils/celery_config.py +++ b/routes/utils/celery_config.py @@ -1,53 +1,75 @@ import os import json +import logging +from pathlib import Path -# Load configuration from ./config/main.json and get the max_concurrent_dl value. +# Configure logging +logger = logging.getLogger(__name__) + +# Redis configuration +REDIS_HOST = os.getenv('REDIS_HOST', 'localhost') +REDIS_PORT = os.getenv('REDIS_PORT', '6379') +REDIS_DB = os.getenv('REDIS_DB', '0') +REDIS_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}" +REDIS_BACKEND = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}" + +# Config path CONFIG_PATH = './config/main.json' -try: - with open(CONFIG_PATH, 'r') as f: - config_data = json.load(f) - MAX_CONCURRENT_DL = config_data.get("maxConcurrentDownloads", 3) - MAX_RETRIES = config_data.get("maxRetries", 3) - RETRY_DELAY = config_data.get("retryDelaySeconds", 5) - RETRY_DELAY_INCREASE = config_data.get("retry_delay_increase", 5) -except Exception as e: - print(f"Error loading configuration: {e}") - # Fallback to default values if there's an error reading the config. - MAX_CONCURRENT_DL = 3 - MAX_RETRIES = 3 - RETRY_DELAY = 5 - RETRY_DELAY_INCREASE = 5 - def get_config_params(): """ - Get common download parameters from the config file. - This centralizes parameter retrieval and reduces redundancy in API calls. + Get configuration parameters from the config file. Returns: - dict: A dictionary containing common parameters from config + dict: A dictionary containing configuration parameters """ try: + if not Path(CONFIG_PATH).exists(): + return { + 'service': 'spotify', + 'spotify': '', + 'deezer': '', + 'fallback': False, + 'spotifyQuality': 'NORMAL', + 'deezerQuality': 'MP3_128', + 'realTime': False, + 'customDirFormat': '%ar_album%/%album%', + 'customTrackFormat': '%tracknum%. %music%', + 'tracknum_padding': True, + 'maxConcurrentDownloads': 3, + 'maxRetries': 3, + 'retryDelaySeconds': 5, + 'retry_delay_increase': 5 + } + with open(CONFIG_PATH, 'r') as f: config = json.load(f) - return { - 'service': config.get('service', 'spotify'), - 'spotify': config.get('spotify', ''), - 'deezer': config.get('deezer', ''), - 'fallback': config.get('fallback', False), - 'spotifyQuality': config.get('spotifyQuality', 'NORMAL'), - 'deezerQuality': config.get('deezerQuality', 'MP3_128'), - 'realTime': config.get('realTime', False), - 'customDirFormat': config.get('customDirFormat', '%ar_album%/%album%'), - 'customTrackFormat': config.get('customTrackFormat', '%tracknum%. %music%'), - 'tracknum_padding': config.get('tracknum_padding', True), - 'maxRetries': config.get('maxRetries', 3), - 'retryDelaySeconds': config.get('retryDelaySeconds', 5), - 'retry_delay_increase': config.get('retry_delay_increase', 5) + # Set defaults for missing values + defaults = { + 'service': 'spotify', + 'spotify': '', + 'deezer': '', + 'fallback': False, + 'spotifyQuality': 'NORMAL', + 'deezerQuality': 'MP3_128', + 'realTime': False, + 'customDirFormat': '%ar_album%/%album%', + 'customTrackFormat': '%tracknum%. %music%', + 'tracknum_padding': True, + 'maxConcurrentDownloads': 3, + 'maxRetries': 3, + 'retryDelaySeconds': 5, + 'retry_delay_increase': 5 } + + for key, value in defaults.items(): + if key not in config: + config[key] = value + + return config except Exception as e: - print(f"Error reading config for parameters: {e}") + logger.error(f"Error reading config: {e}") # Return defaults if config read fails return { 'service': 'spotify', @@ -60,14 +82,18 @@ def get_config_params(): 'customDirFormat': '%ar_album%/%album%', 'customTrackFormat': '%tracknum%. %music%', 'tracknum_padding': True, + 'maxConcurrentDownloads': 3, 'maxRetries': 3, 'retryDelaySeconds': 5, 'retry_delay_increase': 5 } -# Celery configuration -REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0') -REDIS_BACKEND = os.environ.get('REDIS_BACKEND', 'redis://localhost:6379/0') +# Load configuration values we need for Celery +config = get_config_params() +MAX_CONCURRENT_DL = config.get('maxConcurrentDownloads', 3) +MAX_RETRIES = config.get('maxRetries', 3) +RETRY_DELAY = config.get('retryDelaySeconds', 5) +RETRY_DELAY_INCREASE = config.get('retry_delay_increase', 5) # Define task queues task_queues = { diff --git a/routes/utils/celery_manager.py b/routes/utils/celery_manager.py new file mode 100644 index 0000000..6268070 --- /dev/null +++ b/routes/utils/celery_manager.py @@ -0,0 +1,214 @@ +import os +import json +import signal +import subprocess +import logging +import time +import atexit +from pathlib import Path +import threading +import queue +import sys + +# Configure logging +logger = logging.getLogger(__name__) + +# Configuration +CONFIG_PATH = './config/main.json' +CELERY_APP = 'routes.utils.celery_tasks.celery_app' +CELERY_PROCESS = None +CONFIG_CHECK_INTERVAL = 30 # seconds + +class CeleryManager: + """ + Manages Celery workers dynamically based on configuration changes. + """ + + def __init__(self): + self.celery_process = None + self.current_worker_count = 0 + self.monitoring_thread = None + self.running = False + self.log_queue = queue.Queue() + self.output_threads = [] + + def start(self): + """Start the Celery manager and initial workers""" + if self.running: + return + + self.running = True + + # Start initial workers + self._update_workers() + + # Start monitoring thread for config changes + self.monitoring_thread = threading.Thread(target=self._monitor_config, daemon=True) + self.monitoring_thread.start() + + # Register shutdown handler + atexit.register(self.stop) + + def stop(self): + """Stop the Celery manager and all workers""" + self.running = False + + # Stop all running threads + for thread in self.output_threads: + if thread.is_alive(): + # We can't really stop the threads, but they'll exit on their own + # when the process is terminated since they're daemon threads + pass + + if self.celery_process: + logger.info("Stopping Celery workers...") + try: + # Send SIGTERM to process group + os.killpg(os.getpgid(self.celery_process.pid), signal.SIGTERM) + self.celery_process.wait(timeout=5) + except (subprocess.TimeoutExpired, ProcessLookupError): + # Force kill if not terminated + try: + os.killpg(os.getpgid(self.celery_process.pid), signal.SIGKILL) + except ProcessLookupError: + pass + + self.celery_process = None + self.current_worker_count = 0 + + def _get_worker_count(self): + """Get the configured worker count from config file""" + try: + if not Path(CONFIG_PATH).exists(): + return 3 # Default + + with open(CONFIG_PATH, 'r') as f: + config = json.load(f) + + return int(config.get('maxConcurrentDownloads', 3)) + except Exception as e: + logger.error(f"Error reading worker count from config: {e}") + return 3 # Default on error + + def _update_workers(self): + """Update workers if needed based on configuration""" + new_worker_count = self._get_worker_count() + + if new_worker_count == self.current_worker_count and self.celery_process and self.celery_process.poll() is None: + return # No change and process is running + + logger.info(f"Updating Celery workers from {self.current_worker_count} to {new_worker_count}") + + # Stop existing workers if running + if self.celery_process: + try: + os.killpg(os.getpgid(self.celery_process.pid), signal.SIGTERM) + self.celery_process.wait(timeout=5) + except (subprocess.TimeoutExpired, ProcessLookupError): + try: + os.killpg(os.getpgid(self.celery_process.pid), signal.SIGKILL) + except ProcessLookupError: + pass + + # Clear output threads list + self.output_threads = [] + + # Start new workers with updated concurrency + try: + # Set environment variables to configure Celery logging + env = os.environ.copy() + env['PYTHONUNBUFFERED'] = '1' # Ensure Python output is unbuffered + + # Construct command with extra logging options + cmd = [ + 'celery', + '-A', CELERY_APP, + 'worker', + '--loglevel=info', + f'--concurrency={new_worker_count}', + '-Q', 'downloads', + # Add timestamp to Celery logs + '--logfile=-', # Output logs to stdout + '--without-heartbeat', # Reduce log noise + '--without-gossip', # Reduce log noise + '--without-mingle' # Reduce log noise + ] + + self.celery_process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env, + preexec_fn=os.setsid, # New process group for clean termination + universal_newlines=True, + bufsize=1 # Line buffered + ) + + self.current_worker_count = new_worker_count + logger.info(f"Started Celery workers with concurrency {new_worker_count}") + + # Start non-blocking output reader threads for both stdout and stderr + stdout_thread = threading.Thread( + target=self._process_output_reader, + args=(self.celery_process.stdout, "STDOUT"), + daemon=True + ) + stdout_thread.start() + self.output_threads.append(stdout_thread) + + stderr_thread = threading.Thread( + target=self._process_output_reader, + args=(self.celery_process.stderr, "STDERR"), + daemon=True + ) + stderr_thread.start() + self.output_threads.append(stderr_thread) + + except Exception as e: + logger.error(f"Error starting Celery workers: {e}") + + def _process_output_reader(self, pipe, stream_name): + """Read and log output from the process""" + try: + for line in iter(pipe.readline, ''): + if not line: + break + + line = line.strip() + if not line: + continue + + # Format the message to identify it's from Celery + if "ERROR" in line or "CRITICAL" in line: + logger.error(f"Celery[{stream_name}]: {line}") + elif "WARNING" in line: + logger.warning(f"Celery[{stream_name}]: {line}") + elif "DEBUG" in line: + logger.debug(f"Celery[{stream_name}]: {line}") + else: + logger.info(f"Celery[{stream_name}]: {line}") + + except Exception as e: + logger.error(f"Error processing Celery output: {e}") + finally: + pipe.close() + + def _monitor_config(self): + """Monitor configuration file for changes""" + logger.info("Starting config monitoring thread") + last_check_time = 0 + + while self.running: + try: + # Check for changes + if time.time() - last_check_time >= CONFIG_CHECK_INTERVAL: + self._update_workers() + last_check_time = time.time() + + time.sleep(1) + except Exception as e: + logger.error(f"Error in config monitoring thread: {e}") + time.sleep(5) # Wait before retrying + +# Create single instance +celery_manager = CeleryManager() \ No newline at end of file diff --git a/routes/utils/celery_tasks.py b/routes/utils/celery_tasks.py index 215bb2b..71f156a 100644 --- a/routes/utils/celery_tasks.py +++ b/routes/utils/celery_tasks.py @@ -5,15 +5,15 @@ import logging import traceback from datetime import datetime from celery import Celery, Task, states -from celery.signals import task_prerun, task_postrun, task_failure, worker_ready +from celery.signals import task_prerun, task_postrun, task_failure, worker_ready, worker_init, setup_logging from celery.exceptions import Retry -# Setup Redis and Celery -from routes.utils.celery_config import REDIS_URL, REDIS_BACKEND, get_config_params - # Configure logging logger = logging.getLogger(__name__) +# Setup Redis and Celery +from routes.utils.celery_config import REDIS_URL, REDIS_BACKEND, get_config_params + # Initialize Celery app celery_app = Celery('download_tasks', broker=REDIS_URL, @@ -35,6 +35,25 @@ class ProgressState: RETRYING = "retrying" CANCELLED = "cancel" +# Reuse the application's logging configuration for Celery workers +@setup_logging.connect +def setup_celery_logging(**kwargs): + """ + This handler ensures Celery uses our application logging settings + instead of its own. Prevents duplicate log configurations. + """ + # Using the root logger's handlers and level preserves our config + return logging.getLogger() + +# The initialization of a worker will log the worker configuration +@worker_init.connect +def worker_init_handler(**kwargs): + """Log when a worker initializes with its configuration details""" + config = get_config_params() + logger.info(f"Celery worker initialized with concurrency {config.get('maxConcurrentDownloads', 3)}") + logger.info(f"Worker config: spotifyQuality={config.get('spotifyQuality')}, deezerQuality={config.get('deezerQuality')}") + logger.debug("Worker Redis connection: " + REDIS_URL) + def store_task_status(task_id, status_data): """Store task status information in Redis""" # Add timestamp if not present @@ -102,6 +121,7 @@ def cancel_task(task_id): # Try to revoke the Celery task if it hasn't started yet celery_app.control.revoke(task_id, terminate=True, signal='SIGTERM') + logger.info(f"Task {task_id} cancelled by user") return {"status": "cancelled", "task_id": task_id} except Exception as e: logger.error(f"Error cancelling task {task_id}: {e}") @@ -209,6 +229,8 @@ def retry_task(task_id): download_type = task_info.get("download_type", "unknown") task = None + logger.info(f"Retrying task {task_id} as {new_task_id} (retry {retry_count + 1}/{max_retries})") + if download_type == "track": task = download_track.apply_async( kwargs=task_info, @@ -228,6 +250,7 @@ def retry_task(task_id): queue='downloads' ) else: + logger.error(f"Unknown download type for retry: {download_type}") return { "status": "error", "message": f"Unknown download type: {download_type}" @@ -303,8 +326,22 @@ class ProgressTrackingTask(Task): # Store the progress update in Redis store_task_status(task_id, progress_data) - # Log the progress update - logger.info(f"Task {task_id} progress: {progress_data}") + # Log the progress update with appropriate level + message = progress_data.get("message", "Progress update") + + if status == "processing": + progress = progress_data.get("progress", 0) + if progress > 0: + logger.debug(f"Task {task_id} progress: {progress}% - {message}") + else: + logger.info(f"Task {task_id} processing: {message}") + elif status == "error": + error_message = progress_data.get("error", message) + logger.error(f"Task {task_id} error: {error_message}") + elif status == "complete": + logger.info(f"Task {task_id} completed: {message}") + else: + logger.info(f"Task {task_id} {status}: {message}") # Celery signal handlers @task_prerun.connect @@ -378,21 +415,24 @@ def task_failure_handler(task_id=None, exception=None, traceback=None, *args, ** can_retry = retry_count < max_retries # Update task status to error + error_message = str(exception) store_task_status(task_id, { "status": ProgressState.ERROR, "timestamp": time.time(), "type": task_info.get("type", "unknown"), "name": task_info.get("name", "Unknown"), "artist": task_info.get("artist", ""), - "error": str(exception), + "error": error_message, "traceback": str(traceback), "can_retry": can_retry, "retry_count": retry_count, "max_retries": max_retries, - "message": f"Error: {str(exception)}" + "message": f"Error: {error_message}" }) - logger.error(f"Task {task_id} failed: {str(exception)}") + logger.error(f"Task {task_id} failed: {error_message}") + if can_retry: + logger.info(f"Task {task_id} can be retried ({retry_count}/{max_retries})") except Exception as e: logger.error(f"Error in task_failure_handler: {e}") @@ -469,6 +509,9 @@ def download_track(self, **task_data): custom_track_format = task_data.get("custom_track_format", config_params.get("customTrackFormat", "%tracknum%. %music%")) pad_tracks = task_data.get("pad_tracks", config_params.get("tracknum_padding", True)) + # Log task parameters for debugging + logger.debug(f"Track download parameters: service={service}, quality={quality}, real_time={real_time}") + # Execute the download function with progress callback download_track_func( service=service, @@ -550,6 +593,9 @@ def download_album(self, **task_data): custom_track_format = task_data.get("custom_track_format", config_params.get("customTrackFormat", "%tracknum%. %music%")) pad_tracks = task_data.get("pad_tracks", config_params.get("tracknum_padding", True)) + # Log task parameters for debugging + logger.debug(f"Album download parameters: service={service}, quality={quality}, real_time={real_time}") + # Execute the download function with progress callback download_album_func( service=service, @@ -631,6 +677,9 @@ def download_playlist(self, **task_data): custom_track_format = task_data.get("custom_track_format", config_params.get("customTrackFormat", "%tracknum%. %music%")) pad_tracks = task_data.get("pad_tracks", config_params.get("tracknum_padding", True)) + # Log task parameters for debugging + logger.debug(f"Playlist download parameters: service={service}, quality={quality}, real_time={real_time}") + # Execute the download function with progress callback download_playlist_func( service=service,