diff --git a/routes/utils/celery_manager.py b/routes/utils/celery_manager.py index 24aeeba..ea140ad 100644 --- a/routes/utils/celery_manager.py +++ b/routes/utils/celery_manager.py @@ -11,6 +11,16 @@ import queue import sys import uuid +# Import Celery task utilities +from .celery_tasks import ( + ProgressState, + get_task_info, + get_last_task_status, + store_task_status, + get_all_tasks as get_all_celery_tasks_info +) +from .celery_config import get_config_params + # Configure logging logger = logging.getLogger(__name__) @@ -33,6 +43,61 @@ class CeleryManager: self.log_queue = queue.Queue() self.output_threads = [] + def _cleanup_stale_tasks(self): + logger.info("Cleaning up potentially stale Celery tasks...") + try: + tasks = get_all_celery_tasks_info() + if not tasks: + logger.info("No tasks found in Redis to check for staleness.") + return + + active_stale_states = [ + ProgressState.PROCESSING, + ProgressState.INITIALIZING, + ProgressState.DOWNLOADING, + ProgressState.PROGRESS, + ProgressState.REAL_TIME, + ProgressState.RETRYING + ] + + stale_tasks_count = 0 + for task_summary in tasks: + task_id = task_summary.get("task_id") + if not task_id: + continue + + last_status_data = get_last_task_status(task_id) + if last_status_data: + current_status_str = last_status_data.get("status") + if current_status_str in active_stale_states: + logger.warning(f"Task {task_id} ('{task_summary.get('name', 'Unknown')}') found in stale state '{current_status_str}'. Marking as error.") + + task_info_details = get_task_info(task_id) + config = get_config_params() + + error_payload = { + "status": ProgressState.ERROR, + "message": "Task interrupted due to application restart.", + "error": "Task interrupted due to application restart.", + "timestamp": time.time(), + "type": task_info_details.get("type", task_summary.get("type", "unknown")), + "name": task_info_details.get("name", task_summary.get("name", "Unknown")), + "artist": task_info_details.get("artist", task_summary.get("artist", "")), + "can_retry": True, + "retry_count": last_status_data.get("retry_count", 0), + "max_retries": config.get('maxRetries', 3) + } + store_task_status(task_id, error_payload) + stale_tasks_count += 1 + + if stale_tasks_count > 0: + logger.info(f"Marked {stale_tasks_count} stale tasks as 'error'.") + else: + logger.info("No stale tasks found that needed cleanup.") + + except Exception as e: + logger.error(f"Error during stale task cleanup: {e}", exc_info=True) + def start(self): """Start the Celery manager and initial workers""" if self.running: @@ -40,6 +105,9 @@ class CeleryManager: self.running = True + # Clean up stale tasks BEFORE starting/restarting workers + self._cleanup_stale_tasks() + # Start initial workers self._update_workers() diff --git a/static/js/queue.js b/static/js/queue.js index d082674..e7d6760 100644 --- a/static/js/queue.js +++ b/static/js/queue.js @@ -444,7 +444,8 @@ class DownloadQueue { isNew: true, // Add flag to track if this is a new entry status: 'initializing', lastMessage: `Initializing ${type} download...`, - parentInfo: null // Will store parent data for tracks that are part of albums/playlists + parentInfo: null, // Will store parent data for tracks that are part of albums/playlists + realTimeStallDetector: { count: 0, lastStatusJson: '' } // For detecting stalled real_time downloads }; // If cached info exists for this PRG file, use it. @@ -1149,23 +1150,22 @@ createQueueItem(item, type, prgFile, queueId) { async retryDownload(queueId, logElement) { const entry = this.queueEntries[queueId]; - if (!entry) return; - - // Hide any existing error-details and restore log for retry - const errContainer = entry.element.querySelector(`#error-details-${entry.uniqueId}-${entry.prgFile}`); - if (errContainer) { errContainer.style.display = 'none'; } - logElement.style.display = ''; - - // Mark the entry as retrying to prevent automatic cleanup - entry.isRetrying = true; - logElement.textContent = 'Retrying download...'; - - // Determine if we should use parent information for retry + if (!entry) { + console.warn(`Retry called for non-existent queueId: ${queueId}`); + return; + } + + // The retry button is already showing "Retrying..." and is disabled by the click handler. + // We will update the error message div within logElement if retry fails. + const errorMessageDiv = logElement?.querySelector('.error-message'); + const retryBtn = logElement?.querySelector('.retry-btn'); + + entry.isRetrying = true; // Mark the original entry as being retried. + + // Determine if we should use parent information for retry (existing logic) let useParent = false; let parentType = null; let parentUrl = null; - - // Check if we have parent information in the lastStatus if (entry.lastStatus && entry.lastStatus.parent) { const parent = entry.lastStatus.parent; if (parent.type && parent.url) { @@ -1175,125 +1175,110 @@ createQueueItem(item, type, prgFile, queueId) { console.log(`Using parent info for retry: ${parentType} with URL: ${parentUrl}`); } } - - // Find a retry URL from various possible sources + const getRetryUrl = () => { - // Prefer full original URL from progress API - if (entry.lastStatus && entry.lastStatus.original_url) { - return entry.lastStatus.original_url; - } - // If using parent, return parent URL - if (useParent && parentUrl) { - return parentUrl; - } - - // Otherwise use the standard fallback options + if (entry.lastStatus && entry.lastStatus.original_url) return entry.lastStatus.original_url; + if (useParent && parentUrl) return parentUrl; if (entry.requestUrl) return entry.requestUrl; - - // If we have lastStatus with original_request, check there if (entry.lastStatus && entry.lastStatus.original_request) { - if (entry.lastStatus.original_request.retry_url) - return entry.lastStatus.original_request.retry_url; - if (entry.lastStatus.original_request.url) - return entry.lastStatus.original_request.url; + if (entry.lastStatus.original_request.retry_url) return entry.lastStatus.original_request.retry_url; + if (entry.lastStatus.original_request.url) return entry.lastStatus.original_request.url; } - - // Check if there's a URL directly in the lastStatus - if (entry.lastStatus && entry.lastStatus.url) - return entry.lastStatus.url; - - // Fallback to stored requestUrl - if (entry.requestUrl) { - return entry.requestUrl; - } - + if (entry.lastStatus && entry.lastStatus.url) return entry.lastStatus.url; return null; }; - + const retryUrl = getRetryUrl(); - - // If we don't have any retry URL, show error + if (!retryUrl) { - logElement.textContent = 'Retry not available: missing URL information.'; - entry.isRetrying = false; // Reset retrying flag + if (errorMessageDiv) errorMessageDiv.textContent = 'Retry not available: missing URL information.'; + entry.isRetrying = false; + if (retryBtn) { + retryBtn.disabled = false; + retryBtn.innerHTML = 'Retry'; // Reset button text + } return; } - - try { - // Close any existing polling interval - this.clearPollingInterval(queueId); - - // Determine which type to use for the API endpoint - const apiType = useParent ? parentType : entry.type; - console.log(`Retrying download using type: ${apiType} with URL: ${retryUrl}`); - - // Determine request URL: if retryUrl is already a full API URL, use it directly - let fullRetryUrl; - if (retryUrl.startsWith('http')) { - fullRetryUrl = retryUrl; - } else { - const apiUrl = `/api/${apiType}/download?url=${encodeURIComponent(retryUrl)}`; - fullRetryUrl = apiUrl; - // Append metadata if retryUrl is raw resource URL - if (entry.item && entry.item.name) { - fullRetryUrl += `&name=${encodeURIComponent(entry.item.name)}`; - } - if (entry.item && entry.item.artist) { - fullRetryUrl += `&artist=${encodeURIComponent(entry.item.artist)}`; - } - } - // Use the stored original request URL to create a new download + // Store details needed for the new entry BEFORE any async operations + const originalItem = { ...entry.item }; // Shallow copy + const apiTypeForNewEntry = useParent ? parentType : entry.type; + console.log(`Retrying download using type: ${apiTypeForNewEntry} with base URL: ${retryUrl}`); + + let fullRetryUrl; + if (retryUrl.startsWith('http') || retryUrl.startsWith('/api/')) { // if it's already a full URL or an API path + fullRetryUrl = retryUrl; + } else { + // Construct full URL if retryUrl is just a resource identifier + fullRetryUrl = `/api/${apiTypeForNewEntry}/download?url=${encodeURIComponent(retryUrl)}`; + // Append metadata if retryUrl is raw resource URL + if (originalItem && originalItem.name) { + fullRetryUrl += `&name=${encodeURIComponent(originalItem.name)}`; + } + if (originalItem && originalItem.artist) { + fullRetryUrl += `&artist=${encodeURIComponent(originalItem.artist)}`; + } + } + const requestUrlForNewEntry = fullRetryUrl; + + try { + // Clear polling for the old entry before making the request + this.clearPollingInterval(queueId); + const retryResponse = await fetch(fullRetryUrl); if (!retryResponse.ok) { - throw new Error(`Server returned ${retryResponse.status}`); + const errorText = await retryResponse.text(); + throw new Error(`Server returned ${retryResponse.status}${errorText ? (': ' + errorText) : ''}`); } - + const retryData = await retryResponse.json(); - + if (retryData.prg_file) { - // Store the old PRG file for cleanup - const oldPrgFile = entry.prgFile; + const newPrgFile = retryData.prg_file; + + // Clean up the old entry from UI, memory, cache, and server (PRG file) + // logElement and retryBtn are part of the old entry's DOM structure and will be removed. + await this.cleanupEntry(queueId); + + // Add the new download entry. This will create a new element, start monitoring, etc. + this.addDownload(originalItem, apiTypeForNewEntry, newPrgFile, requestUrlForNewEntry, true); - // Update the entry with the new PRG file - const logEl = entry.element.querySelector('.log'); - logEl.id = `log-${entry.uniqueId}-${retryData.prg_file}`; - entry.prgFile = retryData.prg_file; - entry.lastStatus = null; - entry.hasEnded = false; - entry.lastUpdated = Date.now(); - entry.retryCount = (entry.retryCount || 0) + 1; - entry.statusCheckFailures = 0; // Reset failure counter - logEl.textContent = 'Retry initiated...'; - - // Make sure any existing interval is cleared - if (entry.intervalId) { - clearInterval(entry.intervalId); - entry.intervalId = null; - } - - // Set up a new polling interval for the retried download - this.setupPollingInterval(queueId); - - // Delete the old PRG file after a short delay to ensure the new one is properly set up - if (oldPrgFile) { - setTimeout(async () => { - try { - await fetch(`/api/prgs/delete/${oldPrgFile}`, { method: 'DELETE' }); - console.log(`Cleaned up old PRG file: ${oldPrgFile}`); - } catch (deleteError) { - console.error('Error deleting old PRG file:', deleteError); - } - }, 2000); // Wait 2 seconds before deleting the old file - } + // The old setTimeout block for deleting oldPrgFile is no longer needed as cleanupEntry handles it. } else { - logElement.textContent = 'Retry failed: invalid response from server'; - entry.isRetrying = false; // Reset retrying flag + if (errorMessageDiv) errorMessageDiv.textContent = 'Retry failed: invalid response from server.'; + const currentEntry = this.queueEntries[queueId]; // Check if old entry still exists + if (currentEntry) { + currentEntry.isRetrying = false; + } + if (retryBtn) { + retryBtn.disabled = false; + retryBtn.innerHTML = 'Retry'; + } } } catch (error) { console.error('Retry error:', error); - logElement.textContent = 'Retry failed: ' + error.message; - entry.isRetrying = false; // Reset retrying flag + // The old entry might still be in the DOM if cleanupEntry wasn't called or failed. + const stillExistingEntry = this.queueEntries[queueId]; + if (stillExistingEntry && stillExistingEntry.element) { + // logElement might be stale if the element was re-rendered, so query again if possible. + const currentLogOnFailedEntry = stillExistingEntry.element.querySelector('.log'); + const errorDivOnFailedEntry = currentLogOnFailedEntry?.querySelector('.error-message') || errorMessageDiv; + const retryButtonOnFailedEntry = currentLogOnFailedEntry?.querySelector('.retry-btn') || retryBtn; + + if (errorDivOnFailedEntry) errorDivOnFailedEntry.textContent = 'Retry failed: ' + error.message; + stillExistingEntry.isRetrying = false; + if (retryButtonOnFailedEntry) { + retryButtonOnFailedEntry.disabled = false; + retryButtonOnFailedEntry.innerHTML = 'Retry'; + } + } else if (errorMessageDiv) { + // Fallback if entry is gone from queue but original logElement's parts are somehow still accessible + errorMessageDiv.textContent = 'Retry failed: ' + error.message; + if (retryBtn) { + retryBtn.disabled = false; + retryBtn.innerHTML = 'Retry'; + } + } } } @@ -1892,10 +1877,54 @@ createQueueItem(item, type, prgFile, queueId) { } // Get primary status - const status = statusData.status || data.event || 'unknown'; + let status = statusData.status || data.event || 'unknown'; // Define status *before* potential modification + + // Stall detection for 'real_time' status + if (status === 'real_time') { + entry.realTimeStallDetector = entry.realTimeStallDetector || { count: 0, lastStatusJson: '' }; + const detector = entry.realTimeStallDetector; + + const currentMetrics = { + progress: statusData.progress, + time_elapsed: statusData.time_elapsed, + // For multi-track items, current_track is a key indicator of activity + current_track: (entry.type === 'album' || entry.type === 'playlist') ? statusData.current_track : undefined, + // Include other relevant fields if they signify activity, e.g., speed, eta + // For example, if statusData.song changes for an album, that's progress. + song: statusData.song + }; + const currentMetricsJson = JSON.stringify(currentMetrics); + + // Check if significant metrics are present and static + if (detector.lastStatusJson === currentMetricsJson && + (currentMetrics.progress !== undefined || currentMetrics.time_elapsed !== undefined || currentMetrics.current_track !== undefined || currentMetrics.song !== undefined)) { + // Metrics are present and haven't changed + detector.count++; + } else { + // Metrics changed, or this is the first time seeing them, or no metrics to compare (e.g. empty object from server) + detector.count = 0; + // Only update lastStatusJson if currentMetricsJson represents actual data, not an empty object if that's possible + if (currentMetricsJson !== '{}' || detector.lastStatusJson === '') { // Avoid replacing actual old data with '{}' if new data is sparse + detector.lastStatusJson = currentMetricsJson; + } + } + + const STALL_THRESHOLD = 600; // Approx 5 minutes (600 polls * 0.5s/poll) + if (detector.count >= STALL_THRESHOLD) { + console.warn(`Download ${queueId} (${entry.prgFile}) appears stalled in real_time state. Metrics: ${detector.lastStatusJson}. Stall count: ${detector.count}. Forcing error.`); + statusData.status = 'error'; + statusData.error = 'Download stalled (no progress updates for 5 minutes)'; + statusData.can_retry = true; // Allow manual retry for stalled items + status = 'error'; // Update local status variable for current execution scope + + // Reset detector for this entry in case of retry + detector.count = 0; + detector.lastStatusJson = ''; + } + } // Store the status data for potential retries - entry.lastStatus = statusData; + entry.lastStatus = statusData; // This now stores the potentially modified statusData (e.g., status changed to 'error') entry.lastUpdated = Date.now(); // Update type if needed - could be more specific now (e.g., from 'album' to 'compilation') @@ -1947,46 +1976,75 @@ createQueueItem(item, type, prgFile, queueId) { const cancelBtn = entry.element.querySelector('.cancel-btn'); if (cancelBtn) cancelBtn.style.display = 'none'; + // Hide progress bars for errored items + const trackProgressContainer = entry.element.querySelector(`#track-progress-container-${entry.uniqueId}-${entry.prgFile}`); + if (trackProgressContainer) trackProgressContainer.style.display = 'none'; + const overallProgressContainer = entry.element.querySelector('.overall-progress-container'); + if (overallProgressContainer) overallProgressContainer.style.display = 'none'; + // Hide time elapsed for errored items + const timeElapsedContainer = entry.element.querySelector(`#time-elapsed-${entry.uniqueId}-${entry.prgFile}`); + if (timeElapsedContainer) timeElapsedContainer.style.display = 'none'; + // Extract error details - const errMsg = statusData.error; - const canRetry = Boolean(statusData.can_retry) && statusData.retry_count < statusData.max_retries; - // Determine retry URL + const errMsg = statusData.error || 'An unknown error occurred.'; // Ensure errMsg is a string + // const canRetry = Boolean(statusData.can_retry) && statusData.retry_count < statusData.max_retries; // This logic is implicitly handled by retry button availability const retryUrl = data.original_url || data.original_request?.url || entry.requestUrl || null; if (retryUrl) { - entry.requestUrl = retryUrl; + entry.requestUrl = retryUrl; // Store for retry logic } - console.log(`Error for ${entry.type} download. Can retry: ${canRetry}. Retry URL: ${retryUrl}`); + console.log(`Error for ${entry.type} download. Can retry: ${!!entry.requestUrl}. Retry URL: ${entry.requestUrl}`); const logElement = document.getElementById(`log-${entry.uniqueId}-${entry.prgFile}`); if (logElement) { - // Build error UI with manual retry always available - logElement.innerHTML = ` -
- - `; - // Close handler - logElement.querySelector('.close-error-btn').addEventListener('click', () => { - this.cleanupEntry(queueId); - }); - // Always attach manual retry handler - const retryBtn = logElement.querySelector('.retry-btn'); - retryBtn.addEventListener('click', (e) => { - e.preventDefault(); - e.stopPropagation(); - retryBtn.disabled = true; - retryBtn.innerHTML = ' Retrying...'; - this.retryDownload(queueId, logElement); - }); - // Auto cleanup after 15s - setTimeout(() => { - if (this.queueEntries[queueId]?.hasEnded) { - this.cleanupEntry(queueId); + let errorMessageElement = logElement.querySelector('.error-message'); + + if (!errorMessageElement) { // If error UI (message and buttons) is not built yet + // Build error UI with manual retry always available + logElement.innerHTML = ` + + + `; + errorMessageElement = logElement.querySelector('.error-message'); // Re-select after innerHTML change + + // Attach listeners ONLY when creating the buttons + const closeErrorBtn = logElement.querySelector('.close-error-btn'); + if (closeErrorBtn) { + closeErrorBtn.addEventListener('click', () => { + this.cleanupEntry(queueId); + }); } - }, 15000); + + const retryBtnElem = logElement.querySelector('.retry-btn'); + if (retryBtnElem) { + retryBtnElem.addEventListener('click', (e) => { + e.preventDefault(); + e.stopPropagation(); + retryBtnElem.disabled = true; + retryBtnElem.innerHTML = ' Retrying...'; + this.retryDownload(queueId, logElement); + }); + } + + // Auto cleanup after 15s - only set this timeout once when error UI is first built + setTimeout(() => { + const currentEntryForCleanup = this.queueEntries[queueId]; + if (currentEntryForCleanup && + currentEntryForCleanup.hasEnded && + currentEntryForCleanup.lastStatus?.status === 'error' && + !currentEntryForCleanup.isRetrying) { + this.cleanupEntry(queueId); + } + }, 15000); + + } else { // Error UI already exists, just update the message text if it's different + if (errorMessageElement.textContent !== errMsg) { + errorMessageElement.textContent = errMsg; + } + } } }