diff --git a/routes/prgs.py b/routes/prgs.py
index 1f25f97..1858ba4 100755
--- a/routes/prgs.py
+++ b/routes/prgs.py
@@ -3,6 +3,7 @@ import os
import json
import logging
import time
+import random
from routes.utils.celery_tasks import (
get_task_info,
@@ -412,8 +413,8 @@ def stream_task_status(task_id):
# Sort updates by id
sorted_updates = sorted(all_updates, key=lambda x: x.get("id", 0))
- # Send the most recent updates first (up to 10)
- for i, update in enumerate(sorted_updates[-10:]):
+ # Limit to send only the 5 most recent updates to reduce initial payload
+ for i, update in enumerate(sorted_updates[-5:]):
# Add the task_id to each update message
update["task_id"] = task_id
yield f"event: update\ndata: {json.dumps(update)}\n\n"
@@ -429,65 +430,76 @@ def stream_task_status(task_id):
# Hold the connection open and check for updates
last_heartbeat = time.time()
- heartbeat_interval = 15 # Send heartbeat every 15 seconds
+ heartbeat_interval = 30 # Increased from 15 to 30 seconds to reduce overhead
+
+ # Poll Redis pub/sub in short-timeout batches instead of a continuous tight loop
+ check_interval = 0.2 # Check for messages every 200ms instead of continuously
+ message_batch_size = 5 # Process up to 5 messages at a time
while True:
- # Check for new updates via Redis Pub/Sub
- message = redis_pubsub.get_message(timeout=1.0)
-
- if message and message['type'] == 'message':
- # Got a new message from Redis Pub/Sub
- try:
- data = json.loads(message['data'].decode('utf-8'))
- status_id = data.get('status_id', 0)
-
- # Fetch the actual status data
- if status_id > last_sent_id:
- all_status = redis_client.lrange(f"task:{task_id}:status", 0, -1)
+ # Process a batch of messages to reduce CPU usage
+ messages_processed = 0
+ while messages_processed < message_batch_size:
+ # Check for new updates via Redis Pub/Sub with a timeout
+ message = redis_pubsub.get_message(timeout=check_interval)
+
+ if not message:
+ break # No more messages to process
+
+ if message['type'] == 'message':
+ messages_processed += 1
+ # Got a new message from Redis Pub/Sub
+ try:
+ data = json.loads(message['data'].decode('utf-8'))
+ status_id = data.get('status_id', 0)
- for status_data in all_status:
- try:
- status = json.loads(status_data.decode('utf-8'))
- if status.get("id") == status_id:
- # Add the task_id to the update
- status["task_id"] = task_id
-
- # Choose the appropriate event type based on status
- status_type = status.get("status", "")
- event_type = "update"
-
- if status_type == ProgressState.COMPLETE or status_type == ProgressState.DONE:
- event_type = "complete"
- elif status_type == ProgressState.TRACK_COMPLETE:
- # Create a distinct event type for track completion to prevent UI issues
- event_type = "track_complete"
- elif status_type == ProgressState.ERROR:
- event_type = "error"
- elif status_type in [ProgressState.TRACK_PROGRESS, ProgressState.REAL_TIME]:
- event_type = "progress"
-
- # Send the update
- yield f"event: {event_type}\ndata: {json.dumps(status)}\n\n"
- last_sent_id = status_id
- break
- except Exception as e:
- logger.error(f"Error parsing status data: {e}")
- except Exception as e:
- logger.error(f"Error processing Redis Pub/Sub message: {e}")
+ # Only process if this is a new status update
+ if status_id > last_sent_id:
+ # Scan only the most recent entries via lindex instead of fetching the whole status list
+ for idx in range(-10, 0): # Check last 10 entries for efficiency
+ status_data = redis_client.lindex(f"task:{task_id}:status", idx)
+ if status_data:
+ status = json.loads(status_data.decode('utf-8'))
+ if status.get("id") == status_id:
+ # Add the task_id to the update
+ status["task_id"] = task_id
+
+ # Choose the appropriate event type based on status
+ status_type = status.get("status", "")
+ event_type = "update"
+
+ if status_type == ProgressState.COMPLETE or status_type == ProgressState.DONE:
+ event_type = "complete"
+ elif status_type == ProgressState.TRACK_COMPLETE:
+ # Create a distinct event type for track completion to prevent UI issues
+ event_type = "track_complete"
+ elif status_type == ProgressState.ERROR:
+ event_type = "error"
+ elif status_type in [ProgressState.TRACK_PROGRESS, ProgressState.REAL_TIME]:
+ event_type = "progress"
+
+ # Send the update
+ yield f"event: {event_type}\ndata: {json.dumps(status)}\n\n"
+ last_sent_id = status_id
+ break
+ except Exception as e:
+ logger.error(f"Error processing Redis Pub/Sub message: {e}")
# Check if task is complete, error, or cancelled - if so, end the stream
- last_status = get_last_task_status(task_id)
- if last_status and last_status.get("status") in [ProgressState.COMPLETE, ProgressState.ERROR, ProgressState.CANCELLED, ProgressState.DONE]:
- # Send final message
- final_data = {
- "event": "end",
- "task_id": task_id,
- "status": last_status.get("status"),
- "message": last_status.get("message", "Download complete"),
- "timestamp": time.time()
- }
- yield f"event: end\ndata: {json.dumps(final_data)}\n\n"
- break
+ # Randomly check terminal status on ~20% of loop iterations (every ~5 loops on average) to reduce load
+ if random.random() < 0.2: # ~20% chance to check terminal status each loop
+ last_status = get_last_task_status(task_id)
+ if last_status and last_status.get("status") in [ProgressState.COMPLETE, ProgressState.ERROR, ProgressState.CANCELLED, ProgressState.DONE]:
+ # Send final message
+ final_data = {
+ "event": "end",
+ "task_id": task_id,
+ "status": last_status.get("status"),
+ "message": last_status.get("message", "Download complete"),
+ "timestamp": time.time()
+ }
+ yield f"event: end\ndata: {json.dumps(final_data)}\n\n"
+ break
# Send a heartbeat periodically to keep the connection alive
now = time.time()
@@ -495,8 +507,8 @@ def stream_task_status(task_id):
yield f"event: heartbeat\ndata: {json.dumps({'timestamp': now})}\n\n"
last_heartbeat = now
- # Small sleep to prevent CPU spinning
- time.sleep(0.1)
+ # More efficient sleep between batch checks
+ time.sleep(check_interval)
except Exception as e:
logger.error(f"Error in SSE stream: {e}")
diff --git a/routes/utils/artist.py b/routes/utils/artist.py
index 65f92fd..fac85de 100644
--- a/routes/utils/artist.py
+++ b/routes/utils/artist.py
@@ -90,6 +90,9 @@ def download_artist_albums(url, album_type="album,single,compilation", request_a
# Get artist info with albums
artist_data = get_spotify_info(artist_id, "artist")
+ # Debug logging to inspect the structure of artist_data
+ logger.debug(f"Artist data structure has keys: {list(artist_data.keys() if isinstance(artist_data, dict) else [])}")
+
if not artist_data or 'items' not in artist_data:
raise ValueError(f"Failed to retrieve artist data or no albums found for artist ID {artist_id}")
@@ -125,11 +128,20 @@ def download_artist_albums(url, album_type="album,single,compilation", request_a
album_task_ids = []
for album in filtered_albums:
- album_url = album.get('external_urls', {}).get('spotify', '')
+ # Add detailed logging to inspect each album's structure and URLs
+ logger.debug(f"Processing album: {album.get('name', 'Unknown')}")
+ logger.debug(f"Album structure has keys: {list(album.keys())}")
+
+ external_urls = album.get('external_urls', {})
+ logger.debug(f"Album external_urls: {external_urls}")
+
+ album_url = external_urls.get('spotify', '')
album_name = album.get('name', 'Unknown Album')
album_artists = album.get('artists', [])
album_artist = album_artists[0].get('name', 'Unknown Artist') if album_artists else 'Unknown Artist'
+ logger.debug(f"Extracted album URL: {album_url}")
+
if not album_url:
logger.warning(f"Skipping album without URL: {album_name}")
continue
@@ -146,6 +158,9 @@ def download_artist_albums(url, album_type="album,single,compilation", request_a
"orig_request": request_args or {} # Store original request params
}
+ # Debug log the task data being sent to the queue
+ logger.debug(f"Album task data: url={task_data['url']}, retry_url={task_data['retry_url']}")
+
# Add the task to the queue manager
task_id = download_queue_manager.add_task(task_data)
album_task_ids.append(task_id)
diff --git a/routes/utils/celery_queue_manager.py b/routes/utils/celery_queue_manager.py
index 107512c..075be2e 100644
--- a/routes/utils/celery_queue_manager.py
+++ b/routes/utils/celery_queue_manager.py
@@ -29,11 +29,11 @@ CONFIG_PATH = './config/main.json'
try:
with open(CONFIG_PATH, 'r') as f:
config_data = json.load(f)
- MAX_CONCURRENT_DL = config_data.get("maxConcurrentDownloads", 3)
+ MAX_CONCURRENT_DL = config_data.get("maxConcurrentDownloads", 10)
except Exception as e:
print(f"Error loading configuration: {e}")
# Fallback default
- MAX_CONCURRENT_DL = 3
+ MAX_CONCURRENT_DL = 10
def get_config_params():
"""
@@ -96,50 +96,36 @@ class CeleryDownloadQueueManager:
def add_task(self, task):
"""
- Adds a new download task to the queue.
+ Add a new download task to the Celery queue
Args:
- task (dict): Dictionary containing task parameters
+ task (dict): Task parameters including download_type, url, etc.
Returns:
- str: The task ID for status tracking
+ str: Task ID
"""
try:
+ # Extract essential parameters
download_type = task.get("download_type", "unknown")
- service = task.get("service", "")
- # Get common parameters from config
- config_params = get_config_params()
+ # Debug existing task data
+ logger.debug(f"Adding {download_type} task with data: {json.dumps({k: v for k, v in task.items() if k != 'orig_request'})}")
- # Use service from config instead of task
- service = config_params.get('service')
-
- # Generate a unique task ID
+ # Create a unique task ID
task_id = str(uuid.uuid4())
- # Store the original request in task info
- original_request = task.get("orig_request", {}).copy()
+ # Get config parameters and process original request
+ config_params = get_config_params()
- # Add essential metadata for retry operations
- original_request["download_type"] = download_type
+ # Extract original request or use empty dict
+ original_request = task.get("orig_request", task.get("original_request", {}))
- # Add type from download_type if not provided
- if "type" not in task:
- task["type"] = download_type
+ # Determine service (spotify or deezer) from config or request
+ service = original_request.get("service", config_params.get("service", "spotify"))
- # Ensure key information is included
- for key in ["type", "name", "artist", "service", "url"]:
- if key in task and key not in original_request:
- original_request[key] = task[key]
-
- # Add API endpoint information
- if "endpoint" not in original_request:
- original_request["endpoint"] = f"/api/{download_type}/download"
-
- # Add explicit display information for the frontend
- original_request["display_title"] = task.get("name", original_request.get("name", "Unknown"))
- original_request["display_type"] = task.get("type", original_request.get("type", download_type))
- original_request["display_artist"] = task.get("artist", original_request.get("artist", ""))
+ # Debug retry_url if present
+ if "retry_url" in task:
+ logger.debug(f"Task has retry_url: {task['retry_url']}")
# Build the complete task with config parameters
complete_task = {
@@ -150,6 +136,9 @@ class CeleryDownloadQueueManager:
"service": service,
"url": task.get("url", ""),
+ # Preserve retry_url if present
+ "retry_url": task.get("retry_url", ""),
+
# Use config values but allow override from request
"main": original_request.get("main",
config_params['spotify'] if service == 'spotify' else config_params['deezer']),
diff --git a/routes/utils/celery_tasks.py b/routes/utils/celery_tasks.py
index 9200d35..8abf183 100644
--- a/routes/utils/celery_tasks.py
+++ b/routes/utils/celery_tasks.py
@@ -85,6 +85,9 @@ def store_task_status(task_id, status_data):
# Convert to JSON and store in Redis
redis_client.rpush(f"task:{task_id}:status", json.dumps(status_data))
+ # Trim the list to keep only the most recent 100 updates to avoid excessive memory usage
+ redis_client.ltrim(f"task:{task_id}:status", -100, -1)
+
# Set expiry for the list to avoid filling up Redis with old data
redis_client.expire(f"task:{task_id}:status", 60 * 60 * 24 * 7) # 7 days
redis_client.expire(f"task:{task_id}:status:next_id", 60 * 60 * 24 * 7) # 7 days
@@ -243,6 +246,8 @@ def retry_task(task_id):
if not task_info:
return {"status": "error", "message": f"Task {task_id} not found"}
+ logger.debug(f"Retry task {task_id} - Initial task_info: {json.dumps({k: v for k, v in task_info.items() if k != 'orig_request'})}")
+
# Check if task has retry_count information
last_status = get_last_task_status(task_id)
if last_status and last_status.get("status") == "error":
@@ -272,6 +277,19 @@ def retry_task(task_id):
task_info["retry_count"] = retry_count + 1
task_info["retry_of"] = task_id
+ # Log current URL before potentially updating it
+ logger.debug(f"Retry task {task_id} - Current URL: {task_info.get('url', 'N/A')}")
+ logger.debug(f"Retry task {task_id} - Retry URL available: {'Yes' if 'retry_url' in task_info and task_info['retry_url'] else 'No'}")
+
+ # Use retry_url if available, otherwise use the original url
+ # This is crucial for album tasks created from artist downloads
+ if "retry_url" in task_info and task_info["retry_url"]:
+ logger.info(f"Using retry_url for task {task_id}: {task_info['retry_url']}")
+ logger.debug(f"Retry task {task_id} - Replacing URL {task_info.get('url', 'N/A')} with retry_url {task_info['retry_url']}")
+ task_info["url"] = task_info["retry_url"]
+ else:
+ logger.debug(f"Retry task {task_id} - No retry_url found, keeping original URL: {task_info.get('url', 'N/A')}")
+
# Get the service and fallback configuration from config
service = config_params.get("service")
fallback_enabled = config_params.get("fallback", False)
@@ -318,6 +336,9 @@ def retry_task(task_id):
task_info["custom_track_format"] = task_info.get("custom_track_format", config_params.get("customTrackFormat", "%tracknum%. %music%"))
task_info["pad_tracks"] = task_info.get("pad_tracks", config_params.get("tracknum_padding", True))
+ # Log the final URL that will be used
+ logger.debug(f"Retry task {task_id} - Final URL for retry: {task_info.get('url', 'N/A')}")
+
# Store the updated task info
store_task_info(new_task_id, task_info)
diff --git a/routes/utils/get_info.py b/routes/utils/get_info.py
index 6aaf656..602801e 100644
--- a/routes/utils/get_info.py
+++ b/routes/utils/get_info.py
@@ -3,17 +3,9 @@
from deezspot.easy_spoty import Spo
import json
from pathlib import Path
+from routes.utils.celery_queue_manager import get_config_params
-# Load configuration from ./config/main.json
-CONFIG_PATH = './config/main.json'
-try:
- with open(CONFIG_PATH, 'r') as f:
- config_data = json.load(f)
- # Get the main Spotify account from config
- DEFAULT_SPOTIFY_ACCOUNT = config_data.get("spotify", "")
-except Exception as e:
- print(f"Error loading configuration: {e}")
- DEFAULT_SPOTIFY_ACCOUNT = ""
+# We'll rely on get_config_params() instead of directly loading the config file
def get_spotify_info(spotify_id, spotify_type):
"""
@@ -29,8 +21,9 @@ def get_spotify_info(spotify_id, spotify_type):
client_id = None
client_secret = None
- # Use the default account from config
- main = DEFAULT_SPOTIFY_ACCOUNT
+ # Get config parameters including Spotify account
+ config_params = get_config_params()
+ main = config_params.get('spotify', '')
if not main:
raise ValueError("No Spotify account configured in settings")
diff --git a/routes/utils/search.py b/routes/utils/search.py
index e935366..6ea5a0e 100755
--- a/routes/utils/search.py
+++ b/routes/utils/search.py
@@ -1,6 +1,10 @@
from deezspot.easy_spoty import Spo
import json
from pathlib import Path
+import logging
+
+# Configure logger
+logger = logging.getLogger(__name__)
def search(
query: str,
@@ -8,35 +12,48 @@ def search(
limit: int = 3,
main: str = None
) -> dict:
+ logger.info(f"Search requested: query='{query}', type={search_type}, limit={limit}, main={main}")
+
# If main account is specified, load client ID and secret from the account's search.json
client_id = None
client_secret = None
if main:
search_creds_path = Path(f'./creds/spotify/{main}/search.json')
-
+ logger.debug(f"Looking for credentials at: {search_creds_path}")
+
if search_creds_path.exists():
try:
with open(search_creds_path, 'r') as f:
search_creds = json.load(f)
client_id = search_creds.get('client_id')
client_secret = search_creds.get('client_secret')
+ logger.debug(f"Credentials loaded successfully for account: {main}")
except Exception as e:
+ logger.error(f"Error loading search credentials: {e}")
print(f"Error loading search credentials: {e}")
+ else:
+ logger.warning(f"Credentials file not found at: {search_creds_path}")
# Initialize the Spotify client with credentials (if available)
if client_id and client_secret:
+ logger.debug("Initializing Spotify client with account credentials")
Spo.__init__(client_id, client_secret)
+ else:
+ logger.debug("Using default Spotify client credentials")
# Perform the Spotify search
- # Note: We don't need to pass client_id and client_secret again in the search method
- # as they've already been set during initialization
- spotify_response = Spo.search(
- query=query,
- search_type=search_type,
- limit=limit,
- client_id=client_id,
- client_secret=client_secret
- )
-
- return spotify_response
+ logger.debug(f"Executing Spotify search with query='{query}', type={search_type}")
+ try:
+ spotify_response = Spo.search(
+ query=query,
+ search_type=search_type,
+ limit=limit,
+ client_id=client_id,
+ client_secret=client_secret
+ )
+ logger.info(f"Search completed successfully")
+ return spotify_response
+ except Exception as e:
+ logger.error(f"Error during Spotify search: {e}")
+ raise
diff --git a/static/css/queue/queue.css b/static/css/queue/queue.css
index 14ba98d..f162e4c 100644
--- a/static/css/queue/queue.css
+++ b/static/css/queue/queue.css
@@ -43,12 +43,88 @@
margin: 0;
}
+/* Queue subtitle with statistics */
+.queue-subtitle {
+ display: flex;
+ gap: 10px;
+ margin-top: 5px;
+ font-size: 0.8rem;
+ color: #b3b3b3;
+}
+
+.queue-stat {
+ padding: 2px 6px;
+ border-radius: 4px;
+ font-weight: 500;
+}
+
+.queue-stat-active {
+ color: #4a90e2;
+ background-color: rgba(74, 144, 226, 0.1);
+}
+
+.queue-stat-completed {
+ color: #1DB954;
+ background-color: rgba(29, 185, 84, 0.1);
+}
+
+.queue-stat-error {
+ color: #ff5555;
+ background-color: rgba(255, 85, 85, 0.1);
+}
+
.header-actions {
display: flex;
gap: 10px;
align-items: center;
}
+/* Refresh queue button */
+#refreshQueueBtn {
+ background: #2a2a2a;
+ border: none;
+ color: #fff;
+ padding: 8px;
+ border-radius: 4px;
+ cursor: pointer;
+ transition: background 0.3s ease, transform 0.2s ease;
+ display: flex;
+ align-items: center;
+ justify-content: center;
+}
+
+#refreshQueueBtn:hover {
+ background: #333;
+ transform: translateY(-1px);
+}
+
+#refreshQueueBtn:active {
+ transform: scale(0.95);
+}
+
+#refreshQueueBtn.refreshing {
+ animation: spin 1s linear infinite;
+}
+
+/* Artist queue message */
+.queue-artist-message {
+ background: #2a2a2a;
+ padding: 15px;
+ border-radius: 8px;
+ margin-bottom: 15px;
+ color: #fff;
+ text-align: center;
+ border-left: 4px solid #4a90e2;
+ animation: pulse 1.5s infinite;
+ font-weight: 500;
+}
+
+@keyframes pulse {
+ 0% { opacity: 0.8; }
+ 50% { opacity: 1; }
+ 100% { opacity: 0.8; }
+}
+
/* Cancel all button styling */
#cancelAllBtn {
background: #8b0000; /* Dark blood red */
diff --git a/static/js/queue.js b/static/js/queue.js
index 93a2d1a..2de054a 100644
--- a/static/js/queue.js
+++ b/static/js/queue.js
@@ -20,12 +20,14 @@ class DownloadQueue {
this.MAX_RETRIES = 3; // Default max retries
this.RETRY_DELAY = 5; // Default retry delay in seconds
this.RETRY_DELAY_INCREASE = 5; // Default retry delay increase in seconds
+ this.MAX_SSE_CONNECTIONS = 5; // Maximum number of active SSE connections
this.downloadQueue = {}; // keyed by unique queueId
this.currentConfig = {}; // Cache for current config
// EventSource connections for SSE tracking
this.sseConnections = {}; // keyed by prgFile/task_id
+ this.pendingForSSE = []; // Queue of entries waiting for SSE connections
// Load the saved visible count (or default to 10)
const storedVisibleCount = localStorage.getItem("downloadQueueVisibleCount");
@@ -34,6 +36,9 @@ class DownloadQueue {
// Load the cached status info (object keyed by prgFile)
this.queueCache = JSON.parse(localStorage.getItem("downloadQueueCache") || "{}");
+ // Add a throttled update method to reduce UI updates
+ this.throttledUpdateQueue = this.throttle(this.updateQueueOrder.bind(this), 500);
+
// Wait for initDOM to complete before setting up event listeners and loading existing PRG files.
this.initDOM().then(() => {
this.initEventListeners();
@@ -41,6 +46,25 @@ class DownloadQueue {
});
}
+ /* Utility method to throttle frequent function calls */
+ throttle(func, delay) {
+ let lastCall = 0;
+ let timeout;
+ return function(...args) {
+ const now = Date.now();
+ if (now - lastCall < delay) {
+ clearTimeout(timeout);
+ timeout = setTimeout(() => {
+ lastCall = now;
+ func(...args);
+ }, delay);
+ } else {
+ lastCall = now;
+ func(...args);
+ }
+ };
+ }
+
/* DOM Management */
async initDOM() {
// New HTML structure for the download queue.
@@ -53,6 +77,14 @@ class DownloadQueue {
Cancel all
+
Your download queue is empty
-Your download queue is empty
+ `; + container.innerHTML = ''; + container.appendChild(emptyDiv); + } else { + // Get the visible entries slice + const visibleEntries = entries.slice(0, this.visibleCount); + + // Create a map of current DOM elements by queue ID + const existingElements = container.querySelectorAll('.queue-item'); + const existingElementMap = {}; + Array.from(existingElements).forEach(el => { + const cancelBtn = el.querySelector('.cancel-btn'); + if (cancelBtn) { + const queueId = cancelBtn.dataset.queueid; + if (queueId) existingElementMap[queueId] = el; + } + }); + + // Add visible entries to the fragment in the correct order + visibleEntries.forEach(entry => { + fragment.appendChild(entry.element); + entry.isNew = false; + }); + + // Clear container and append the fragment + container.innerHTML = ''; + container.appendChild(fragment); + } // Update footer footer.innerHTML = ''; @@ -951,8 +1035,14 @@ class DownloadQueue { // Close any existing SSE connection this.closeSSEConnection(queueId); + // For album tasks created from artist downloads, we need to ensure + // we're using the album URL, not the original artist URL + let retryUrl = entry.requestUrl; + + console.log(`Retrying download for ${entry.type} with URL: ${retryUrl}`); + // Use the stored original request URL to create a new download - const retryResponse = await fetch(entry.requestUrl); + const retryResponse = await fetch(retryUrl); if (!retryResponse.ok) { throw new Error(`Server returned ${retryResponse.status}`); } @@ -1052,29 +1142,63 @@ class DownloadQueue { const data = await response.json(); - // Handle artist downloads which return multiple album_prg_files - if (type === 'artist' && data.album_prg_files && Array.isArray(data.album_prg_files)) { - // Add each album to the download queue separately - const queueIds = []; - data.album_prg_files.forEach(prgFile => { - const queueId = this.addDownload(item, 'album', prgFile, apiUrl, false); - queueIds.push({queueId, prgFile}); - }); - - // Wait a short time 
before setting up SSE connections - await new Promise(resolve => setTimeout(resolve, 1000)); - - // Set up SSE connections for each entry - for (const {queueId, prgFile} of queueIds) { - const entry = this.downloadQueue[queueId]; - if (entry && !entry.hasEnded) { - this.setupSSEConnection(queueId); + // Handle artist downloads which return multiple album tasks + if (type === 'artist') { + // Check for new API response format + if (data.task_ids && Array.isArray(data.task_ids)) { + // For artist discographies, we get individual task IDs for each album + console.log(`Queued artist discography with ${data.task_ids.length} albums`); + + // Make queue visible to show progress + this.toggleVisibility(true); + + // Show a temporary message about the artist download + const artistMessage = document.createElement('div'); + artistMessage.className = 'queue-artist-message'; + artistMessage.textContent = `Queued ${data.task_ids.length} albums for ${item.name || 'artist'}. Loading...`; + document.getElementById('queueItems').prepend(artistMessage); + + // Wait a moment to ensure backend has processed the tasks + await new Promise(resolve => setTimeout(resolve, 1500)); + + // Remove the temporary message + artistMessage.remove(); + + // Fetch the latest tasks to show all newly created album downloads + await this.loadExistingPrgFiles(); + + return data.task_ids; + } + // Check for older API response format + else if (data.album_prg_files && Array.isArray(data.album_prg_files)) { + console.log(`Queued artist discography with ${data.album_prg_files.length} albums (old format)`); + // Add each album to the download queue separately + const queueIds = []; + data.album_prg_files.forEach(prgFile => { + const queueId = this.addDownload(item, 'album', prgFile, apiUrl, false); + queueIds.push({queueId, prgFile}); + }); + + // Make queue visible to show progress + this.toggleVisibility(true); + + // Wait a short time before setting up SSE connections + await new Promise(resolve => 
setTimeout(resolve, 1000)); + + // Set up SSE connections for each entry + for (const {queueId, prgFile} of queueIds) { + const entry = this.downloadQueue[queueId]; + if (entry && !entry.hasEnded) { + this.setupSSEConnection(queueId); + } } + + return queueIds.map(({queueId}) => queueId); } - - return queueIds.map(({queueId}) => queueId); - } else if (data.prg_file) { - // Handle single-file downloads (tracks, albums, playlists) + } + + // Handle single-file downloads (tracks, albums, playlists) + if (data.prg_file) { const queueId = this.addDownload(item, type, data.prg_file, apiUrl, false); // Wait a short time before setting up SSE connection @@ -1101,6 +1225,16 @@ class DownloadQueue { */ async loadExistingPrgFiles() { try { + // Clear existing queue entries first to avoid duplicates when refreshing + for (const queueId in this.downloadQueue) { + const entry = this.downloadQueue[queueId]; + // Close any active connections + this.closeSSEConnection(queueId); + + // Don't remove the entry from DOM - we'll rebuild it entirely + delete this.downloadQueue[queueId]; + } + const response = await fetch('/api/prgs/list'); const prgFiles = await response.json(); @@ -1284,6 +1418,17 @@ class DownloadQueue { // Close any existing connection this.closeSSEConnection(queueId); + // Check if we're at the connection limit + const activeConnectionCount = Object.keys(this.sseConnections).length; + if (activeConnectionCount >= this.MAX_SSE_CONNECTIONS) { + // Add to pending queue instead of creating connection now + if (!this.pendingForSSE.includes(queueId)) { + this.pendingForSSE.push(queueId); + console.log(`Queued SSE connection for ${queueId} (max connections reached)`); + } + return; + } + // Create a new EventSource connection try { const sse = new EventSource(`/api/prgs/stream/${entry.prgFile}`); @@ -1321,96 +1466,44 @@ class DownloadQueue { entry.status = data.status; }); - sse.addEventListener('update', (event) => { + // Combined handler for all update-style events + 
const updateHandler = (event) => { const data = JSON.parse(event.data); - console.log('SSE update event:', data); - this.handleSSEUpdate(queueId, data); - }); - - sse.addEventListener('progress', (event) => { - const data = JSON.parse(event.data); - console.log('SSE progress event:', data); - this.handleSSEUpdate(queueId, data); - }); - - // Add specific handler for track_complete events - sse.addEventListener('track_complete', (event) => { - const data = JSON.parse(event.data); - console.log('SSE track_complete event:', data); - console.log(`Current entry type: ${entry.type}`); + const eventType = event.type; - // Mark this status as a track completion - data.status = 'track_complete'; - - // Only update the log message without changing status colors - const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); - if (logElement) { - let message = `Completed track: ${data.title || data.track || 'Unknown'}`; - if (data.artist) message += ` by ${data.artist}`; - logElement.textContent = message; - } - - // For single track downloads, track_complete is a terminal state - if (entry.type === 'track') { - console.log('Single track download completed - terminating'); - // Mark the track as ended - entry.hasEnded = true; + if (eventType === 'track_complete') { + // Special handling for track completions + console.log('SSE track_complete event:', data); - // Handle as a terminal state - setTimeout(() => { - this.closeSSEConnection(queueId); - this.cleanupEntry(queueId); - }, 5000); - } else { - console.log(`Album/playlist track completed - continuing download (type: ${entry.type})`); - // For albums/playlists, just update entry data without changing status - entry.lastStatus = data; - entry.lastUpdated = Date.now(); + // Mark this status as a track completion + data.status = 'track_complete'; - // Save to cache - this.queueCache[entry.prgFile] = data; - localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); - } - }); - - // 
Also handle 'done' events which can come for individual tracks - sse.addEventListener('done', (event) => { - const data = JSON.parse(event.data); - console.log('SSE done event (individual track):', data); - console.log(`Current entry type: ${entry.type}`); - - // Only update the log message without changing status colors for album tracks - const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); - if (logElement) { - let message = `Completed track: ${data.song || data.title || data.track || 'Unknown'}`; - if (data.artist) message += ` by ${data.artist}`; - logElement.textContent = message; - } - - // For single track downloads, done is a terminal state - if (entry.type === 'track') { - console.log('Single track download completed (done) - terminating'); - // Mark the track as ended - entry.hasEnded = true; + // Only update the log message without changing status colors + const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); + if (logElement) { + let message = `Completed track: ${data.title || data.track || 'Unknown'}`; + if (data.artist) message += ` by ${data.artist}`; + logElement.textContent = message; + } - // Handle as a terminal state - setTimeout(() => { - this.closeSSEConnection(queueId); - this.cleanupEntry(queueId); - }, 5000); - } else if (data.song) { - console.log(`Album/playlist individual track done - continuing download (type: ${entry.type})`); - // For albums/playlists, just update entry data without changing status - data._isIndividualTrack = true; // Mark it for special handling in update logic - entry.lastStatus = data; - entry.lastUpdated = Date.now(); + // For single track downloads, track_complete is a terminal state + if (entry.type === 'track') { + entry.hasEnded = true; + setTimeout(() => { + this.closeSSEConnection(queueId); + this.cleanupEntry(queueId); + }, 5000); + } else { + // For albums/playlists, just update entry data without changing status + entry.lastStatus = data; + 
entry.lastUpdated = Date.now(); + this.queueCache[entry.prgFile] = data; + localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); + } + } else if (eventType === 'complete' || eventType === 'done') { + // Terminal state handling + console.log(`SSE ${eventType} event:`, data); - // Save to cache - this.queueCache[entry.prgFile] = data; - localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); - } else { - // This is a real done event for the entire album/playlist - console.log(`Entire ${entry.type} completed - finalizing`); this.handleSSEUpdate(queueId, data); entry.hasEnded = true; @@ -1418,91 +1511,38 @@ class DownloadQueue { this.closeSSEConnection(queueId); this.cleanupEntry(queueId); }, 5000); - } - }); - - sse.addEventListener('complete', (event) => { - const data = JSON.parse(event.data); - console.log('SSE complete event:', data); - console.log(`Current entry type: ${entry.type}`); - - // Skip terminal processing for track_complete status in albums/playlists - // Also skip for "done" status when it's for an individual track in an album/playlist - if ((data.status === 'track_complete' && entry.type !== 'track') || - (data.status === 'done' && data.song && entry.type !== 'track')) { - console.log(`Track ${data.status} in ${entry.type} download - continuing`); - // Don't process individual track completion events here - return; - } - - // Make sure the status is set to 'complete' for UI purposes - if (!data.status || data.status === '') { - data.status = 'complete'; - } - - // For track downloads, make sure we have a proper name - if (entry.type === 'track' && !data.name && entry.lastStatus) { - data.name = entry.lastStatus.name || ''; - data.artist = entry.lastStatus.artist || ''; - } - - this.handleSSEUpdate(queueId, data); - - // Always mark as terminal state for 'complete' events (except individual track completions in albums) - entry.hasEnded = true; - - // Close the connection after a short delay - setTimeout(() 
=> { + } else if (eventType === 'error') { + // Error state handling + console.log('SSE error event:', data); + this.handleSSEUpdate(queueId, data); + entry.hasEnded = true; this.closeSSEConnection(queueId); - this.cleanupEntry(queueId); - }, 5000); - }); - - sse.addEventListener('error', (event) => { - const data = JSON.parse(event.data); - console.log('SSE error event:', data); - this.handleSSEUpdate(queueId, data); - - // Mark the download as ended with error - entry.hasEnded = true; - - // Close the connection, but don't automatically clean up the entry - // to allow for potential retry - this.closeSSEConnection(queueId); - }); - - sse.addEventListener('end', (event) => { - const data = JSON.parse(event.data); - console.log('SSE end event:', data); - - // For track downloads, ensure we have the proper fields for UI display - if (entry.type === 'track') { - // If the end event doesn't have a name/artist, copy from lastStatus - if ((!data.name || !data.artist) && entry.lastStatus) { - data.name = data.name || entry.lastStatus.name || ''; - data.artist = data.artist || entry.lastStatus.artist || ''; - } + } else if (eventType === 'end') { + // End event handling + console.log('SSE end event:', data); - // Force status to 'complete' if not provided - if (!data.status || data.status === '') { - data.status = 'complete'; + // Update with final status + this.handleSSEUpdate(queueId, data); + entry.hasEnded = true; + this.closeSSEConnection(queueId); + + if (data.status === 'complete' || data.status === 'done') { + setTimeout(() => this.cleanupEntry(queueId), 5000); } + } else { + // Standard update handling + this.handleSSEUpdate(queueId, data); } - - // Update with final status - this.handleSSEUpdate(queueId, data); - - // Mark the download as ended - entry.hasEnded = true; - - // Close the connection - this.closeSSEConnection(queueId); - - // Clean up the entry after a delay if it's a success - if (data.status === 'complete' || data.status === 'done') { - 
setTimeout(() => this.cleanupEntry(queueId), 5000); - } - }); + }; + + // Set up shared handler for all events + sse.addEventListener('update', updateHandler); + sse.addEventListener('progress', updateHandler); + sse.addEventListener('track_complete', updateHandler); + sse.addEventListener('complete', updateHandler); + sse.addEventListener('done', updateHandler); + sse.addEventListener('error', updateHandler); + sse.addEventListener('end', updateHandler); // Handle connection error sse.onerror = (error) => { @@ -1537,6 +1577,13 @@ class DownloadQueue { console.error('Error closing SSE connection:', error); } delete this.sseConnections[queueId]; + + // Now that we've freed a slot, check if any entries are waiting for an SSE connection + if (this.pendingForSSE.length > 0) { + const nextQueueId = this.pendingForSSE.shift(); + console.log(`Starting SSE connection for queued entry ${nextQueueId}`); + this.setupSSEConnection(nextQueueId); + } } } @@ -1552,8 +1599,6 @@ class DownloadQueue { return; } - console.log(`handleSSEUpdate for ${queueId} with type ${entry.type} and status ${data.status}`); - // Track completion is special - don't change visible status ONLY for albums/playlists // Check for both 'track_complete' and 'done' statuses for individual tracks in albums const isTrackCompletion = data.status === 'track_complete' || @@ -1574,29 +1619,46 @@ class DownloadQueue { entry.status = data.status; } - // Update status message in the UI - const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); - if (logElement) { - const statusMessage = this.getStatusMessage(data); - logElement.textContent = statusMessage; - } + // Update status message in the UI - use a more efficient approach + this.updateEntryStatusUI(entry, data, skipStatusChange); - // Apply appropriate CSS classes based on status only if not skipping status change - if (!skipStatusChange) { - this.applyStatusClasses(entry, data); - } - - // Save updated status to cache - 
this.queueCache[entry.prgFile] = data; - localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); + // Save updated status to cache - debounce these writes to reduce storage operations + clearTimeout(entry.cacheWriteTimeout); + entry.cacheWriteTimeout = setTimeout(() => { + this.queueCache[entry.prgFile] = data; + localStorage.setItem("downloadQueueCache", JSON.stringify(this.queueCache)); + }, 500); // Special handling for error status if (data.status === 'error') { this.handleTerminalState(entry, queueId, data); } - // Update the queue order - this.updateQueueOrder(); + // Throttle UI updates to improve performance with multiple downloads + this.throttledUpdateQueue(); + } + + // Optimized method to update the entry status in the UI + updateEntryStatusUI(entry, data, skipStatusChange) { + // First, update the log message text if the element exists + const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); + if (logElement) { + // Only modify the text content if it doesn't already have child elements + // (which would be the case for error states with retry buttons) + if (!logElement.querySelector('.error-message')) { + const statusMessage = this.getStatusMessage(data); + + // Only update DOM if the text has changed + if (logElement.textContent !== statusMessage) { + logElement.textContent = statusMessage; + } + } + } + + // Apply CSS classes for status indication only if we're not skipping status changes + if (!skipStatusChange) { + this.applyStatusClasses(entry, data); + } } /* Close all active SSE connections */