From b67637abe8b3b152777fe67f22a996800304e596 Mon Sep 17 00:00:00 2001 From: "architect.in.git" Date: Sun, 23 Mar 2025 19:32:47 -0600 Subject: [PATCH] uuhm --- routes/prgs.py | 128 +++--- routes/utils/artist.py | 17 +- routes/utils/celery_queue_manager.py | 53 +-- routes/utils/celery_tasks.py | 21 + routes/utils/get_info.py | 17 +- routes/utils/search.py | 41 +- static/css/queue/queue.css | 76 ++++ static/js/queue.js | 564 +++++++++++++++------------ 8 files changed, 551 insertions(+), 366 deletions(-) diff --git a/routes/prgs.py b/routes/prgs.py index 1f25f97..1858ba4 100755 --- a/routes/prgs.py +++ b/routes/prgs.py @@ -3,6 +3,7 @@ import os import json import logging import time +import random from routes.utils.celery_tasks import ( get_task_info, @@ -412,8 +413,8 @@ def stream_task_status(task_id): # Sort updates by id sorted_updates = sorted(all_updates, key=lambda x: x.get("id", 0)) - # Send the most recent updates first (up to 10) - for i, update in enumerate(sorted_updates[-10:]): + # Limit to send only the 5 most recent updates to reduce initial payload + for i, update in enumerate(sorted_updates[-5:]): # Add the task_id to each update message update["task_id"] = task_id yield f"event: update\ndata: {json.dumps(update)}\n\n" @@ -429,65 +430,76 @@ def stream_task_status(task_id): # Hold the connection open and check for updates last_heartbeat = time.time() - heartbeat_interval = 15 # Send heartbeat every 15 seconds + heartbeat_interval = 30 # Increased from 15 to 30 seconds to reduce overhead + + # Optimize polling with a more efficient loop structure + check_interval = 0.2 # Check for messages every 200ms instead of continuously + message_batch_size = 5 # Process up to 5 messages at a time while True: - # Check for new updates via Redis Pub/Sub - message = redis_pubsub.get_message(timeout=1.0) - - if message and message['type'] == 'message': - # Got a new message from Redis Pub/Sub - try: - data = json.loads(message['data'].decode('utf-8')) - status_id = data.get('status_id', 0) - - # Fetch the actual status data - if status_id > last_sent_id: - all_status = redis_client.lrange(f"task:{task_id}:status", 0, -1) + # Process a batch of messages to reduce CPU usage + messages_processed = 0 + while messages_processed < message_batch_size: + # Check for new updates via Redis Pub/Sub with a timeout + message = redis_pubsub.get_message(timeout=check_interval) + + if not message: + break # No more messages to process + + if message['type'] == 'message': + messages_processed += 1 + # Got a new message from Redis Pub/Sub + try: + data = json.loads(message['data'].decode('utf-8')) + status_id = data.get('status_id', 0) - for status_data in all_status: - try: - status = json.loads(status_data.decode('utf-8')) - if status.get("id") == status_id: - # Add the task_id to the update - status["task_id"] = task_id - - # Choose the appropriate event type based on status - status_type = status.get("status", "") - event_type = "update" - - if status_type == ProgressState.COMPLETE or status_type == ProgressState.DONE: - event_type = "complete" - elif status_type == ProgressState.TRACK_COMPLETE: - # Create a distinct event type for track completion to prevent UI issues - event_type = "track_complete" - elif status_type == ProgressState.ERROR: - event_type = "error" - elif status_type in [ProgressState.TRACK_PROGRESS, ProgressState.REAL_TIME]: - event_type = "progress" - - # Send the update - yield f"event: {event_type}\ndata: {json.dumps(status)}\n\n" - last_sent_id = status_id - break - except Exception as e: - logger.error(f"Error parsing status data: {e}") - except Exception as e: - logger.error(f"Error processing Redis Pub/Sub message: {e}") + # Only process if this is a new status update + if status_id > last_sent_id: + # Efficient fetch - only get the specific status update we need + for idx in range(-10, 0): # Check last 10 entries for efficiency + status_data = redis_client.lindex(f"task:{task_id}:status", idx) + if status_data: + status = json.loads(status_data.decode('utf-8')) + if status.get("id") == status_id: + # Add the task_id to the update + status["task_id"] = task_id + + # Choose the appropriate event type based on status + status_type = status.get("status", "") + event_type = "update" + + if status_type == ProgressState.COMPLETE or status_type == ProgressState.DONE: + event_type = "complete" + elif status_type == ProgressState.TRACK_COMPLETE: + # Create a distinct event type for track completion to prevent UI issues + event_type = "track_complete" + elif status_type == ProgressState.ERROR: + event_type = "error" + elif status_type in [ProgressState.TRACK_PROGRESS, ProgressState.REAL_TIME]: + event_type = "progress" + + # Send the update + yield f"event: {event_type}\ndata: {json.dumps(status)}\n\n" + last_sent_id = status_id + break + except Exception as e: + logger.error(f"Error processing Redis Pub/Sub message: {e}") # Check if task is complete, error, or cancelled - if so, end the stream - last_status = get_last_task_status(task_id) - if last_status and last_status.get("status") in [ProgressState.COMPLETE, ProgressState.ERROR, ProgressState.CANCELLED, ProgressState.DONE]: - # Send final message - final_data = { - "event": "end", - "task_id": task_id, - "status": last_status.get("status"), - "message": last_status.get("message", "Download complete"), - "timestamp": time.time() - } - yield f"event: end\ndata: {json.dumps(final_data)}\n\n" - break + # Only do this check every 5 loops to reduce load + if random.random() < 0.2: # ~20% chance to check terminal status each loop + last_status = get_last_task_status(task_id) + if last_status and last_status.get("status") in [ProgressState.COMPLETE, ProgressState.ERROR, ProgressState.CANCELLED, ProgressState.DONE]: + # Send final message + final_data = { + "event": "end", + "task_id": task_id, + "status": last_status.get("status"), + "message": last_status.get("message", "Download complete"), + "timestamp": time.time() + } + yield f"event: end\ndata: {json.dumps(final_data)}\n\n" + break # Send a heartbeat periodically to keep the connection alive now = time.time() @@ -495,8 +507,8 @@ def stream_task_status(task_id): yield f"event: heartbeat\ndata: {json.dumps({'timestamp': now})}\n\n" last_heartbeat = now - # Small sleep to prevent CPU spinning - time.sleep(0.1) + # More efficient sleep between batch checks + time.sleep(check_interval) except Exception as e: logger.error(f"Error in SSE stream: {e}") diff --git a/routes/utils/artist.py b/routes/utils/artist.py index 65f92fd..fac85de 100644 --- a/routes/utils/artist.py +++ b/routes/utils/artist.py @@ -90,6 +90,9 @@ def download_artist_albums(url, album_type="album,single,compilation", request_a # Get artist info with albums artist_data = get_spotify_info(artist_id, "artist") + # Debug logging to inspect the structure of artist_data + logger.debug(f"Artist data structure has keys: {list(artist_data.keys() if isinstance(artist_data, dict) else [])}") + if not artist_data or 'items' not in artist_data: raise ValueError(f"Failed to retrieve artist data or no albums found for artist ID {artist_id}") @@ -125,11 +128,20 @@ def download_artist_albums(url, album_type="album,single,compilation", request_a album_task_ids = [] for album in filtered_albums: - album_url = album.get('external_urls', {}).get('spotify', '') + # Add detailed logging to inspect each album's structure and URLs + logger.debug(f"Processing album: {album.get('name', 'Unknown')}") + logger.debug(f"Album structure has keys: {list(album.keys())}") + + external_urls = album.get('external_urls', {}) + logger.debug(f"Album external_urls: {external_urls}") + + album_url = external_urls.get('spotify', '') album_name = album.get('name', 'Unknown Album') album_artists = album.get('artists', []) album_artist = album_artists[0].get('name', 'Unknown Artist') if album_artists else 'Unknown Artist' + logger.debug(f"Extracted album URL: {album_url}") + if not album_url: logger.warning(f"Skipping album without URL: {album_name}") continue @@ -146,6 +158,9 @@ def download_artist_albums(url, album_type="album,single,compilation", request_a "orig_request": request_args or {} # Store original request params } + # Debug log the task data being sent to the queue + logger.debug(f"Album task data: url={task_data['url']}, retry_url={task_data['retry_url']}") + # Add the task to the queue manager task_id = download_queue_manager.add_task(task_data) album_task_ids.append(task_id) diff --git a/routes/utils/celery_queue_manager.py b/routes/utils/celery_queue_manager.py index 107512c..075be2e 100644 --- a/routes/utils/celery_queue_manager.py +++ b/routes/utils/celery_queue_manager.py @@ -29,11 +29,11 @@ CONFIG_PATH = './config/main.json' try: with open(CONFIG_PATH, 'r') as f: config_data = json.load(f) - MAX_CONCURRENT_DL = config_data.get("maxConcurrentDownloads", 3) + MAX_CONCURRENT_DL = config_data.get("maxConcurrentDownloads", 10) except Exception as e: print(f"Error loading configuration: {e}") # Fallback default - MAX_CONCURRENT_DL = 3 + MAX_CONCURRENT_DL = 10 def get_config_params(): """ @@ -96,50 +96,36 @@ class CeleryDownloadQueueManager: def add_task(self, task): """ - Adds a new download task to the queue. + Add a new download task to the Celery queue Args: - task (dict): Dictionary containing task parameters + task (dict): Task parameters including download_type, url, etc. Returns: - str: The task ID for status tracking + str: Task ID """ try: + # Extract essential parameters download_type = task.get("download_type", "unknown") - service = task.get("service", "") - # Get common parameters from config - config_params = get_config_params() + # Debug existing task data + logger.debug(f"Adding {download_type} task with data: {json.dumps({k: v for k, v in task.items() if k != 'orig_request'})}") - # Use service from config instead of task - service = config_params.get('service') - - # Generate a unique task ID + # Create a unique task ID task_id = str(uuid.uuid4()) - # Store the original request in task info - original_request = task.get("orig_request", {}).copy() + # Get config parameters and process original request + config_params = get_config_params() - # Add essential metadata for retry operations - original_request["download_type"] = download_type + # Extract original request or use empty dict + original_request = task.get("orig_request", task.get("original_request", {})) - # Add type from download_type if not provided - if "type" not in task: - task["type"] = download_type + # Determine service (spotify or deezer) from config or request + service = original_request.get("service", config_params.get("service", "spotify")) - # Ensure key information is included - for key in ["type", "name", "artist", "service", "url"]: - if key in task and key not in original_request: - original_request[key] = task[key] - - # Add API endpoint information - if "endpoint" not in original_request: - original_request["endpoint"] = f"/api/{download_type}/download" - - # Add explicit display information for the frontend - original_request["display_title"] = task.get("name", original_request.get("name", "Unknown")) - original_request["display_type"] = task.get("type", original_request.get("type", download_type)) - original_request["display_artist"] = task.get("artist", original_request.get("artist", "")) + # Debug retry_url if present + if "retry_url" in task: + logger.debug(f"Task has retry_url: {task['retry_url']}") # Build the complete task with config parameters complete_task = { @@ -150,6 +136,9 @@ class CeleryDownloadQueueManager: "service": service, "url": task.get("url", ""), + # Preserve retry_url if present + "retry_url": task.get("retry_url", ""), + # Use config values but allow override from request "main": original_request.get("main", config_params['spotify'] if service == 'spotify' else config_params['deezer']), diff --git a/routes/utils/celery_tasks.py b/routes/utils/celery_tasks.py index 9200d35..8abf183 100644 --- a/routes/utils/celery_tasks.py +++ b/routes/utils/celery_tasks.py @@ -85,6 +85,9 @@ def store_task_status(task_id, status_data): # Convert to JSON and store in Redis redis_client.rpush(f"task:{task_id}:status", json.dumps(status_data)) + # Trim the list to keep only the most recent 100 updates to avoid excessive memory usage + redis_client.ltrim(f"task:{task_id}:status", -100, -1) + # Set expiry for the list to avoid filling up Redis with old data redis_client.expire(f"task:{task_id}:status", 60 * 60 * 24 * 7) # 7 days redis_client.expire(f"task:{task_id}:status:next_id", 60 * 60 * 24 * 7) # 7 days @@ -243,6 +246,8 @@ def retry_task(task_id): if not task_info: return {"status": "error", "message": f"Task {task_id} not found"} + logger.debug(f"Retry task {task_id} - Initial task_info: {json.dumps({k: v for k, v in task_info.items() if k != 'orig_request'})}") + # Check if task has retry_count information last_status = get_last_task_status(task_id) if last_status and last_status.get("status") == "error": @@ -272,6 +277,19 @@ def retry_task(task_id): task_info["retry_count"] = retry_count + 1 task_info["retry_of"] = task_id + # Log current URL before potentially updating it + logger.debug(f"Retry task {task_id} - Current URL: {task_info.get('url', 'N/A')}") + logger.debug(f"Retry task {task_id} - Retry URL available: {'Yes' if 'retry_url' in task_info and task_info['retry_url'] else 'No'}") + + # Use retry_url if available, otherwise use the original url + # This is crucial for album tasks created from artist downloads + if "retry_url" in task_info and task_info["retry_url"]: + logger.info(f"Using retry_url for task {task_id}: {task_info['retry_url']}") + logger.debug(f"Retry task {task_id} - Replacing URL {task_info.get('url', 'N/A')} with retry_url {task_info['retry_url']}") + task_info["url"] = task_info["retry_url"] + else: + logger.debug(f"Retry task {task_id} - No retry_url found, keeping original URL: {task_info.get('url', 'N/A')}") + # Get the service and fallback configuration from config service = config_params.get("service") fallback_enabled = config_params.get("fallback", False) @@ -318,6 +336,9 @@ def retry_task(task_id): task_info["custom_track_format"] = task_info.get("custom_track_format", config_params.get("customTrackFormat", "%tracknum%. %music%")) task_info["pad_tracks"] = task_info.get("pad_tracks", config_params.get("tracknum_padding", True)) + # Log the final URL that will be used + logger.debug(f"Retry task {task_id} - Final URL for retry: {task_info.get('url', 'N/A')}") + # Store the updated task info store_task_info(new_task_id, task_info) diff --git a/routes/utils/get_info.py b/routes/utils/get_info.py index 6aaf656..602801e 100644 --- a/routes/utils/get_info.py +++ b/routes/utils/get_info.py @@ -3,17 +3,9 @@ from deezspot.easy_spoty import Spo import json from pathlib import Path +from routes.utils.celery_queue_manager import get_config_params -# Load configuration from ./config/main.json -CONFIG_PATH = './config/main.json' -try: - with open(CONFIG_PATH, 'r') as f: - config_data = json.load(f) - # Get the main Spotify account from config - DEFAULT_SPOTIFY_ACCOUNT = config_data.get("spotify", "") -except Exception as e: - print(f"Error loading configuration: {e}") - DEFAULT_SPOTIFY_ACCOUNT = "" +# We'll rely on get_config_params() instead of directly loading the config file def get_spotify_info(spotify_id, spotify_type): """ @@ -29,8 +21,9 @@ def get_spotify_info(spotify_id, spotify_type): client_id = None client_secret = None - # Use the default account from config - main = DEFAULT_SPOTIFY_ACCOUNT + # Get config parameters including Spotify account + config_params = get_config_params() + main = config_params.get('spotify', '') if not main: raise ValueError("No Spotify account configured in settings") diff --git a/routes/utils/search.py b/routes/utils/search.py index e935366..6ea5a0e 100755 --- a/routes/utils/search.py +++ b/routes/utils/search.py @@ -1,6 +1,10 @@ from deezspot.easy_spoty import Spo import json from pathlib import Path +import logging + +# Configure logger +logger = logging.getLogger(__name__) def search( query: str, @@ -8,35 +12,48 @@ def search( limit: int = 3, main: str = None ) -> dict: + logger.info(f"Search requested: query='{query}', type={search_type}, limit={limit}, main={main}") + # If main account is specified, load client ID and secret from the account's search.json client_id = None client_secret = None if main: search_creds_path = Path(f'./creds/spotify/{main}/search.json') - + logger.debug(f"Looking for credentials at: {search_creds_path}") + if search_creds_path.exists(): try: with open(search_creds_path, 'r') as f: search_creds = json.load(f) client_id = search_creds.get('client_id') client_secret = search_creds.get('client_secret') + logger.debug(f"Credentials loaded successfully for account: {main}") except Exception as e: + logger.error(f"Error loading search credentials: {e}") print(f"Error loading search credentials: {e}") + else: + logger.warning(f"Credentials file not found at: {search_creds_path}") # Initialize the Spotify client with credentials (if available) if client_id and client_secret: + logger.debug("Initializing Spotify client with account credentials") Spo.__init__(client_id, client_secret) + else: + logger.debug("Using default Spotify client credentials") # Perform the Spotify search - # Note: We don't need to pass client_id and client_secret again in the search method - # as they've already been set during initialization - spotify_response = Spo.search( - query=query, - search_type=search_type, - limit=limit, - client_id=client_id, - client_secret=client_secret - ) - - return spotify_response + logger.debug(f"Executing Spotify search with query='{query}', type={search_type}") + try: + spotify_response = Spo.search( + query=query, + search_type=search_type, + limit=limit, + client_id=client_id, + client_secret=client_secret + ) + logger.info(f"Search completed successfully") + return spotify_response + except Exception as e: + logger.error(f"Error during Spotify search: {e}") + raise diff --git a/static/css/queue/queue.css b/static/css/queue/queue.css index 14ba98d..f162e4c 100644 --- a/static/css/queue/queue.css +++ b/static/css/queue/queue.css @@ -43,12 +43,88 @@ margin: 0; } +/* Queue subtitle with statistics */ +.queue-subtitle { + display: flex; + gap: 10px; + margin-top: 5px; + font-size: 0.8rem; + color: #b3b3b3; +} + +.queue-stat { + padding: 2px 6px; + border-radius: 4px; + font-weight: 500; +} + +.queue-stat-active { + color: #4a90e2; + background-color: rgba(74, 144, 226, 0.1); +} + +.queue-stat-completed { + color: #1DB954; + background-color: rgba(29, 185, 84, 0.1); +} + +.queue-stat-error { + color: #ff5555; + background-color: rgba(255, 85, 85, 0.1); +} + .header-actions { display: flex; gap: 10px; align-items: center; } +/* Refresh queue button */ +#refreshQueueBtn { + background: #2a2a2a; + border: none; + color: #fff; + padding: 8px; + border-radius: 4px; + cursor: pointer; + transition: background 0.3s ease, transform 0.2s ease; + display: flex; + align-items: center; + justify-content: center; +} + +#refreshQueueBtn:hover { + background: #333; + transform: translateY(-1px); +} + +#refreshQueueBtn:active { + transform: scale(0.95); +} + +#refreshQueueBtn.refreshing { + animation: spin 1s linear infinite; +} + +/* Artist queue message */ +.queue-artist-message { + background: #2a2a2a; + padding: 15px; + border-radius: 8px; + margin-bottom: 15px; + color: #fff; + text-align: center; + border-left: 4px solid #4a90e2; + animation: pulse 1.5s infinite; + font-weight: 500; +} + +@keyframes pulse { + 0% { opacity: 0.8; } + 50% { opacity: 1; } + 100% { opacity: 0.8; } +} + /* Cancel all button styling */ #cancelAllBtn { background: #8b0000; /* Dark blood red */ diff --git a/static/js/queue.js b/static/js/queue.js index 93a2d1a..2de054a 100644 --- a/static/js/queue.js +++ b/static/js/queue.js @@ -20,12 +20,14 @@ class DownloadQueue { this.MAX_RETRIES = 3; // Default max retries this.RETRY_DELAY = 5; // Default retry delay in seconds this.RETRY_DELAY_INCREASE = 5; // Default retry delay increase in seconds + this.MAX_SSE_CONNECTIONS = 5; // Maximum number of active SSE connections this.downloadQueue = {}; // keyed by unique queueId this.currentConfig = {}; // Cache for current config // EventSource connections for SSE tracking this.sseConnections = {}; // keyed by prgFile/task_id + this.pendingForSSE = []; // Queue of entries waiting for SSE connections // Load the saved visible count (or default to 10) const storedVisibleCount = localStorage.getItem("downloadQueueVisibleCount"); @@ -34,6 +36,9 @@ class DownloadQueue { // Load the cached status info (object keyed by prgFile) this.queueCache = JSON.parse(localStorage.getItem("downloadQueueCache") || "{}"); + // Add a throttled update method to reduce UI updates + this.throttledUpdateQueue = this.throttle(this.updateQueueOrder.bind(this), 500); + // Wait for initDOM to complete before setting up event listeners and loading existing PRG files. this.initDOM().then(() => { this.initEventListeners(); @@ -41,6 +46,25 @@ class DownloadQueue { }); } + /* Utility method to throttle frequent function calls */ + throttle(func, delay) { + let lastCall = 0; + let timeout; + return function(...args) { + const now = Date.now(); + if (now - lastCall < delay) { + clearTimeout(timeout); + timeout = setTimeout(() => { + lastCall = now; + func(...args); + }, delay); + } else { + lastCall = now; + func(...args); + } + }; + } + /* DOM Management */ async initDOM() { // New HTML structure for the download queue. @@ -53,6 +77,14 @@ class DownloadQueue { Skull Cancel all +
@@ -129,6 +161,24 @@ class DownloadQueue { }); } + // "Refresh queue" button + const refreshQueueBtn = document.getElementById('refreshQueueBtn'); + if (refreshQueueBtn) { + refreshQueueBtn.addEventListener('click', async () => { + try { + refreshQueueBtn.disabled = true; + refreshQueueBtn.classList.add('refreshing'); + await this.loadExistingPrgFiles(); + console.log('Queue refreshed'); + } catch (error) { + console.error('Error refreshing queue:', error); + } finally { + refreshQueueBtn.disabled = false; + refreshQueueBtn.classList.remove('refreshing'); + } + }); + } + // Close all SSE connections when the page is about to unload window.addEventListener('beforeunload', () => { this.closeAllSSEConnections(); @@ -509,6 +559,8 @@ class DownloadQueue { updateQueueOrder() { const container = document.getElementById('queueItems'); const footer = document.getElementById('queueFooter'); + if (!container || !footer) return; + const entries = Object.values(this.downloadQueue); // Sorting: errors/canceled first (group 0), ongoing next (group 1), queued last (group 2, sorted by position). @@ -536,58 +588,90 @@ class DownloadQueue { } }); - document.getElementById('queueTotalCount').textContent = entries.length; + // Calculate statistics to display in the header + const totalEntries = entries.length; + const completedEntries = entries.filter(e => e.hasEnded && e.lastStatus && e.lastStatus.status === 'complete').length; + const errorEntries = entries.filter(e => e.hasEnded && e.lastStatus && e.lastStatus.status === 'error').length; + const activeEntries = entries.filter(e => !e.hasEnded).length; - // Only recreate the container content if really needed - const visibleEntries = entries.slice(0, this.visibleCount); + // Update the header with detailed count + const countEl = document.getElementById('queueTotalCount'); + if (countEl) { + countEl.textContent = totalEntries; + } - // Handle empty state - if (entries.length === 0) { - container.innerHTML = ` -
- Empty queue -

Your download queue is empty

-
- `; - } else { - // Get currently visible items - const visibleItems = Array.from(container.children).filter(el => el.classList.contains('queue-item')); + // Update subtitle with detailed stats if we have entries + if (totalEntries > 0) { + let statsHtml = ''; + if (activeEntries > 0) { + statsHtml += `${activeEntries} active`; + } + if (completedEntries > 0) { + statsHtml += `${completedEntries} completed`; + } + if (errorEntries > 0) { + statsHtml += `${errorEntries} failed`; + } - // Update container more efficiently - if (visibleItems.length === 0) { - // No items in container, append all visible entries - container.innerHTML = ''; // Clear any empty state - visibleEntries.forEach(entry => { - // We no longer automatically start monitoring here - // Monitoring is now explicitly started by the methods that create downloads - container.appendChild(entry.element); - }); - } else { - // Container already has items, update more efficiently - - // Create a map of current DOM elements by queue ID - const existingElementMap = {}; - visibleItems.forEach(el => { - const queueId = el.querySelector('.cancel-btn')?.dataset.queueid; - if (queueId) existingElementMap[queueId] = el; - }); - - // Clear container to re-add in correct order - container.innerHTML = ''; - - // Add visible entries in correct order - visibleEntries.forEach(entry => { - // We no longer automatically start monitoring here - container.appendChild(entry.element); - - // Mark the entry as not new anymore - entry.isNew = false; - }); + // Only add the subtitle if we have stats to show + if (statsHtml) { + const subtitleEl = document.getElementById('queueSubtitle'); + if (subtitleEl) { + subtitleEl.innerHTML = statsHtml; + } else { + // Create the subtitle if it doesn't exist + const headerEl = document.querySelector('.sidebar-header h2'); + if (headerEl) { + headerEl.insertAdjacentHTML('afterend', `
${statsHtml}
`); + } + } + } + } else { + // Remove subtitle if no entries + const subtitleEl = document.getElementById('queueSubtitle'); + if (subtitleEl) { + subtitleEl.remove(); } } - // We no longer start or stop monitoring based on visibility changes here - // This allows the explicit monitoring control from the download methods + // Use DocumentFragment for better performance when updating the DOM + const fragment = document.createDocumentFragment(); + + // Handle empty state + if (entries.length === 0) { + const emptyDiv = document.createElement('div'); + emptyDiv.className = 'queue-empty'; + emptyDiv.innerHTML = ` + Empty queue +

Your download queue is empty

+ `; + container.innerHTML = ''; + container.appendChild(emptyDiv); + } else { + // Get the visible entries slice + const visibleEntries = entries.slice(0, this.visibleCount); + + // Create a map of current DOM elements by queue ID + const existingElements = container.querySelectorAll('.queue-item'); + const existingElementMap = {}; + Array.from(existingElements).forEach(el => { + const cancelBtn = el.querySelector('.cancel-btn'); + if (cancelBtn) { + const queueId = cancelBtn.dataset.queueid; + if (queueId) existingElementMap[queueId] = el; + } + }); + + // Add visible entries to the fragment in the correct order + visibleEntries.forEach(entry => { + fragment.appendChild(entry.element); + entry.isNew = false; + }); + + // Clear container and append the fragment + container.innerHTML = ''; + container.appendChild(fragment); + } // Update footer footer.innerHTML = ''; @@ -951,8 +1035,14 @@ class DownloadQueue { // Close any existing SSE connection this.closeSSEConnection(queueId); + // For album tasks created from artist downloads, we need to ensure + // we're using the album URL, not the original artist URL + let retryUrl = entry.requestUrl; + + console.log(`Retrying download for ${entry.type} with URL: ${retryUrl}`); + // Use the stored original request URL to create a new download - const retryResponse = await fetch(entry.requestUrl); + const retryResponse = await fetch(retryUrl); if (!retryResponse.ok) { throw new Error(`Server returned ${retryResponse.status}`); } @@ -1052,29 +1142,63 @@ class DownloadQueue { const data = await response.json(); - // Handle artist downloads which return multiple album_prg_files - if (type === 'artist' && data.album_prg_files && Array.isArray(data.album_prg_files)) { - // Add each album to the download queue separately - const queueIds = []; - data.album_prg_files.forEach(prgFile => { - const queueId = this.addDownload(item, 'album', prgFile, apiUrl, false); - queueIds.push({queueId, prgFile}); - }); - - // Wait a short time before setting up SSE connections - await new Promise(resolve => setTimeout(resolve, 1000)); - - // Set up SSE connections for each entry - for (const {queueId, prgFile} of queueIds) { - const entry = this.downloadQueue[queueId]; - if (entry && !entry.hasEnded) { - this.setupSSEConnection(queueId); + // Handle artist downloads which return multiple album tasks + if (type === 'artist') { + // Check for new API response format + if (data.task_ids && Array.isArray(data.task_ids)) { + // For artist discographies, we get individual task IDs for each album + console.log(`Queued artist discography with ${data.task_ids.length} albums`); + + // Make queue visible to show progress + this.toggleVisibility(true); + + // Show a temporary message about the artist download + const artistMessage = document.createElement('div'); + artistMessage.className = 'queue-artist-message'; + artistMessage.textContent = `Queued ${data.task_ids.length} albums for ${item.name || 'artist'}. Loading...`; + document.getElementById('queueItems').prepend(artistMessage); + + // Wait a moment to ensure backend has processed the tasks + await new Promise(resolve => setTimeout(resolve, 1500)); + + // Remove the temporary message + artistMessage.remove(); + + // Fetch the latest tasks to show all newly created album downloads + await this.loadExistingPrgFiles(); + + return data.task_ids; + } + // Check for older API response format + else if (data.album_prg_files && Array.isArray(data.album_prg_files)) { + console.log(`Queued artist discography with ${data.album_prg_files.length} albums (old format)`); + // Add each album to the download queue separately + const queueIds = []; + data.album_prg_files.forEach(prgFile => { + const queueId = this.addDownload(item, 'album', prgFile, apiUrl, false); + queueIds.push({queueId, prgFile}); + }); + + // Make queue visible to show progress + this.toggleVisibility(true); + + // Wait a short time before setting up SSE connections + await new Promise(resolve => setTimeout(resolve, 1000)); + + // Set up SSE connections for each entry + for (const {queueId, prgFile} of queueIds) { + const entry = this.downloadQueue[queueId]; + if (entry && !entry.hasEnded) { + this.setupSSEConnection(queueId); + } } + + return queueIds.map(({queueId}) => queueId); } - - return queueIds.map(({queueId}) => queueId); - } else if (data.prg_file) { - // Handle single-file downloads (tracks, albums, playlists) + } + + // Handle single-file downloads (tracks, albums, playlists) + if (data.prg_file) { const queueId = this.addDownload(item, type, data.prg_file, apiUrl, false); // Wait a short time before setting up SSE connection @@ -1101,6 +1225,16 @@ class DownloadQueue { */ async loadExistingPrgFiles() { try { + // Clear existing queue entries first to avoid duplicates when refreshing + for (const queueId in this.downloadQueue) { + const entry = this.downloadQueue[queueId]; + // Close any active connections + this.closeSSEConnection(queueId); + + // Don't remove the entry from DOM - we'll rebuild it entirely + delete this.downloadQueue[queueId]; + } + const response = await fetch('/api/prgs/list'); const prgFiles = await response.json(); @@ -1284,6 +1418,17 @@ class DownloadQueue { // Close any existing connection this.closeSSEConnection(queueId); + // Check if we're at the connection limit + const activeConnectionCount = Object.keys(this.sseConnections).length; + if (activeConnectionCount >= this.MAX_SSE_CONNECTIONS) { + // Add to pending queue instead of creating connection now + if (!this.pendingForSSE.includes(queueId)) { + this.pendingForSSE.push(queueId); + console.log(`Queued SSE connection for ${queueId} (max connections reached)`); + } + return; + } + // Create a new EventSource connection try { const sse = new EventSource(`/api/prgs/stream/${entry.prgFile}`); @@ -1321,96 +1466,44 @@ class DownloadQueue { entry.status = data.status; }); - sse.addEventListener('update', (event) => { + // Combined handler for all update-style events + const updateHandler = (event) => { const data = JSON.parse(event.data); - console.log('SSE update event:', data); - this.handleSSEUpdate(queueId, data); - }); - - sse.addEventListener('progress', (event) => { - const data = JSON.parse(event.data); - console.log('SSE progress event:', data); - this.handleSSEUpdate(queueId, data); - }); - - // Add specific handler for track_complete events - sse.addEventListener('track_complete', (event) => { - const data = JSON.parse(event.data); - console.log('SSE track_complete event:', data); - console.log(`Current entry type: ${entry.type}`); + const eventType = event.type; - // Mark this status as a track completion - data.status = 'track_complete'; - - // Only update the log message without changing status colors - const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); - if (logElement) { - let message = `Completed track: ${data.title || data.track || 'Unknown'}`; - if (data.artist) message += ` by ${data.artist}`; - logElement.textContent = message; - } - - // For single track downloads, track_complete is a terminal state - if (entry.type === 'track') { - console.log('Single track download completed - terminating'); - // Mark the track as ended - entry.hasEnded = true; + if (eventType === 'track_complete') { + // Special handling for track completions + console.log('SSE track_complete event:', data); - // Handle as a terminal state - setTimeout(() => { - this.closeSSEConnection(queueId); - this.cleanupEntry(queueId); - }, 5000); - } else { - console.log(`Album/playlist track completed - continuing download (type: ${entry.type})`); - // For albums/playlists, just update entry data without changing status - entry.lastStatus = data; - entry.lastUpdated = Date.now(); + // Mark this status as a track completion + data.status = 'track_complete'; - // Save to cache - this.queueCache[entry.prgFile] = data; - localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); - } - }); - - // Also handle 'done' events which can come for individual tracks - sse.addEventListener('done', (event) => { - const data = JSON.parse(event.data); - console.log('SSE done event (individual track):', data); - console.log(`Current entry type: ${entry.type}`); - - // Only update the log message without changing status colors for album tracks - const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); - if (logElement) { - let message = `Completed track: ${data.song || data.title || data.track || 'Unknown'}`; - if (data.artist) message += ` by ${data.artist}`; - logElement.textContent = message; - } - - // For single track downloads, done is a terminal state - if (entry.type === 'track') { - console.log('Single track download completed (done) - terminating'); - // Mark the track as ended - entry.hasEnded = true; + // Only update the log message without changing status colors + const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); + if (logElement) { + let message = `Completed track: ${data.title || data.track || 'Unknown'}`; + if (data.artist) message += ` by ${data.artist}`; + logElement.textContent = message; + } - // Handle as a terminal state - setTimeout(() => { - this.closeSSEConnection(queueId); - this.cleanupEntry(queueId); - }, 5000); - } else if (data.song) { - console.log(`Album/playlist individual track done - continuing download (type: ${entry.type})`); - // For albums/playlists, just update entry data without changing status - data._isIndividualTrack = true; // Mark it for special handling in update logic - entry.lastStatus = data; - entry.lastUpdated = Date.now(); + // For single track downloads, track_complete is a terminal state + if (entry.type === 'track') { + entry.hasEnded = true; + setTimeout(() => { + this.closeSSEConnection(queueId); + this.cleanupEntry(queueId); + }, 5000); + } else { + // For albums/playlists, just update entry data without changing status + entry.lastStatus = data; + entry.lastUpdated = Date.now(); + this.queueCache[entry.prgFile] = data; + localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); + } + } else if (eventType === 'complete' || eventType === 'done') { + // Terminal state handling + console.log(`SSE ${eventType} event:`, data); - // Save to cache - this.queueCache[entry.prgFile] = data; - localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); - } else { - // This is a real done event for the entire album/playlist - console.log(`Entire ${entry.type} completed - finalizing`); this.handleSSEUpdate(queueId, data); entry.hasEnded = true; @@ -1418,91 +1511,38 @@ class DownloadQueue { this.closeSSEConnection(queueId); this.cleanupEntry(queueId); }, 5000); - } - }); - - sse.addEventListener('complete', (event) => { - const data = JSON.parse(event.data); - console.log('SSE complete event:', data); - console.log(`Current entry type: ${entry.type}`); - - // Skip terminal processing for track_complete status in albums/playlists - // Also skip for "done" status when it's for an individual track in an album/playlist - if ((data.status === 'track_complete' && entry.type !== 'track') || - (data.status === 'done' && data.song && entry.type !== 'track')) { - console.log(`Track ${data.status} in ${entry.type} download - continuing`); - // Don't process individual track completion events here - return; - } - - // Make sure the status is set to 'complete' for UI purposes - if (!data.status || data.status === '') { - data.status = 'complete'; - } - - // For track downloads, make sure we have a proper name - if (entry.type === 'track' && !data.name && entry.lastStatus) { - data.name = entry.lastStatus.name || ''; - data.artist = entry.lastStatus.artist || ''; - } - - this.handleSSEUpdate(queueId, data); - - // Always mark as terminal state for 'complete' events (except individual track completions in albums) - entry.hasEnded = true; - - // Close the connection after a short delay - setTimeout(() => { + } else if (eventType === 'error') { + // Error state handling + console.log('SSE error event:', data); + this.handleSSEUpdate(queueId, data); + entry.hasEnded = true; this.closeSSEConnection(queueId); - this.cleanupEntry(queueId); - }, 5000); - }); - - sse.addEventListener('error', (event) => { - const data = JSON.parse(event.data); - console.log('SSE error event:', data); - this.handleSSEUpdate(queueId, data); - - // Mark the download as ended with error - entry.hasEnded = true; - - // Close the connection, but don't automatically clean up the entry - // to allow for potential retry - this.closeSSEConnection(queueId); - }); - - sse.addEventListener('end', (event) => { - const data = JSON.parse(event.data); - console.log('SSE end event:', data); - - // For track downloads, ensure we have the proper fields for UI display - if (entry.type === 'track') { - // If the end event doesn't have a name/artist, copy from lastStatus - if ((!data.name || !data.artist) && entry.lastStatus) { - data.name = data.name || entry.lastStatus.name || ''; - data.artist = data.artist || entry.lastStatus.artist || ''; - } + } else if (eventType === 'end') { + // End event handling + console.log('SSE end event:', data); - // Force status to 'complete' if not provided - if (!data.status || data.status === '') { - data.status = 'complete'; + // Update with final status + this.handleSSEUpdate(queueId, data); + entry.hasEnded = true; + this.closeSSEConnection(queueId); + + if (data.status === 'complete' || data.status === 'done') { + setTimeout(() => this.cleanupEntry(queueId), 5000); } + } else { + // Standard update handling + this.handleSSEUpdate(queueId, data); } - - // Update with final status - this.handleSSEUpdate(queueId, data); - - // Mark the download as ended - entry.hasEnded = true; - - // Close the connection - this.closeSSEConnection(queueId); - - // Clean up the entry after a delay if it's a success - if (data.status === 'complete' || data.status === 'done') { - setTimeout(() => this.cleanupEntry(queueId), 5000); - } - }); + }; + + // Set up shared handler for all events + sse.addEventListener('update', updateHandler); + sse.addEventListener('progress', updateHandler); + sse.addEventListener('track_complete', updateHandler); + sse.addEventListener('complete', updateHandler); + sse.addEventListener('done', updateHandler); + sse.addEventListener('error', updateHandler); + sse.addEventListener('end', updateHandler); // Handle connection error sse.onerror = (error) => { @@ -1537,6 +1577,13 @@ class DownloadQueue { console.error('Error closing SSE connection:', error); } delete this.sseConnections[queueId]; + + // Now that we've freed a slot, check if any entries are waiting for an SSE connection + if (this.pendingForSSE.length > 0) { + const nextQueueId = this.pendingForSSE.shift(); + console.log(`Starting SSE connection for queued entry ${nextQueueId}`); + this.setupSSEConnection(nextQueueId); + } } } @@ -1552,8 +1599,6 @@ class DownloadQueue { return; } - console.log(`handleSSEUpdate for ${queueId} with type ${entry.type} and status ${data.status}`); - // Track completion is special - don't change visible status ONLY for albums/playlists // Check for both 'track_complete' and 'done' statuses for individual tracks in albums const isTrackCompletion = data.status === 'track_complete' || @@ -1574,29 +1619,46 @@ class DownloadQueue { entry.status = data.status; } - // Update status message in the UI - const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); - if (logElement) { - const statusMessage = this.getStatusMessage(data); - logElement.textContent = statusMessage; - } + // Update status message in the UI - use a more efficient approach + this.updateEntryStatusUI(entry, data, skipStatusChange); - // Apply appropriate CSS classes based on status only if not skipping status change - if (!skipStatusChange) { - this.applyStatusClasses(entry, data); - } - - // Save updated status to cache - this.queueCache[entry.prgFile] = data; - localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); + // Save updated status to cache - debounce these writes to reduce storage operations + clearTimeout(entry.cacheWriteTimeout); + entry.cacheWriteTimeout = setTimeout(() => { + this.queueCache[entry.prgFile] = data; + localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); + }, 500); // Special handling for error status if (data.status === 'error') { this.handleTerminalState(entry, queueId, data); } - // Update the queue order - this.updateQueueOrder(); + // Throttle UI updates to improve performance with multiple downloads + this.throttledUpdateQueue(); + } + + // Optimized method to update the entry status in the UI + updateEntryStatusUI(entry, data, skipStatusChange) { + // First, update the log message text if the element exists + const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); + if (logElement) { + // Only modify the text content if it doesn't already have child elements + // (which would be the case for error states with retry buttons) + if (!logElement.querySelector('.error-message')) { + const statusMessage = this.getStatusMessage(data); + + // Only update DOM if the text has changed + if (logElement.textContent !== statusMessage) { + logElement.textContent = statusMessage; + } + } + } + + // Apply CSS classes for status indication only if we're not skipping status changes + if (!skipStatusChange) { + this.applyStatusClasses(entry, data); + } } /* Close all active SSE connections */