diff --git a/app.py b/app.py
index 2d26b3a..a313565 100755
--- a/app.py
+++ b/app.py
@@ -208,31 +208,37 @@ if __name__ == "__main__":
    # Configure application logging
    log_handler = setup_logging()

-    # Set file permissions for log files if needed
+    # Set permissions for log file
    try:
-        os.chmod(log_handler.baseFilename, 0o666)
-    except (OSError, FileNotFoundError) as e:
-        logging.warning(f"Could not set permissions on log file: {str(e)}")
+        if os.name != "nt":  # Not Windows
+            os.chmod(log_handler.baseFilename, 0o666)
+    except Exception as e:
+        logging.warning(f"Could not set permissions on log file: {e}")

-    # Log application startup
-    logging.info("=== Spotizerr Application Starting ===")
-
-    # Check Redis connection before starting workers
-    if check_redis_connection():
-        # Start Watch Manager
-        from routes.utils.watch.manager import start_watch_manager
-
-        start_watch_manager()
-
-        # Start Celery workers
-        start_celery_workers()
-
-        # Create and start Flask app
-        app = create_app()
-        logging.info("Starting Flask server on port 7171")
-        from waitress import serve
-
-        serve(app, host="0.0.0.0", port=7171)
-    else:
-        logging.error("Cannot start application: Redis connection failed")
+    # Check Redis connection before starting
+    if not check_redis_connection():
+        logging.error("Exiting: Could not establish Redis connection.")
        sys.exit(1)
+
+    # Start Celery workers in a separate thread
+    start_celery_workers()
+
+    # Clean up Celery workers on exit
+    atexit.register(celery_manager.stop)
+
+    # Create Flask app
+    app = create_app()
+
+    # Get host and port from environment variables or use defaults
+    host = os.environ.get("HOST", "0.0.0.0")
+    port = int(os.environ.get("PORT", 7171))
+
+    # Flask's built-in server, kept here for development use:
+    # logging.info(f"Starting Flask development server on http://{host}:{port}")
+    # app.run(host=host, port=port, debug=True)
+
+    # Waitress, a production-ready server, is used by default below. To use the
+    # development server instead, comment out the serve() call and uncomment
+    # the app.run() lines above.
+ logging.info(f"Starting server with Waitress on http://{host}:{port}") + from waitress import serve + serve(app, host=host, port=port) diff --git a/celerybeat-schedule b/celerybeat-schedule new file mode 100644 index 0000000..14f412a Binary files /dev/null and b/celerybeat-schedule differ diff --git a/routes/album.py b/routes/album.py index 6b24c4a..b22e64e 100755 --- a/routes/album.py +++ b/routes/album.py @@ -6,6 +6,7 @@ import time from routes.utils.celery_queue_manager import download_queue_manager from routes.utils.celery_tasks import store_task_info, store_task_status, ProgressState from routes.utils.get_info import get_spotify_info +from routes.utils.errors import DuplicateDownloadError album_bp = Blueprint("album", __name__) @@ -74,6 +75,17 @@ def handle_download(album_id): "orig_request": orig_params, } ) + except DuplicateDownloadError as e: + return Response( + json.dumps( + { + "error": "Duplicate download detected.", + "existing_task": e.existing_task, + } + ), + status=409, + mimetype="application/json", + ) except Exception as e: # Generic error handling for other issues during task submission # Create an error task ID if add_task itself fails before returning an ID diff --git a/routes/playlist.py b/routes/playlist.py index a17a98f..67aaa8f 100755 --- a/routes/playlist.py +++ b/routes/playlist.py @@ -27,6 +27,7 @@ from routes.utils.watch.manager import ( check_watched_playlists, get_watch_config, ) # For manual trigger & config +from routes.utils.errors import DuplicateDownloadError logger = logging.getLogger(__name__) # Added logger initialization playlist_bp = Blueprint("playlist", __name__, url_prefix="/api/playlist") @@ -97,7 +98,17 @@ def handle_download(playlist_id): "orig_request": orig_params, } ) - # Removed DuplicateDownloadError handling, add_task now manages this by creating an error task. + except DuplicateDownloadError as e: + return Response( + json.dumps( + { + "error": "Duplicate download detected.", + "existing_task": e.existing_task, + } + ), + status=409, + mimetype="application/json", + ) except Exception as e: # Generic error handling for other issues during task submission error_task_id = str(uuid.uuid4()) diff --git a/routes/prgs.py b/routes/prgs.py index 33467f2..fc1eda1 100755 --- a/routes/prgs.py +++ b/routes/prgs.py @@ -10,6 +10,7 @@ from routes.utils.celery_tasks import ( cancel_task, retry_task, redis_client, + delete_task_data, ) # Configure logging @@ -20,6 +21,60 @@ prgs_bp = Blueprint("prgs", __name__, url_prefix="/api/prgs") # (Old .prg file system removed. Using new task system only.) +def _build_error_callback_object(last_status): + """ + Constructs a structured error callback object based on the last status of a task. + This conforms to the CallbackObject types in the frontend. 
+ """ + # The 'type' from the status update corresponds to the download_type (album, playlist, track) + download_type = last_status.get("type") + name = last_status.get("name") + # The 'artist' field from the status may contain artist names or a playlist owner's name + artist_or_owner = last_status.get("artist") + error_message = last_status.get("error", "An unknown error occurred.") + + status_info = {"status": "error", "error": error_message} + + callback_object = {"status_info": status_info} + + if download_type == "album": + callback_object["album"] = { + "type": "album", + "title": name, + "artists": [{ + "type": "artistAlbum", + "name": artist_or_owner + }] if artist_or_owner else [], + } + elif download_type == "playlist": + playlist_payload = {"type": "playlist", "title": name} + if artist_or_owner: + playlist_payload["owner"] = {"type": "user", "name": artist_or_owner} + callback_object["playlist"] = playlist_payload + elif download_type == "track": + callback_object["track"] = { + "type": "track", + "title": name, + "artists": [{ + "type": "artistTrack", + "name": artist_or_owner + }] if artist_or_owner else [], + } + else: + # Fallback for unknown types to avoid breaking the client, returning a basic error structure. + return { + "status_info": status_info, + "unstructured_error": True, + "details": { + "type": download_type, + "name": name, + "artist_or_owner": artist_or_owner, + }, + } + + return callback_object + + @prgs_bp.route("/", methods=["GET"]) def get_task_details(task_id): """ @@ -77,17 +132,26 @@ def get_task_details(task_id): last_status = get_last_task_status(task_id) status_count = len(get_task_status(task_id)) - # Default to the full last_status object, then check for the raw callback - last_line_content = last_status + # Determine last_line content if last_status and "raw_callback" in last_status: last_line_content = last_status["raw_callback"] + elif last_status and last_status.get("status") == "error": + last_line_content = _build_error_callback_object(last_status) + else: + # Fallback for non-error, no raw_callback, or if last_status is None + last_line_content = last_status response = { "original_url": dynamic_original_url, "last_line": last_line_content, - "timestamp": time.time(), + "timestamp": last_status.get("timestamp") if last_status else time.time(), "task_id": task_id, "status_count": status_count, + "created_at": task_info.get("created_at"), + "name": task_info.get("name"), + "artist": task_info.get("artist"), + "type": task_info.get("type"), + "download_type": task_info.get("download_type"), } if last_status and last_status.get("summary"): response["summary"] = last_status["summary"] @@ -106,9 +170,13 @@ def delete_task(task_id): task_info = get_task_info(task_id) if not task_info: abort(404, "Task not found") + + # First, cancel the task if it's running cancel_task(task_id) - redis_client.delete(f"task:{task_id}:info") - redis_client.delete(f"task:{task_id}:status") + + # Then, delete all associated data from Redis + delete_task_data(task_id) + return {"message": f"Task {task_id} deleted successfully"}, 200 @@ -116,8 +184,7 @@ def delete_task(task_id): def list_tasks(): """ Retrieve a list of all tasks in the system. - Returns a detailed list of task objects including status and metadata, - formatted according to the callback documentation. + Returns a detailed list of task objects including status and metadata. By default, it returns active tasks. Use ?include_finished=true to include completed tasks. 
""" try: @@ -133,90 +200,86 @@ def list_tasks(): continue task_info = get_task_info(task_id) - last_status = get_last_task_status(task_id) + if not task_info: + continue - if task_info and last_status: - # Start with the last status object as the base. - # This object should conform to one of the callback types. - task_details = last_status.copy() + # Dynamically construct original_url + dynamic_original_url = "" + download_type = task_info.get("download_type") + # The 'url' field in task_info stores the Spotify/Deezer URL of the item + # e.g., https://open.spotify.com/album/albumId or https://www.deezer.com/track/trackId + item_url = task_info.get("url") - # Add essential metadata to the task details - task_details["task_id"] = task_id - task_details["original_request"] = task_info.get( - "original_request", {} - ) - task_details["created_at"] = task_info.get("created_at", 0) - - # Ensure core properties from task_info are present if not in status - if "type" not in task_details: - task_details["type"] = task_info.get("type", "unknown") - if "name" not in task_details: - task_details["name"] = task_info.get("name", "Unknown") - if "artist" not in task_details: - task_details["artist"] = task_info.get("artist", "") - if "download_type" not in task_details: - task_details["download_type"] = task_info.get( - "download_type", "unknown" + if download_type and item_url: + try: + # Extract the ID from the item_url (last part of the path) + item_id = item_url.split("/")[-1] + if item_id: # Ensure item_id is not empty + base_url = request.host_url.rstrip("/") + dynamic_original_url = ( + f"{base_url}/api/{download_type}/download/{item_id}" + ) + else: + logger.warning( + f"Could not extract item ID from URL: {item_url} for task {task_id}. Falling back for original_url." + ) + original_request_obj = task_info.get("original_request", {}) + dynamic_original_url = original_request_obj.get( + "original_url", "" + ) + except Exception as e: + logger.error( + f"Error constructing dynamic original_url for task {task_id}: {e}", + exc_info=True, ) - - detailed_tasks.append(task_details) - elif ( - task_info - ): # If last_status is somehow missing, still provide some info - detailed_tasks.append( - { - "task_id": task_id, - "type": task_info.get("type", "unknown"), - "name": task_info.get("name", "Unknown"), - "artist": task_info.get("artist", ""), - "download_type": task_info.get("download_type", "unknown"), - "status": "unknown", - "original_request": task_info.get("original_request", {}), - "created_at": task_info.get("created_at", 0), - "timestamp": task_info.get("created_at", 0), - } + original_request_obj = task_info.get("original_request", {}) + dynamic_original_url = original_request_obj.get( + "original_url", "" + ) # Fallback on any error + else: + logger.warning( + f"Missing download_type ('{download_type}') or item_url ('{item_url}') in task_info for task {task_id}. Falling back for original_url." 
+                )
+                original_request_obj = task_info.get("original_request", {})
+                dynamic_original_url = original_request_obj.get("original_url", "")
 
-        # Sort tasks by creation time (newest first, or by timestamp if creation time is missing)
-        detailed_tasks.sort(
-            key=lambda x: x.get("timestamp", x.get("created_at", 0)), reverse=True
-        )
+            last_status = get_last_task_status(task_id)
+            status_count = len(get_task_status(task_id))
+
+            # Determine last_line content
+            if last_status and "raw_callback" in last_status:
+                last_line_content = last_status["raw_callback"]
+            elif last_status and last_status.get("status") == "error":
+                last_line_content = _build_error_callback_object(last_status)
+            else:
+                # Fallback for non-error, no raw_callback, or if last_status is None
+                last_line_content = last_status
+
+            response = {
+                "original_url": dynamic_original_url,
+                "last_line": last_line_content,
+                "timestamp": last_status.get("timestamp") if last_status else time.time(),
+                "task_id": task_id,
+                "status_count": status_count,
+                "created_at": task_info.get("created_at"),
+                "name": task_info.get("name"),
+                "artist": task_info.get("artist"),
+                "type": task_info.get("type"),
+                "download_type": task_info.get("download_type"),
+            }
+            if last_status and last_status.get("summary"):
+                response["summary"] = last_status["summary"]
+
+            detailed_tasks.append(response)
+
+        # Sort tasks by creation time (newest first)
+        detailed_tasks.sort(key=lambda x: x.get("created_at", 0), reverse=True)
 
         return jsonify(detailed_tasks)
     except Exception as e:
         logger.error(f"Error in /api/prgs/list: {e}", exc_info=True)
         return jsonify({"error": "Failed to retrieve task list"}), 500
 
-
-@prgs_bp.route("/retry/<task_id>", methods=["POST"])
-def retry_task_endpoint(task_id):
-    """
-    Retry a failed task.
-
-    Args:
-        task_id: The ID of the task to retry
-    """
-    try:
-        # First check if this is a task ID in the new system
-        task_info = get_task_info(task_id)
-
-        if task_info:
-            # This is a task ID in the new system
-            result = retry_task(task_id)
-            return jsonify(result)
-
-        # If not found in new system, we need to handle the old system retry
-        # For now, return an error as we're transitioning to the new system
-        return jsonify(
-            {
-                "status": "error",
-                "message": "Retry for old system is not supported in the new API. Please use the new task ID format.",
-            }
-        ), 400
-    except Exception as e:
-        abort(500, f"An error occurred: {e}")
-
-
 @prgs_bp.route("/cancel/<task_id>", methods=["POST"])
 def cancel_task_endpoint(task_id):
     """
@@ -244,3 +307,36 @@ def cancel_task_endpoint(task_id):
         ), 400
     except Exception as e:
         abort(500, f"An error occurred: {e}")
+
+
+@prgs_bp.route("/cancel/all", methods=["POST"])
+def cancel_all_tasks():
+    """
+    Cancel all active (running or queued) tasks.
+    """
+    try:
+        tasks_to_cancel = get_all_tasks(include_finished=False)
+        cancelled_count = 0
+        errors = []
+
+        for task_summary in tasks_to_cancel:
+            task_id = task_summary.get("task_id")
+            if not task_id:
+                continue
+            try:
+                cancel_task(task_id)
+                cancelled_count += 1
+            except Exception as e:
+                error_message = f"Failed to cancel task {task_id}: {e}"
+                logger.error(error_message)
+                errors.append(error_message)
+
+        response = {
+            "message": f"Attempted to cancel all active tasks. {cancelled_count} tasks cancelled.",
+            "cancelled_count": cancelled_count,
+            "errors": errors,
+        }
+        return jsonify(response), 200
+    except Exception as e:
+        logger.error(f"Error in /api/prgs/cancel/all: {e}", exc_info=True)
+        return jsonify({"error": "Failed to cancel all tasks"}), 500
diff --git a/routes/track.py b/routes/track.py
index 3f86828..5c0f7e8 100755
--- a/routes/track.py
+++ b/routes/track.py
@@ -3,7 +3,10 @@ import json
 import traceback
 import uuid  # For generating error task IDs
 import time  # For timestamps
-from routes.utils.celery_queue_manager import download_queue_manager
+from routes.utils.celery_queue_manager import (
+    download_queue_manager,
+    get_existing_task_id,
+)
 from routes.utils.celery_tasks import (
     store_task_info,
     store_task_status,
@@ -81,6 +84,20 @@ def handle_download(track_id):
             mimetype="application/json",
         )
 
+    # Check for existing task before adding to the queue
+    existing_task = get_existing_task_id(url)
+    if existing_task:
+        return Response(
+            json.dumps(
+                {
+                    "error": "Duplicate download detected.",
+                    "existing_task": existing_task,
+                }
+            ),
+            status=409,
+            mimetype="application/json",
+        )
+
     try:
         task_id = download_queue_manager.add_task(
             {
diff --git a/routes/utils/album.py b/routes/utils/album.py
index 63e1cce..50f3986 100755
--- a/routes/utils/album.py
+++ b/routes/utils/album.py
@@ -6,6 +6,8 @@ from routes.utils.credentials import (
     _get_global_spotify_api_creds,
     get_spotify_blob_path,
 )
+from routes.utils.celery_queue_manager import get_existing_task_id
+from routes.utils.errors import DuplicateDownloadError
 
 
 def download_album(
@@ -25,7 +27,15 @@ def download_album(
     progress_callback=None,
     convert_to=None,
     bitrate=None,
+    _is_celery_task_execution=False,  # Added to skip duplicate check from Celery task
 ):
+    if not _is_celery_task_execution:
+        existing_task = get_existing_task_id(url)  # Check for duplicates only if not called by Celery task
+        if existing_task:
+            raise DuplicateDownloadError(
+                "Download for this URL is already in progress.",
+                existing_task=existing_task,
+            )
     try:
         # Detect URL source (Spotify or Deezer) from URL
         is_spotify_url = "open.spotify.com" in url.lower()
diff --git a/routes/utils/artist.py b/routes/utils/artist.py
index 88d706e..49872cf 100644
--- a/routes/utils/artist.py
+++ b/routes/utils/artist.py
@@ -4,7 +4,7 @@ from flask import url_for
 from routes.utils.celery_queue_manager import download_queue_manager
 from routes.utils.get_info import get_spotify_info
 from routes.utils.credentials import get_credential, _get_global_spotify_api_creds
-from routes.utils.celery_tasks import get_last_task_status, ProgressState
+from routes.utils.errors import DuplicateDownloadError
 from deezspot.easy_spoty import Spo
 from deezspot.libutils.utils import get_ids, link_is_valid
 
@@ -112,48 +112,34 @@ def download_artist_albums(
     if not url:
         raise ValueError("Missing required parameter: url")
 
-    # Extract artist ID from URL
     artist_id = url.split("/")[-1]
     if "?"
in artist_id: artist_id = artist_id.split("?")[0] logger.info(f"Fetching artist info for ID: {artist_id}") - # Detect URL source (only Spotify is supported for artists) - is_spotify_url = "open.spotify.com" in url.lower() - - # Artist functionality only works with Spotify URLs currently - if not is_spotify_url: + if "open.spotify.com" not in url.lower(): error_msg = ( "Invalid URL: Artist functionality only supports open.spotify.com URLs" ) logger.error(error_msg) raise ValueError(error_msg) - # Get artist info with albums artist_data = get_spotify_info(artist_id, "artist_discography") - # Debug logging to inspect the structure of artist_data - logger.debug( - f"Artist data structure has keys: {list(artist_data.keys() if isinstance(artist_data, dict) else [])}" - ) - if not artist_data or "items" not in artist_data: raise ValueError( f"Failed to retrieve artist data or no albums found for artist ID {artist_id}" ) - # Parse the album types to filter by allowed_types = [t.strip().lower() for t in album_type.split(",")] logger.info(f"Filtering albums by types: {allowed_types}") - # Filter albums by the specified types filtered_albums = [] for album in artist_data.get("items", []): album_type_value = album.get("album_type", "").lower() album_group_value = album.get("album_group", "").lower() - # Apply filtering logic based on album_type and album_group if ( ( "album" in allowed_types @@ -174,116 +160,54 @@ def download_artist_albums( logger.warning(f"No albums match the specified types: {album_type}") return [], [] - # Queue each album as a separate download task - album_task_ids = [] successfully_queued_albums = [] - duplicate_albums = [] # To store info about albums that were duplicates + duplicate_albums = [] for album in filtered_albums: - # Add detailed logging to inspect each album's structure and URLs - logger.debug(f"Processing album: {album.get('name', 'Unknown')}") - logger.debug(f"Album structure has keys: {list(album.keys())}") - - external_urls = album.get("external_urls", {}) - logger.debug(f"Album external_urls: {external_urls}") - - album_url = external_urls.get("spotify", "") + album_url = album.get("external_urls", {}).get("spotify", "") album_name = album.get("name", "Unknown Album") album_artists = album.get("artists", []) album_artist = ( - album_artists[0].get("name", "Unknown Artist") - if album_artists - else "Unknown Artist" + album_artists[0].get("name", "Unknown Artist") if album_artists else "Unknown Artist" ) - album_id = album.get("id") - logger.debug(f"Extracted album URL: {album_url}") - logger.debug(f"Extracted album ID: {album_id}") - - if not album_url or not album_id: - logger.warning(f"Skipping album without URL or ID: {album_name}") + if not album_url: + logger.warning(f"Skipping album '{album_name}' because it has no Spotify URL.") continue - # Create album-specific request args instead of using original artist request - album_request_args = { + task_data = { + "download_type": "album", "url": album_url, "name": album_name, "artist": album_artist, - "type": "album", - # URL source will be automatically detected in the download functions - "parent_artist_url": url, - "parent_request_type": "artist", + "orig_request": request_args, } - # Include original download URL for this album task - album_request_args["original_url"] = url_for( - "album.handle_download", album_id=album_id, _external=True - ) - - # Create task for this album - task_data = { - "download_type": "album", - "type": "album", # Type for the download task - "url": album_url, # Important: use 
the album URL, not artist URL
-            "retry_url": album_url,  # Use album URL for retry logic, not artist URL
-            "name": album_name,
-            "artist": album_artist,
-            "orig_request": album_request_args,  # Store album-specific request params
-        }
-
-        # Debug log the task data being sent to the queue
-        logger.debug(
-            f"Album task data: url={task_data['url']}, retry_url={task_data['retry_url']}"
-        )
-
         try:
             task_id = download_queue_manager.add_task(task_data)
-
-            # Check the status of the newly added task to see if it was marked as a duplicate error
-            last_status = get_last_task_status(task_id)
-
-            if (
-                last_status
-                and last_status.get("status") == ProgressState.ERROR
-                and last_status.get("existing_task_id")
-            ):
-                logger.warning(
-                    f"Album {album_name} (URL: {album_url}) is a duplicate. Error task ID: {task_id}. Existing task ID: {last_status.get('existing_task_id')}"
-                )
-                duplicate_albums.append(
-                    {
-                        "name": album_name,
-                        "artist": album_artist,
-                        "url": album_url,
-                        "error_task_id": task_id,  # This is the ID of the task marked as a duplicate error
-                        "existing_task_id": last_status.get("existing_task_id"),
-                        "message": last_status.get(
-                            "message", "Duplicate download attempt."
-                        ),
-                    }
-                )
-            else:
-                # If not a duplicate error, it was successfully queued (or failed for other reasons handled by add_task)
-                # We only add to successfully_queued_albums if it wasn't a duplicate error from add_task
-                # Other errors from add_task (like submission failure) would also result in an error status for task_id
-                # but won't have 'existing_task_id'. The client can check the status of this task_id.
-                album_task_ids.append(
-                    task_id
-                )  # Keep track of all task_ids returned by add_task
-                successfully_queued_albums.append(
-                    {
-                        "name": album_name,
-                        "artist": album_artist,
-                        "url": album_url,
-                        "task_id": task_id,
-                    }
-                )
-                logger.info(f"Queued album download: {album_name} ({task_id})")
-        except Exception as e:  # Catch any other unexpected error from add_task itself (though it should be rare now)
-            logger.error(
-                f"Failed to queue album {album_name} due to an unexpected error in add_task: {str(e)}"
+            successfully_queued_albums.append(
+                {
+                    "name": album_name,
+                    "artist": album_artist,
+                    "url": album_url,
+                    "task_id": task_id,
+                }
             )
-            # Optionally, collect these errors. For now, just logging and continuing.
+        except DuplicateDownloadError as e:
+            logger.warning(
+                f"Skipping duplicate album {album_name} (URL: {album_url}). Existing task: {e.existing_task}"
+            )
+            duplicate_albums.append(
+                {
+                    "name": album_name,
+                    "artist": album_artist,
+                    "url": album_url,
+                    "existing_task": e.existing_task,
+                    "message": str(e),
+                }
+            )
+        except Exception as e:
+            logger.error(f"Failed to queue album {album_name} due to an unexpected error: {e}")
 
     logger.info(
         f"Artist album processing: {len(successfully_queued_albums)} queued, {len(duplicate_albums)} duplicates found."
     )
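Taken together, the route and utility changes above settle on one duplicate-handling contract: `add_task` and the `download_*` helpers raise `DuplicateDownloadError`, and the HTTP routes translate it into a 409 response carrying the existing task ID. A minimal client-side sketch of that contract (hypothetical base URL and album ID; the endpoint path follows the `/api/{download_type}/download/{item_id}` pattern used for `original_url` above):

```python
import requests

BASE = "http://localhost:7171"  # assumed local instance; host/port are the app.py defaults
ALBUM_ID = "0000000000000000000000"  # hypothetical Spotify album ID

# First request queues the download; the frontend reads a task_id from this payload.
first = requests.get(f"{BASE}/api/album/download/{ALBUM_ID}")
print(first.status_code, first.json())

# Re-requesting the same item while its task is still active should now
# yield 409 Conflict instead of silently creating an error task.
second = requests.get(f"{BASE}/api/album/download/{ALBUM_ID}")
if second.status_code == 409:
    body = second.json()
    print("duplicate:", body["error"], "existing task:", body["existing_task"])
```

The artist route is the deliberate exception: per-album duplicates are skipped and reported in `duplicate_albums` rather than failing the whole request.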
diff --git a/routes/utils/celery_config.py b/routes/utils/celery_config.py index 70ace42..35abb9c 100644 --- a/routes/utils/celery_config.py +++ b/routes/utils/celery_config.py @@ -149,7 +149,7 @@ task_max_retries = MAX_RETRIES # Task result settings task_track_started = True -result_expires = 60 * 60 * 24 * 7 # 7 days +result_expires = 3600 # 1 hour # Configure visibility timeout for task messages broker_transport_options = { @@ -167,3 +167,11 @@ broker_pool_limit = 10 worker_prefetch_multiplier = 1 # Process one task at a time per worker worker_max_tasks_per_child = 100 # Restart worker after 100 tasks worker_disable_rate_limits = False + +# Celery Beat schedule +beat_schedule = { + 'cleanup-old-tasks': { + 'task': 'routes.utils.celery_tasks.cleanup_old_tasks', + 'schedule': 3600.0, # Run every hour + }, +} diff --git a/routes/utils/celery_queue_manager.py b/routes/utils/celery_queue_manager.py index b472f70..bf82787 100644 --- a/routes/utils/celery_queue_manager.py +++ b/routes/utils/celery_queue_manager.py @@ -83,6 +83,89 @@ def get_config_params(): } +def get_existing_task_id(url, download_type=None): + """ + Check if an active task with the same URL (and optionally, type) already exists. + This function ignores tasks that are in a terminal state (e.g., completed, cancelled, or failed). + + Args: + url (str): The URL to check for duplicates. + download_type (str, optional): The type of download to check. Defaults to None. + + Returns: + str | None: The task ID of the existing active task, or None if no active duplicate is found. + """ + logger.debug(f"GET_EXISTING_TASK_ID: Checking for URL='{url}', type='{download_type}'") + if not url: + logger.debug("GET_EXISTING_TASK_ID: No URL provided, returning None.") + return None + + # Define terminal states. Tasks in these states are considered inactive and will be ignored. + TERMINAL_STATES = { + ProgressState.COMPLETE, + ProgressState.DONE, + ProgressState.CANCELLED, + ProgressState.ERROR, + ProgressState.ERROR_RETRIED, + ProgressState.ERROR_AUTO_CLEANED, + } + logger.debug(f"GET_EXISTING_TASK_ID: Terminal states defined as: {TERMINAL_STATES}") + + all_existing_tasks_summary = get_all_tasks() # This function already filters by default based on its own TERMINAL_STATES + logger.debug(f"GET_EXISTING_TASK_ID: Found {len(all_existing_tasks_summary)} tasks from get_all_tasks(). Iterating...") + + for task_summary in all_existing_tasks_summary: + existing_task_id = task_summary.get("task_id") + if not existing_task_id: + logger.debug("GET_EXISTING_TASK_ID: Skipping summary with no task_id.") + continue + + logger.debug(f"GET_EXISTING_TASK_ID: Processing existing task_id='{existing_task_id}' from summary.") + + # First, check the status of the task directly from its latest status record. + # get_all_tasks() might have its own view of terminal, but we re-check here for absolute certainty. + existing_last_status_obj = get_last_task_status(existing_task_id) + if not existing_last_status_obj: + logger.debug(f"GET_EXISTING_TASK_ID: No last status object for task_id='{existing_task_id}'. Skipping.") + continue + + existing_status = existing_last_status_obj.get("status") + logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}', last_status_obj='{existing_last_status_obj}', extracted status='{existing_status}'.") + + # If the task is in a terminal state, ignore it and move to the next one. + if existing_status in TERMINAL_STATES: + logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}' has terminal status='{existing_status}'. 
Skipping.") + continue + + logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}' has ACTIVE status='{existing_status}'. Proceeding to check URL/type.") + + # If the task is active, then check if its URL and type match. + existing_task_info = get_task_info(existing_task_id) + if not existing_task_info: + logger.debug(f"GET_EXISTING_TASK_ID: No task info for active task_id='{existing_task_id}'. Skipping.") + continue + + existing_url = existing_task_info.get("url") + logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}', info_url='{existing_url}'. Comparing with target_url='{url}'.") + if existing_url != url: + logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}' URL mismatch. Skipping.") + continue + + if download_type: + existing_type = existing_task_info.get("download_type") + logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}', info_type='{existing_type}'. Comparing with target_type='{download_type}'.") + if existing_type != download_type: + logger.debug(f"GET_EXISTING_TASK_ID: Task_id='{existing_task_id}' type mismatch. Skipping.") + continue + + # Found an active task that matches the criteria. + logger.info(f"GET_EXISTING_TASK_ID: Found ACTIVE duplicate: task_id='{existing_task_id}' for URL='{url}', type='{download_type}'. Returning this ID.") + return existing_task_id + + logger.debug(f"GET_EXISTING_TASK_ID: No active duplicate found for URL='{url}', type='{download_type}'. Returning None.") + return None + + class CeleryDownloadQueueManager: """ Manages a queue of download tasks using Celery. @@ -125,14 +208,14 @@ class CeleryDownloadQueueManager: "Task being added with no URL. Duplicate check might be unreliable." ) - NON_BLOCKING_STATES = [ + TERMINAL_STATES = { # Renamed and converted to a set for consistency ProgressState.COMPLETE, ProgressState.DONE, ProgressState.CANCELLED, ProgressState.ERROR, ProgressState.ERROR_RETRIED, ProgressState.ERROR_AUTO_CLEANED, - ] + } all_existing_tasks_summary = get_all_tasks() if incoming_url: @@ -154,7 +237,7 @@ class CeleryDownloadQueueManager: if ( existing_url == incoming_url and existing_type == incoming_type - and existing_status not in NON_BLOCKING_STATES + and existing_status not in TERMINAL_STATES ): message = f"Duplicate download: URL '{incoming_url}' (type: {incoming_type}) is already being processed by task {existing_task_id} (status: {existing_status})." logger.warning(message) diff --git a/routes/utils/celery_tasks.py b/routes/utils/celery_tasks.py index 2d2b9e4..a461322 100644 --- a/routes/utils/celery_tasks.py +++ b/routes/utils/celery_tasks.py @@ -181,6 +181,70 @@ def get_task_info(task_id): return {} +def delete_task_data(task_id): + """Deletes all Redis data associated with a task_id.""" + try: + redis_client.delete(f"task:{task_id}:info") + redis_client.delete(f"task:{task_id}:status") + redis_client.delete(f"task:{task_id}:status:next_id") + logger.info(f"Deleted data for task {task_id}") + except Exception as e: + logger.error(f"Error deleting data for task {task_id}: {e}") + + +CLEANUP_THRESHOLD_SECONDS = 3600 # 1 hour + + +@celery_app.task(name="routes.utils.celery_tasks.cleanup_old_tasks") +def cleanup_old_tasks(): + """ + Periodically cleans up old, finished tasks from Redis to prevent data buildup. 
+ """ + logger.info("Starting cleanup of old finished tasks...") + + # Define terminal states that are safe to clean up + TERMINAL_STATES = { + ProgressState.COMPLETE, + ProgressState.DONE, + ProgressState.CANCELLED, + ProgressState.ERROR, + ProgressState.ERROR_RETRIED, + ProgressState.ERROR_AUTO_CLEANED, + } + + cleaned_count = 0 + # Scan for all task info keys, which serve as the master record for a task's existence + task_info_keys = redis_client.keys("task:*:info") + + for key in task_info_keys: + try: + task_id = key.decode("utf-8").split(":")[1] + last_status = get_last_task_status(task_id) + + if not last_status: + # If there's no status, we can't determine age or state. + # Could be an orphaned task info key. Consider a separate cleanup for these. + continue + + status = last_status.get("status") + timestamp = last_status.get("timestamp", 0) + + # Check if the task is in a terminal state and has expired + if status in TERMINAL_STATES: + if (time.time() - timestamp) > CLEANUP_THRESHOLD_SECONDS: + logger.info( + f"Cleaning up expired task {task_id} (status: {status}, age: {time.time() - timestamp}s)" + ) + delete_task_data(task_id) + cleaned_count += 1 + except Exception as e: + logger.error( + f"Error processing task key {key} for cleanup: {e}", exc_info=True + ) + + logger.info(f"Finished cleanup of old tasks. Removed {cleaned_count} tasks.") + + # --- History Logging Helper --- def _log_task_to_history(task_id, final_status_str, error_msg=None): """Helper function to gather task data and log it to the history database.""" @@ -1324,6 +1388,7 @@ def download_track(self, **task_data): progress_callback=self.progress_callback, convert_to=convert_to, bitrate=bitrate, + _is_celery_task_execution=True, # Skip duplicate check inside Celery task (consistency) ) return {"status": "success", "message": "Track download completed"} @@ -1410,6 +1475,7 @@ def download_album(self, **task_data): progress_callback=self.progress_callback, convert_to=convert_to, bitrate=bitrate, + _is_celery_task_execution=True, # Skip duplicate check inside Celery task ) return {"status": "success", "message": "Album download completed"} @@ -1508,6 +1574,7 @@ def download_playlist(self, **task_data): progress_callback=self.progress_callback, convert_to=convert_to, bitrate=bitrate, + _is_celery_task_execution=True, # Skip duplicate check inside Celery task ) return {"status": "success", "message": "Playlist download completed"} diff --git a/routes/utils/errors.py b/routes/utils/errors.py new file mode 100644 index 0000000..3a8a59f --- /dev/null +++ b/routes/utils/errors.py @@ -0,0 +1,6 @@ +class DuplicateDownloadError(Exception): + def __init__(self, message, existing_task=None): + if existing_task: + message = f"{message} (Conflicting Task ID: {existing_task})" + super().__init__(message) + self.existing_task = existing_task diff --git a/routes/utils/playlist.py b/routes/utils/playlist.py index 5605a51..78c3f40 100755 --- a/routes/utils/playlist.py +++ b/routes/utils/playlist.py @@ -3,6 +3,8 @@ from deezspot.spotloader import SpoLogin from deezspot.deezloader import DeeLogin from pathlib import Path from routes.utils.credentials import get_credential, _get_global_spotify_api_creds +from routes.utils.celery_queue_manager import get_existing_task_id +from routes.utils.errors import DuplicateDownloadError def download_playlist( @@ -22,7 +24,15 @@ def download_playlist( progress_callback=None, convert_to=None, bitrate=None, + _is_celery_task_execution=False, # Added to skip duplicate check from Celery task ): + if not 
_is_celery_task_execution:
+        existing_task = get_existing_task_id(url)  # Check for duplicates only if not called by Celery task
+        if existing_task:
+            raise DuplicateDownloadError(
+                "Download for this URL is already in progress.",
+                existing_task=existing_task,
+            )
     try:
         # Detect URL source (Spotify or Deezer) from URL
         is_spotify_url = "open.spotify.com" in url.lower()
diff --git a/routes/utils/track.py b/routes/utils/track.py
index 79c40bc..6344816 100755
--- a/routes/utils/track.py
+++ b/routes/utils/track.py
@@ -25,6 +25,7 @@ def download_track(
     progress_callback=None,
     convert_to=None,
     bitrate=None,
+    _is_celery_task_execution=False,  # Added for consistency, not currently used for duplicate check
 ):
     try:
         # Detect URL source (Spotify or Deezer) from URL
diff --git a/spotizerr-ui/src/contexts/QueueProvider.tsx b/spotizerr-ui/src/contexts/QueueProvider.tsx
index 29aea5f..bb6c62e 100644
--- a/spotizerr-ui/src/contexts/QueueProvider.tsx
+++ b/spotizerr-ui/src/contexts/QueueProvider.tsx
@@ -37,37 +37,10 @@ function isPlaylistCallback(obj: any): obj is PlaylistCallbackObject {
 }
 
 export function QueueProvider({ children }: { children: ReactNode }) {
-  const [items, setItems] = useState<QueueItem[]>(() => {
-    try {
-      const storedItems = localStorage.getItem("queueItems");
-      return storedItems ? JSON.parse(storedItems) : [];
-    } catch {
-      return [];
-    }
-  });
+  const [items, setItems] = useState<QueueItem[]>([]);
   const [isVisible, setIsVisible] = useState(false);
   const pollingIntervals = useRef<Record<string, number>>({});
 
-  useEffect(() => {
-    localStorage.setItem("queueItems", JSON.stringify(items));
-  }, [items]);
-
-  // Effect to resume polling for active tasks on component mount
-  useEffect(() => {
-    if (items.length > 0) {
-      items.forEach((item) => {
-        // If a task has an ID and is not in a finished state, restart polling.
-        if (item.taskId && !isTerminalStatus(item.status)) {
-          console.log(`Resuming polling for ${item.name} (Task ID: ${item.taskId})`);
-          startPolling(item.id, item.taskId);
-        }
-      });
-    }
-    // This effect should only run once on mount to avoid re-triggering polling unnecessarily.
-    // We are disabling the dependency warning because we intentionally want to use the initial `items` state.
-    // eslint-disable-next-line react-hooks/exhaustive-deps
-  }, []);
-
   const stopPolling = useCallback((internalId: string) => {
     if (pollingIntervals.current[internalId]) {
       clearInterval(pollingIntervals.current[internalId]);
@@ -75,122 +48,126 @@ export function QueueProvider({ children }: { children: ReactNode }) {
     }
   }, []);
 
+  const updateItemFromPrgs = useCallback((item: QueueItem, prgsData: any): QueueItem => {
+    const updatedItem: QueueItem = { ...item };
+    const { last_line, summary, status, name, artist, download_type } = prgsData;
+
+    if (status) updatedItem.status = status as QueueStatus;
+    if (summary) updatedItem.summary = summary;
+    if (name) updatedItem.name = name;
+    if (artist) updatedItem.artist = artist;
+    if (download_type) updatedItem.type = download_type;
+
+    if (last_line) {
+      if (isProcessingCallback(last_line)) {
+        updatedItem.status = "processing";
+      } else if (isTrackCallback(last_line)) {
+        const { status_info, track, current_track, total_tracks, parent } = last_line;
+        updatedItem.currentTrackTitle = track.title;
+        if (current_track) updatedItem.currentTrackNumber = current_track;
+        if (total_tracks) updatedItem.totalTracks = total_tracks;
+        updatedItem.status = (parent && ["done", "skipped"].includes(status_info.status)) ?
"downloading" : status_info.status as QueueStatus; + if (status_info.status === "skipped") { + updatedItem.error = status_info.reason; + } else if (status_info.status === "error" || status_info.status === "retrying") { + updatedItem.error = status_info.error; + } + if (!parent && status_info.status === "done" && status_info.summary) updatedItem.summary = status_info.summary; + } else if (isAlbumCallback(last_line)) { + const { status_info, album } = last_line; + updatedItem.status = status_info.status as QueueStatus; + updatedItem.name = album.title; + updatedItem.artist = album.artists.map(a => a.name).join(", "); + if (status_info.status === "done") { + if (status_info.summary) updatedItem.summary = status_info.summary; + updatedItem.currentTrackTitle = undefined; + } else if (status_info.status === "error") { + updatedItem.error = status_info.error; + } + } else if (isPlaylistCallback(last_line)) { + const { status_info, playlist } = last_line; + updatedItem.status = status_info.status as QueueStatus; + updatedItem.name = playlist.title; + updatedItem.playlistOwner = playlist.owner.name; + if (status_info.status === "done") { + if (status_info.summary) updatedItem.summary = status_info.summary; + updatedItem.currentTrackTitle = undefined; + } else if (status_info.status === "error") { + updatedItem.error = status_info.error; + } + } + } + + return updatedItem; + }, []); + const startPolling = useCallback( - (internalId: string, taskId: string) => { - if (pollingIntervals.current[internalId]) return; + (taskId: string) => { + if (pollingIntervals.current[taskId]) return; const intervalId = window.setInterval(async () => { try { - interface PrgsResponse { - status?: string; - summary?: SummaryObject; - last_line?: CallbackObject; - } - - const response = await apiClient.get(`/prgs/${taskId}`); - const { last_line, summary, status } = response.data; - + const response = await apiClient.get(`/prgs/${taskId}`); setItems(prev => prev.map(item => { - if (item.id !== internalId) return item; - - const updatedItem: QueueItem = { ...item }; - - if (status) { - updatedItem.status = status as QueueStatus; - } - - if (summary) { - updatedItem.summary = summary; - } - - if (last_line) { - if (isProcessingCallback(last_line)) { - updatedItem.status = "processing"; - } else if (isTrackCallback(last_line)) { - const { status_info, track, current_track, total_tracks, parent } = last_line; - - updatedItem.currentTrackTitle = track.title; - if (current_track) updatedItem.currentTrackNumber = current_track; - if (total_tracks) updatedItem.totalTracks = total_tracks; - - // A child track being "done" doesn't mean the whole download is done. - // The final "done" status comes from the parent (album/playlist) callback. - if (parent && status_info.status === "done") { - updatedItem.status = "downloading"; // Or keep current status if not 'error' - } else { - updatedItem.status = status_info.status as QueueStatus; - } - - if (status_info.status === "error" || status_info.status === "retrying") { - updatedItem.error = status_info.error; - } - - // For single tracks, the "done" status is final. 
- if (!parent && status_info.status === "done") { - if (status_info.summary) updatedItem.summary = status_info.summary; - } - } else if (isAlbumCallback(last_line)) { - const { status_info, album } = last_line; - updatedItem.status = status_info.status as QueueStatus; - updatedItem.name = album.title; - updatedItem.artist = album.artists.map(a => a.name).join(", "); - if (status_info.status === "done" && status_info.summary) { - updatedItem.summary = status_info.summary; - } - if (status_info.status === "error") { - updatedItem.error = status_info.error; - } - } else if (isPlaylistCallback(last_line)) { - const { status_info, playlist } = last_line; - updatedItem.status = status_info.status as QueueStatus; - updatedItem.name = playlist.title; - updatedItem.playlistOwner = playlist.owner.name; - if (status_info.status === "done" && status_info.summary) { - updatedItem.summary = status_info.summary; - } - if (status_info.status === "error") { - updatedItem.error = status_info.error; - } - } - } - + if (item.taskId !== taskId) return item; + const updatedItem = updateItemFromPrgs(item, response.data); if (isTerminalStatus(updatedItem.status as QueueStatus)) { - stopPolling(internalId); + stopPolling(taskId); } - return updatedItem; }), ); } catch (error) { console.error(`Polling failed for task ${taskId}:`, error); - stopPolling(internalId); + stopPolling(taskId); setItems(prev => prev.map(i => - i.id === internalId - ? { - ...i, - status: "error", - error: "Connection lost", - } + i.taskId === taskId + ? { ...i, status: "error", error: "Connection lost" } : i, ), ); } }, 2000); - pollingIntervals.current[internalId] = intervalId; + pollingIntervals.current[taskId] = intervalId; }, - [stopPolling], + [stopPolling, updateItemFromPrgs], ); useEffect(() => { - items.forEach((item) => { - if (item.taskId && !isTerminalStatus(item.status)) { - startPolling(item.id, item.taskId); + const fetchQueue = async () => { + try { + const response = await apiClient.get("/prgs/list"); + const backendItems = response.data.map((task: any) => { + const spotifyId = task.original_url?.split("/").pop() || ""; + const baseItem: QueueItem = { + id: task.task_id, + taskId: task.task_id, + name: task.name || "Unknown", + type: task.download_type || "track", + spotifyId: spotifyId, + status: "initializing", + artist: task.artist, + }; + return updateItemFromPrgs(baseItem, task); + }); + + setItems(backendItems); + + backendItems.forEach((item: QueueItem) => { + if (item.taskId && !isTerminalStatus(item.status)) { + startPolling(item.taskId); + } + }); + } catch (error) { + console.error("Failed to fetch queue from backend:", error); + toast.error("Could not load queue. Please refresh the page."); } - }); - // We only want to run this on mount, so we disable the exhaustive-deps warning. 
+ }; + + fetchQueue(); // eslint-disable-next-line react-hooks/exhaustive-deps }, []); @@ -200,35 +177,31 @@ export function QueueProvider({ children }: { children: ReactNode }) { const newItem: QueueItem = { ...item, id: internalId, - status: "queued", + status: "initializing", }; - setItems((prev) => [...prev, newItem]); - if (!isVisible) setIsVisible(true); + setItems(prev => [newItem, ...prev]); + setIsVisible(true); try { - let endpoint = ""; - if (item.type === "track") { - endpoint = `/track/download/${item.spotifyId}`; - } else if (item.type === "album") { - endpoint = `/album/download/${item.spotifyId}`; - } else if (item.type === "playlist") { - endpoint = `/playlist/download/${item.spotifyId}`; - } else if (item.type === "artist") { - endpoint = `/artist/download/${item.spotifyId}`; - } - - const response = await apiClient.get<{ task_id: string }>(endpoint); - const task_id = response.data.task_id; - - setItems((prev) => - prev.map((i) => (i.id === internalId ? { ...i, taskId: task_id, status: "initializing" } : i)), + const response = await apiClient.get<{ task_id: string }>( + `/${item.type}/download/${item.spotifyId}`, ); - startPolling(internalId, task_id); - } catch (error) { + const { task_id: taskId } = response.data; + + setItems(prev => + prev.map(i => + i.id === internalId + ? { ...i, id: taskId, taskId, status: "queued" } + : i, + ), + ); + + startPolling(taskId); + } catch (error: any) { console.error(`Failed to start download for ${item.name}:`, error); toast.error(`Failed to start download for ${item.name}`); - setItems((prev) => - prev.map((i) => + setItems(prev => + prev.map(i => i.id === internalId ? { ...i, @@ -243,27 +216,28 @@ export function QueueProvider({ children }: { children: ReactNode }) { [isVisible, startPolling], ); - const removeItem = useCallback( - (id: string) => { - const item = items.find((i) => i.id === id); - if (item?.taskId) { - stopPolling(item.id); - } - setItems((prev) => prev.filter((item) => item.id !== id)); - }, - [items, stopPolling], - ); + const removeItem = useCallback((id: string) => { + const item = items.find(i => i.id === id); + if (item && item.taskId) { + stopPolling(item.taskId); + apiClient.delete(`/prgs/delete/${item.taskId}`).catch(err => { + console.error(`Failed to delete task ${item.taskId} from backend`, err); + // Proceed with frontend removal anyway + }); + } + setItems(prev => prev.filter(i => i.id !== id)); + }, [items, stopPolling]); const cancelItem = useCallback( async (id: string) => { - const item = items.find((i) => i.id === id); + const item = items.find(i => i.id === id); if (!item || !item.taskId) return; try { await apiClient.post(`/prgs/cancel/${item.taskId}`); - stopPolling(id); - setItems((prev) => - prev.map((i) => + stopPolling(item.taskId); + setItems(prev => + prev.map(i => i.id === id ? 
{ ...i,
@@ -296,7 +270,7 @@ export function QueueProvider({ children }: { children: ReactNode }) {
           : i,
         ),
       );
-      startPolling(id, item.taskId);
+      startPolling(item.taskId);
       toast.info(`Retrying download: ${item.name}`);
     }
   },
@@ -320,7 +294,7 @@ export function QueueProvider({ children }: { children: ReactNode }) {
     try {
       const taskIds = activeItems.map((item) => item.taskId!);
 
-      await apiClient.post("/prgs/cancel/many", { task_ids: taskIds });
+      await apiClient.post("/prgs/cancel/all", { task_ids: taskIds });
 
       activeItems.forEach((item) => stopPolling(item.id));
 
@@ -404,7 +378,9 @@ export function QueueProvider({ children }: { children: ReactNode }) {
       } else {
         newItems[existingIndex] = { ...newItems[existingIndex], ...task };
       }
-      startPolling(task.id, task.taskId!);
+      if (task.taskId && !isTerminalStatus(task.status)) {
+        startPolling(task.taskId);
+      }
     });
     return newItems;
   });
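With the queue now server-authoritative (the provider rehydrates from `/prgs/list` instead of localStorage), the whole task lifecycle can also be driven over plain HTTP. A rough end-to-end sketch against the endpoints added or reshaped in this diff, assuming a local instance on the default port; the field names match the `response` dict built in `routes/prgs.py`:

```python
import requests

BASE = "http://localhost:7171/api/prgs"  # assumed local instance

# List active tasks (pass ?include_finished=true to include terminal ones).
tasks = requests.get(f"{BASE}/list").json()
for t in tasks:
    print(t["task_id"], t.get("download_type"), t.get("name"), t.get("status_count"))

if tasks:
    task_id = tasks[0]["task_id"]

    # Per-task detail: same shape as a /list entry (last_line, timestamp, ...).
    detail = requests.get(f"{BASE}/{task_id}").json()
    print(detail.get("original_url"), detail.get("last_line"))

    # Cancel everything still running or queued; returns counts and any errors.
    result = requests.post(f"{BASE}/cancel/all").json()
    print(result["message"], result["errors"])

    # Remove the task and all of its Redis keys (info, status, status:next_id).
    print(requests.delete(f"{BASE}/delete/{task_id}").json())
```

This mirrors what the reworked QueueProvider does on mount (fetch `/prgs/list`, then poll `/prgs/<task_id>` until a terminal status) and on user action (`cancel/all`, `delete/<task_id>`).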