mirror of
https://lavaforge.org/spotizerr/spotizerr.git
synced 2025-12-24 02:39:14 -05:00
test suite
This commit is contained in:
44
tests/README.md
Normal file
44
tests/README.md
Normal file
@@ -0,0 +1,44 @@
|
||||
# Spotizerr Backend Tests
|
||||
|
||||
This directory contains automated tests for the Spotizerr backend API.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
1. **Running Backend**: Ensure the Spotizerr Flask application is running and accessible at `http://localhost:7171`. You can start it with `python app.py`.
|
||||
|
||||
2. **Python Dependencies**: Install the necessary Python packages for testing.
|
||||
```bash
|
||||
pip install pytest requests python-dotenv
|
||||
```
|
||||
|
||||
3. **Credentials**: These tests require valid Spotify and Deezer credentials. Create a file named `.env` in the root directory of the project (`spotizerr`) and add your credentials to it. The tests will load this file automatically.
|
||||
|
||||
**Example `.env` file:**
|
||||
```
|
||||
SPOTIFY_API_CLIENT_ID="your_spotify_client_id"
|
||||
SPOTIFY_API_CLIENT_SECRET="your_spotify_client_secret"
|
||||
# This should be the full JSON content of your credentials blob as a single line string
|
||||
SPOTIFY_BLOB_CONTENT='{"username": "your_spotify_username", "password": "your_spotify_password", ...}'
|
||||
DEEZER_ARL="your_deezer_arl"
|
||||
```
|
||||
|
||||
The tests will automatically use these credentials to create and manage test accounts named `test-spotify-account` and `test-deezer-account`.
|
||||
|
||||
## Running Tests
|
||||
|
||||
To run all tests, navigate to the root directory of the project (`spotizerr`) and run `pytest`:
|
||||
|
||||
```bash
|
||||
pytest
|
||||
```
|
||||
|
||||
To run a specific test file:
|
||||
|
||||
```bash
|
||||
pytest tests/test_downloads.py
|
||||
```
|
||||
|
||||
For more detailed output, use the `-v` (verbose) and `-s` (show print statements) flags:
|
||||
```bash
|
||||
pytest -v -s
|
||||
```
|
||||
1
tests/__init__.py
Normal file
1
tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
|
||||
149
tests/conftest.py
Normal file
149
tests/conftest.py
Normal file
@@ -0,0 +1,149 @@
|
||||
import pytest
|
||||
import requests
|
||||
import time
|
||||
import os
|
||||
import json
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# Load environment variables from .env file in the project root
|
||||
load_dotenv()
|
||||
|
||||
# --- Environment-based secrets for testing ---
|
||||
SPOTIFY_API_CLIENT_ID = os.environ.get("SPOTIFY_API_CLIENT_ID", "your_spotify_client_id")
|
||||
SPOTIFY_API_CLIENT_SECRET = os.environ.get("SPOTIFY_API_CLIENT_SECRET", "your_spotify_client_secret")
|
||||
SPOTIFY_BLOB_CONTENT_STR = os.environ.get("SPOTIFY_BLOB_CONTENT_STR", '{}')
|
||||
try:
|
||||
SPOTIFY_BLOB_CONTENT = json.loads(SPOTIFY_BLOB_CONTENT_STR)
|
||||
except json.JSONDecodeError:
|
||||
SPOTIFY_BLOB_CONTENT = {}
|
||||
|
||||
DEEZER_ARL = os.environ.get("DEEZER_ARL", "your_deezer_arl")
|
||||
|
||||
# --- Standard names for test accounts ---
|
||||
SPOTIFY_ACCOUNT_NAME = "test-spotify-account"
|
||||
DEEZER_ACCOUNT_NAME = "test-deezer-account"
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def base_url():
|
||||
"""Provides the base URL for the API tests."""
|
||||
return "http://localhost:7171/api"
|
||||
|
||||
|
||||
def wait_for_task(base_url, task_id, timeout=600):
|
||||
"""
|
||||
Waits for a Celery task to reach a terminal state (complete, error, etc.).
|
||||
Polls the progress endpoint and prints status updates.
|
||||
"""
|
||||
print(f"\n--- Waiting for task {task_id} (timeout: {timeout}s) ---")
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < timeout:
|
||||
try:
|
||||
response = requests.get(f"{base_url}/prgs/{task_id}")
|
||||
if response.status_code == 404:
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
response.raise_for_status() # Raise an exception for bad status codes
|
||||
|
||||
statuses = response.json()
|
||||
if not statuses:
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
last_status = statuses[-1]
|
||||
status = last_status.get("status")
|
||||
|
||||
# More verbose logging for debugging during tests
|
||||
message = last_status.get('message', '')
|
||||
track = last_status.get('track', '')
|
||||
progress = last_status.get('overall_progress', '')
|
||||
print(f"Task {task_id} | Status: {status:<12} | Progress: {progress or 'N/A':>3}% | Track: {track:<30} | Message: {message}")
|
||||
|
||||
if status in ["complete", "ERROR", "cancelled", "ERROR_RETRIED", "ERROR_AUTO_CLEANED"]:
|
||||
print(f"--- Task {task_id} finished with status: {status} ---")
|
||||
return last_status
|
||||
|
||||
time.sleep(2)
|
||||
except requests.exceptions.RequestException as e:
|
||||
print(f"Warning: Request to fetch task status for {task_id} failed: {e}. Retrying...")
|
||||
time.sleep(5)
|
||||
|
||||
raise TimeoutError(f"Task {task_id} did not complete within {timeout} seconds.")
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def task_waiter(base_url):
|
||||
"""Provides a fixture that returns the wait_for_task helper function."""
|
||||
def _waiter(task_id, timeout=600):
|
||||
return wait_for_task(base_url, task_id, timeout)
|
||||
return _waiter
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def setup_credentials_for_tests(base_url):
|
||||
"""
|
||||
A session-wide, automatic fixture to set up all necessary credentials.
|
||||
It runs once before any tests, and tears down the credentials after all tests are complete.
|
||||
"""
|
||||
print("\n--- Setting up credentials for test session ---")
|
||||
|
||||
print("\n--- DEBUGGING CREDENTIALS ---")
|
||||
print(f"SPOTIFY_API_CLIENT_ID: {SPOTIFY_API_CLIENT_ID}")
|
||||
print(f"SPOTIFY_API_CLIENT_SECRET: {SPOTIFY_API_CLIENT_SECRET}")
|
||||
print(f"DEEZER_ARL: {DEEZER_ARL}")
|
||||
print(f"SPOTIFY_BLOB_CONTENT {SPOTIFY_BLOB_CONTENT}")
|
||||
print("--- END DEBUGGING ---\n")
|
||||
|
||||
# Skip all tests if secrets are not provided in the environment
|
||||
if SPOTIFY_API_CLIENT_ID == "your_spotify_client_id" or \
|
||||
SPOTIFY_API_CLIENT_SECRET == "your_spotify_client_secret" or \
|
||||
not SPOTIFY_BLOB_CONTENT or \
|
||||
DEEZER_ARL == "your_deezer_arl":
|
||||
pytest.skip("Required credentials not provided in .env file or environment. Skipping credential-dependent tests.")
|
||||
|
||||
# 1. Set global Spotify API creds
|
||||
data = {"client_id": SPOTIFY_API_CLIENT_ID, "client_secret": SPOTIFY_API_CLIENT_SECRET}
|
||||
response = requests.put(f"{base_url}/credentials/spotify_api_config", json=data)
|
||||
if response.status_code != 200:
|
||||
pytest.fail(f"Failed to set global Spotify API creds: {response.text}")
|
||||
print("Global Spotify API credentials set.")
|
||||
|
||||
# 2. Delete any pre-existing test credentials to ensure a clean state
|
||||
requests.delete(f"{base_url}/credentials/spotify/{SPOTIFY_ACCOUNT_NAME}")
|
||||
requests.delete(f"{base_url}/credentials/deezer/{DEEZER_ACCOUNT_NAME}")
|
||||
print("Cleaned up any old test credentials.")
|
||||
|
||||
# 3. Create Deezer credential
|
||||
data = {"name": DEEZER_ACCOUNT_NAME, "arl": DEEZER_ARL, "region": "US"}
|
||||
response = requests.post(f"{base_url}/credentials/deezer/{DEEZER_ACCOUNT_NAME}", json=data)
|
||||
if response.status_code != 201:
|
||||
pytest.fail(f"Failed to create Deezer credential: {response.text}")
|
||||
print("Deezer test credential created.")
|
||||
|
||||
# 4. Create Spotify credential
|
||||
data = {"name": SPOTIFY_ACCOUNT_NAME, "blob_content": SPOTIFY_BLOB_CONTENT, "region": "US"}
|
||||
response = requests.post(f"{base_url}/credentials/spotify/{SPOTIFY_ACCOUNT_NAME}", json=data)
|
||||
if response.status_code != 201:
|
||||
pytest.fail(f"Failed to create Spotify credential: {response.text}")
|
||||
print("Spotify test credential created.")
|
||||
|
||||
# 5. Set main config to use these accounts for downloads
|
||||
config_payload = {
|
||||
"spotify": SPOTIFY_ACCOUNT_NAME,
|
||||
"deezer": DEEZER_ACCOUNT_NAME,
|
||||
}
|
||||
response = requests.post(f"{base_url}/config", json=config_payload)
|
||||
if response.status_code != 200:
|
||||
pytest.fail(f"Failed to set main config for tests: {response.text}")
|
||||
print("Main config set to use test credentials.")
|
||||
|
||||
yield # This is where the tests will run
|
||||
|
||||
# --- Teardown ---
|
||||
print("\n--- Tearing down test credentials ---")
|
||||
response = requests.delete(f"{base_url}/credentials/spotify/{SPOTIFY_ACCOUNT_NAME}")
|
||||
assert response.status_code in [200, 404]
|
||||
response = requests.delete(f"{base_url}/credentials/deezer/{DEEZER_ACCOUNT_NAME}")
|
||||
assert response.status_code in [200, 404]
|
||||
print("Test credentials deleted.")
|
||||
94
tests/test_config.py
Normal file
94
tests/test_config.py
Normal file
@@ -0,0 +1,94 @@
|
||||
import requests
|
||||
import pytest
|
||||
|
||||
@pytest.fixture
|
||||
def reset_config(base_url):
|
||||
"""A fixture to ensure the main config is reset after a test case."""
|
||||
response = requests.get(f"{base_url}/config")
|
||||
assert response.status_code == 200
|
||||
original_config = response.json()
|
||||
yield
|
||||
response = requests.post(f"{base_url}/config", json=original_config)
|
||||
assert response.status_code == 200
|
||||
|
||||
def test_get_main_config(base_url):
|
||||
"""Tests if the main configuration can be retrieved."""
|
||||
response = requests.get(f"{base_url}/config")
|
||||
assert response.status_code == 200
|
||||
config = response.json()
|
||||
assert "service" in config
|
||||
assert "maxConcurrentDownloads" in config
|
||||
assert "spotify" in config # Should be set by conftest
|
||||
assert "deezer" in config # Should be set by conftest
|
||||
|
||||
def test_update_main_config(base_url, reset_config):
|
||||
"""Tests updating various fields in the main configuration."""
|
||||
new_settings = {
|
||||
"maxConcurrentDownloads": 5,
|
||||
"spotifyQuality": "HIGH",
|
||||
"deezerQuality": "FLAC",
|
||||
"customDirFormat": "%artist%/%album%",
|
||||
"customTrackFormat": "%tracknum% %title%",
|
||||
"save_cover": False,
|
||||
"fallback": True,
|
||||
}
|
||||
|
||||
response = requests.post(f"{base_url}/config", json=new_settings)
|
||||
assert response.status_code == 200
|
||||
updated_config = response.json()
|
||||
|
||||
for key, value in new_settings.items():
|
||||
assert updated_config[key] == value
|
||||
|
||||
def test_get_watch_config(base_url):
|
||||
"""Tests if the watch-specific configuration can be retrieved."""
|
||||
response = requests.get(f"{base_url}/config/watch")
|
||||
assert response.status_code == 200
|
||||
config = response.json()
|
||||
assert "delay_between_playlists_seconds" in config
|
||||
assert "delay_between_artists_seconds" in config
|
||||
|
||||
def test_update_watch_config(base_url):
|
||||
"""Tests updating the watch-specific configuration."""
|
||||
response = requests.get(f"{base_url}/config/watch")
|
||||
original_config = response.json()
|
||||
|
||||
new_settings = {
|
||||
"delay_between_playlists_seconds": 120,
|
||||
"delay_between_artists_seconds": 240,
|
||||
"auto_add_new_releases_to_queue": False,
|
||||
}
|
||||
|
||||
response = requests.post(f"{base_url}/config/watch", json=new_settings)
|
||||
assert response.status_code == 200
|
||||
updated_config = response.json()
|
||||
|
||||
for key, value in new_settings.items():
|
||||
assert updated_config[key] == value
|
||||
|
||||
# Revert to original
|
||||
requests.post(f"{base_url}/config/watch", json=original_config)
|
||||
|
||||
def test_update_conversion_config(base_url, reset_config):
|
||||
"""
|
||||
Iterates through all supported conversion formats and bitrates,
|
||||
updating the config and verifying the changes for each combination.
|
||||
"""
|
||||
conversion_formats = ["mp3", "flac", "ogg", "opus", "m4a"]
|
||||
bitrates = {
|
||||
"mp3": ["320", "256", "192", "128"],
|
||||
"ogg": ["500", "320", "192", "160"],
|
||||
"opus": ["256", "192", "128", "96"],
|
||||
"m4a": ["320k", "256k", "192k", "128k"],
|
||||
"flac": [None] # Bitrate is not applicable for FLAC
|
||||
}
|
||||
|
||||
for format in conversion_formats:
|
||||
for br in bitrates.get(format, [None]):
|
||||
print(f"Testing conversion config: format={format}, bitrate={br}")
|
||||
new_settings = {"convertTo": format, "bitrate": br}
|
||||
response = requests.post(f"{base_url}/config", json=new_settings)
|
||||
assert response.status_code == 200
|
||||
updated_config = response.json()
|
||||
assert updated_config["convertTo"] == format
|
||||
assert updated_config["bitrate"] == br
|
||||
128
tests/test_downloads.py
Normal file
128
tests/test_downloads.py
Normal file
@@ -0,0 +1,128 @@
|
||||
import requests
|
||||
import pytest
|
||||
|
||||
# URLs provided by the user for testing
|
||||
SPOTIFY_TRACK_URL = "https://open.spotify.com/track/1Cts4YV9aOXVAP3bm3Ro6r"
|
||||
SPOTIFY_ALBUM_URL = "https://open.spotify.com/album/4K0JVP5veNYTVI6IMamlla"
|
||||
SPOTIFY_PLAYLIST_URL = "https://open.spotify.com/playlist/26CiMxIxdn5WhXyccMCPOB"
|
||||
SPOTIFY_ARTIST_URL = "https://open.spotify.com/artist/7l6cdPhOLYO7lehz5xfzLV"
|
||||
|
||||
# Corresponding IDs extracted from URLs
|
||||
TRACK_ID = SPOTIFY_TRACK_URL.split('/')[-1].split('?')[0]
|
||||
ALBUM_ID = SPOTIFY_ALBUM_URL.split('/')[-1].split('?')[0]
|
||||
PLAYLIST_ID = SPOTIFY_PLAYLIST_URL.split('/')[-1].split('?')[0]
|
||||
ARTIST_ID = SPOTIFY_ARTIST_URL.split('/')[-1].split('?')[0]
|
||||
|
||||
@pytest.fixture
|
||||
def reset_config(base_url):
|
||||
"""Fixture to reset the main config after a test to avoid side effects."""
|
||||
response = requests.get(f"{base_url}/config")
|
||||
original_config = response.json()
|
||||
yield
|
||||
requests.post(f"{base_url}/config", json=original_config)
|
||||
|
||||
def test_download_track_spotify_only(base_url, task_waiter, reset_config):
|
||||
"""Tests downloading a single track from Spotify with real-time download enabled."""
|
||||
print("\n--- Testing Spotify-only track download ---")
|
||||
config_payload = {
|
||||
"service": "spotify",
|
||||
"fallback": False,
|
||||
"realTime": True,
|
||||
"spotifyQuality": "NORMAL" # Simulating free account quality
|
||||
}
|
||||
requests.post(f"{base_url}/config", json=config_payload)
|
||||
|
||||
response = requests.get(f"{base_url}/track/download/{TRACK_ID}")
|
||||
assert response.status_code == 202
|
||||
task_id = response.json()["task_id"]
|
||||
|
||||
final_status = task_waiter(task_id)
|
||||
assert final_status["status"] == "complete", f"Task failed: {final_status.get('error')}"
|
||||
|
||||
def test_download_album_spotify_only(base_url, task_waiter, reset_config):
|
||||
"""Tests downloading a full album from Spotify with real-time download enabled."""
|
||||
print("\n--- Testing Spotify-only album download ---")
|
||||
config_payload = {"service": "spotify", "fallback": False, "realTime": True, "spotifyQuality": "NORMAL"}
|
||||
requests.post(f"{base_url}/config", json=config_payload)
|
||||
|
||||
response = requests.get(f"{base_url}/album/download/{ALBUM_ID}")
|
||||
assert response.status_code == 202
|
||||
task_id = response.json()["task_id"]
|
||||
|
||||
final_status = task_waiter(task_id, timeout=900)
|
||||
assert final_status["status"] == "complete", f"Task failed: {final_status.get('error')}"
|
||||
|
||||
def test_download_playlist_spotify_only(base_url, task_waiter, reset_config):
|
||||
"""Tests downloading a full playlist from Spotify with real-time download enabled."""
|
||||
print("\n--- Testing Spotify-only playlist download ---")
|
||||
config_payload = {"service": "spotify", "fallback": False, "realTime": True, "spotifyQuality": "NORMAL"}
|
||||
requests.post(f"{base_url}/config", json=config_payload)
|
||||
|
||||
response = requests.get(f"{base_url}/playlist/download/{PLAYLIST_ID}")
|
||||
assert response.status_code == 202
|
||||
task_id = response.json()["task_id"]
|
||||
|
||||
final_status = task_waiter(task_id, timeout=1200)
|
||||
assert final_status["status"] == "complete", f"Task failed: {final_status.get('error')}"
|
||||
|
||||
def test_download_artist_spotify_only(base_url, task_waiter, reset_config):
|
||||
"""Tests queuing downloads for an artist's entire discography from Spotify."""
|
||||
print("\n--- Testing Spotify-only artist download ---")
|
||||
config_payload = {"service": "spotify", "fallback": False, "realTime": True, "spotifyQuality": "NORMAL"}
|
||||
requests.post(f"{base_url}/config", json=config_payload)
|
||||
|
||||
response = requests.get(f"{base_url}/artist/download/{ARTIST_ID}?album_type=album,single")
|
||||
assert response.status_code == 202
|
||||
response_data = response.json()
|
||||
queued_albums = response_data.get("successfully_queued_albums", [])
|
||||
assert len(queued_albums) > 0, "No albums were queued for the artist."
|
||||
|
||||
for album in queued_albums:
|
||||
task_id = album["task_id"]
|
||||
print(f"--- Waiting for artist album: {album['name']} ({task_id}) ---")
|
||||
final_status = task_waiter(task_id, timeout=900)
|
||||
assert final_status["status"] == "complete", f"Artist album task {album['name']} failed: {final_status.get('error')}"
|
||||
|
||||
def test_download_track_with_fallback(base_url, task_waiter, reset_config):
|
||||
"""Tests downloading a Spotify track with Deezer fallback enabled."""
|
||||
print("\n--- Testing track download with Deezer fallback ---")
|
||||
config_payload = {
|
||||
"service": "spotify",
|
||||
"fallback": True,
|
||||
"deezerQuality": "MP3_320" # Simulating higher quality from Deezer free
|
||||
}
|
||||
requests.post(f"{base_url}/config", json=config_payload)
|
||||
|
||||
response = requests.get(f"{base_url}/track/download/{TRACK_ID}")
|
||||
assert response.status_code == 202
|
||||
task_id = response.json()["task_id"]
|
||||
|
||||
final_status = task_waiter(task_id)
|
||||
assert final_status["status"] == "complete", f"Task failed: {final_status.get('error')}"
|
||||
|
||||
@pytest.mark.parametrize("format,bitrate", [
|
||||
("mp3", "320"), ("mp3", "128"),
|
||||
("flac", None),
|
||||
("ogg", "160"),
|
||||
("opus", "128"),
|
||||
("m4a", "128k")
|
||||
])
|
||||
def test_download_with_conversion(base_url, task_waiter, reset_config, format, bitrate):
|
||||
"""Tests downloading a track with various conversion formats and bitrates."""
|
||||
print(f"\n--- Testing conversion: {format} @ {bitrate or 'default'} ---")
|
||||
config_payload = {
|
||||
"service": "spotify",
|
||||
"fallback": False,
|
||||
"realTime": True,
|
||||
"spotifyQuality": "NORMAL",
|
||||
"convertTo": format,
|
||||
"bitrate": bitrate
|
||||
}
|
||||
requests.post(f"{base_url}/config", json=config_payload)
|
||||
|
||||
response = requests.get(f"{base_url}/track/download/{TRACK_ID}")
|
||||
assert response.status_code == 202
|
||||
task_id = response.json()["task_id"]
|
||||
|
||||
final_status = task_waiter(task_id)
|
||||
assert final_status["status"] == "complete", f"Download failed for format {format} bitrate {bitrate}: {final_status.get('error')}"
|
||||
61
tests/test_history.py
Normal file
61
tests/test_history.py
Normal file
@@ -0,0 +1,61 @@
|
||||
import requests
|
||||
import pytest
|
||||
import time
|
||||
|
||||
TRACK_ID = "1Cts4YV9aOXVAP3bm3Ro6r" # Use a known, short track
|
||||
|
||||
@pytest.fixture
|
||||
def reset_config(base_url):
|
||||
"""Fixture to reset the main config after a test."""
|
||||
response = requests.get(f"{base_url}/config")
|
||||
original_config = response.json()
|
||||
yield
|
||||
requests.post(f"{base_url}/config", json=original_config)
|
||||
|
||||
def test_history_logging_and_filtering(base_url, task_waiter, reset_config):
|
||||
"""
|
||||
Tests if a completed download appears in the history and
|
||||
verifies that history filtering works correctly.
|
||||
"""
|
||||
# First, complete a download task to ensure there's a history entry
|
||||
config_payload = {"service": "spotify", "fallback": False, "realTime": True}
|
||||
requests.post(f"{base_url}/config", json=config_payload)
|
||||
response = requests.get(f"{base_url}/track/download/{TRACK_ID}")
|
||||
assert response.status_code == 202
|
||||
task_id = response.json()["task_id"]
|
||||
task_waiter(task_id) # Wait for the download to complete
|
||||
|
||||
# Give a moment for history to be written if it's asynchronous
|
||||
time.sleep(2)
|
||||
|
||||
# 1. Get all history and check if our task is present
|
||||
print("\n--- Verifying task appears in general history ---")
|
||||
response = requests.get(f"{base_url}/history")
|
||||
assert response.status_code == 200
|
||||
history_data = response.json()
|
||||
assert "entries" in history_data
|
||||
assert "total" in history_data
|
||||
assert history_data["total"] > 0
|
||||
|
||||
# Find our specific task in the history
|
||||
history_entry = next((entry for entry in history_data["entries"] if entry['task_id'] == task_id), None)
|
||||
assert history_entry is not None, f"Task {task_id} not found in download history."
|
||||
assert history_entry["status_final"] == "COMPLETED"
|
||||
|
||||
# 2. Test filtering for COMPLETED tasks
|
||||
print("\n--- Verifying history filtering for COMPLETED status ---")
|
||||
response = requests.get(f"{base_url}/history?filters[status_final]=COMPLETED")
|
||||
assert response.status_code == 200
|
||||
completed_history = response.json()
|
||||
assert completed_history["total"] > 0
|
||||
assert any(entry['task_id'] == task_id for entry in completed_history["entries"])
|
||||
assert all(entry['status_final'] == 'COMPLETED' for entry in completed_history["entries"])
|
||||
|
||||
# 3. Test filtering for an item name
|
||||
print(f"\n--- Verifying history filtering for item_name: {history_entry['item_name']} ---")
|
||||
item_name_query = requests.utils.quote(history_entry['item_name'])
|
||||
response = requests.get(f"{base_url}/history?filters[item_name]={item_name_query}")
|
||||
assert response.status_code == 200
|
||||
named_history = response.json()
|
||||
assert named_history["total"] > 0
|
||||
assert any(entry['task_id'] == task_id for entry in named_history["entries"])
|
||||
93
tests/test_prgs.py
Normal file
93
tests/test_prgs.py
Normal file
@@ -0,0 +1,93 @@
|
||||
import requests
|
||||
import pytest
|
||||
import time
|
||||
|
||||
# Use a known, short track for quick tests
|
||||
TRACK_ID = "1Cts4YV9aOXVAP3bm3Ro6r"
|
||||
# Use a long playlist to ensure there's time to cancel it
|
||||
LONG_PLAYLIST_ID = "6WsyUEITURbQXZsqtEewb1" # Today's Top Hits on Spotify
|
||||
|
||||
@pytest.fixture
|
||||
def reset_config(base_url):
|
||||
"""Fixture to reset the main config after a test."""
|
||||
response = requests.get(f"{base_url}/config")
|
||||
original_config = response.json()
|
||||
yield
|
||||
requests.post(f"{base_url}/config", json=original_config)
|
||||
|
||||
def test_list_tasks(base_url, reset_config):
|
||||
"""Tests listing all active tasks."""
|
||||
config_payload = {"service": "spotify", "fallback": False, "realTime": True}
|
||||
requests.post(f"{base_url}/config", json=config_payload)
|
||||
|
||||
# Start a task
|
||||
response = requests.get(f"{base_url}/track/download/{TRACK_ID}")
|
||||
assert response.status_code == 202
|
||||
task_id = response.json()["task_id"]
|
||||
|
||||
# Check the list to see if our task appears
|
||||
response = requests.get(f"{base_url}/prgs/list")
|
||||
assert response.status_code == 200
|
||||
tasks = response.json()
|
||||
assert isinstance(tasks, list)
|
||||
assert any(t['task_id'] == task_id for t in tasks)
|
||||
|
||||
# Clean up by cancelling the task
|
||||
requests.post(f"{base_url}/prgs/cancel/{task_id}")
|
||||
|
||||
def test_get_task_progress_and_log(base_url, task_waiter, reset_config):
|
||||
"""Tests getting progress for a running task and retrieving its log after completion."""
|
||||
config_payload = {"service": "spotify", "fallback": False, "realTime": True}
|
||||
requests.post(f"{base_url}/config", json=config_payload)
|
||||
|
||||
response = requests.get(f"{base_url}/track/download/{TRACK_ID}")
|
||||
assert response.status_code == 202
|
||||
task_id = response.json()["task_id"]
|
||||
|
||||
# Poll progress a few times while it's running to check the endpoint
|
||||
for _ in range(3):
|
||||
time.sleep(1)
|
||||
res = requests.get(f"{base_url}/prgs/{task_id}")
|
||||
if res.status_code == 200 and res.json():
|
||||
statuses = res.json()
|
||||
assert isinstance(statuses, list)
|
||||
assert "status" in statuses[-1]
|
||||
break
|
||||
else:
|
||||
pytest.fail("Could not get a valid task status in time.")
|
||||
|
||||
# Wait for completion
|
||||
final_status = task_waiter(task_id)
|
||||
assert final_status["status"] == "complete"
|
||||
|
||||
# After completion, check the task log endpoint
|
||||
res = requests.get(f"{base_url}/prgs/{task_id}?log=true")
|
||||
assert res.status_code == 200
|
||||
log_data = res.json()
|
||||
assert "task_log" in log_data
|
||||
assert len(log_data["task_log"]) > 0
|
||||
assert "status" in log_data["task_log"][0]
|
||||
|
||||
def test_cancel_task(base_url, reset_config):
|
||||
"""Tests cancelling a task shortly after it has started."""
|
||||
config_payload = {"service": "spotify", "fallback": False, "realTime": True}
|
||||
requests.post(f"{base_url}/config", json=config_payload)
|
||||
|
||||
response = requests.get(f"{base_url}/playlist/download/{LONG_PLAYLIST_ID}")
|
||||
assert response.status_code == 202
|
||||
task_id = response.json()["task_id"]
|
||||
|
||||
# Give it a moment to ensure it has started processing
|
||||
time.sleep(3)
|
||||
|
||||
# Cancel the task
|
||||
response = requests.post(f"{base_url}/prgs/cancel/{task_id}")
|
||||
assert response.status_code == 200
|
||||
assert response.json()["status"] == "cancelled"
|
||||
|
||||
# Check the final status to confirm it's marked as cancelled
|
||||
time.sleep(2) # Allow time for the final status to propagate
|
||||
res = requests.get(f"{base_url}/prgs/{task_id}")
|
||||
assert res.status_code == 200
|
||||
last_status = res.json()[-1]
|
||||
assert last_status["status"] == "cancelled"
|
||||
35
tests/test_search.py
Normal file
35
tests/test_search.py
Normal file
@@ -0,0 +1,35 @@
|
||||
import requests
|
||||
import pytest
|
||||
|
||||
def test_search_spotify_artist(base_url):
|
||||
"""Tests searching for an artist on Spotify."""
|
||||
response = requests.get(f"{base_url}/search?q=Daft+Punk&search_type=artist")
|
||||
assert response.status_code == 200
|
||||
results = response.json()
|
||||
assert "items" in results
|
||||
assert len(results["items"]) > 0
|
||||
assert "Daft Punk" in results["items"][0]["name"]
|
||||
|
||||
def test_search_spotify_track(base_url):
|
||||
"""Tests searching for a track on Spotify."""
|
||||
response = requests.get(f"{base_url}/search?q=Get+Lucky&search_type=track")
|
||||
assert response.status_code == 200
|
||||
results = response.json()
|
||||
assert "items" in results
|
||||
assert len(results["items"]) > 0
|
||||
|
||||
def test_search_deezer_track(base_url):
|
||||
"""Tests searching for a track on Deezer."""
|
||||
response = requests.get(f"{base_url}/search?q=Instant+Crush&search_type=track")
|
||||
assert response.status_code == 200
|
||||
results = response.json()
|
||||
assert "items" in results
|
||||
assert len(results["items"]) > 0
|
||||
|
||||
def test_search_deezer_album(base_url):
|
||||
"""Tests searching for an album on Deezer."""
|
||||
response = requests.get(f"{base_url}/search?q=Random+Access+Memories&search_type=album")
|
||||
assert response.status_code == 200
|
||||
results = response.json()
|
||||
assert "items" in results
|
||||
assert len(results["items"]) > 0
|
||||
117
tests/test_watch.py
Normal file
117
tests/test_watch.py
Normal file
@@ -0,0 +1,117 @@
|
||||
import requests
|
||||
import pytest
|
||||
import time
|
||||
|
||||
SPOTIFY_PLAYLIST_ID = "26CiMxIxdn5WhXyccMCPOB"
|
||||
SPOTIFY_ARTIST_ID = "7l6cdPhOLYO7lehz5xfzLV"
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_and_cleanup_watch_tests(base_url):
|
||||
"""
|
||||
A fixture that enables watch mode, cleans the watchlist before each test,
|
||||
and then restores original state and cleans up after each test.
|
||||
"""
|
||||
# Get original watch config to restore it later
|
||||
response = requests.get(f"{base_url}/config/watch")
|
||||
assert response.status_code == 200
|
||||
original_config = response.json()
|
||||
|
||||
# Enable watch mode for testing if it's not already
|
||||
if not original_config.get("enabled"):
|
||||
response = requests.post(f"{base_url}/config/watch", json={"enabled": True})
|
||||
assert response.status_code == 200
|
||||
|
||||
# Cleanup any existing watched items before the test
|
||||
requests.delete(f"{base_url}/playlist/watch/{SPOTIFY_PLAYLIST_ID}")
|
||||
requests.delete(f"{base_url}/artist/watch/{SPOTIFY_ARTIST_ID}")
|
||||
|
||||
yield
|
||||
|
||||
# Cleanup watched items created during the test
|
||||
requests.delete(f"{base_url}/playlist/watch/{SPOTIFY_PLAYLIST_ID}")
|
||||
requests.delete(f"{base_url}/artist/watch/{SPOTIFY_ARTIST_ID}")
|
||||
|
||||
# Restore original watch config
|
||||
response = requests.post(f"{base_url}/config/watch", json=original_config)
|
||||
assert response.status_code == 200
|
||||
|
||||
def test_add_and_list_playlist_to_watch(base_url):
|
||||
"""Tests adding a playlist to the watch list and verifying it appears in the list."""
|
||||
response = requests.put(f"{base_url}/playlist/watch/{SPOTIFY_PLAYLIST_ID}")
|
||||
assert response.status_code == 200
|
||||
assert "Playlist added to watch list" in response.json()["message"]
|
||||
|
||||
# Verify it's in the watched list
|
||||
response = requests.get(f"{base_url}/playlist/watch/list")
|
||||
assert response.status_code == 200
|
||||
watched_playlists = response.json()
|
||||
assert any(p['spotify_id'] == SPOTIFY_PLAYLIST_ID for p in watched_playlists)
|
||||
|
||||
def test_add_and_list_artist_to_watch(base_url):
|
||||
"""Tests adding an artist to the watch list and verifying it appears in the list."""
|
||||
response = requests.put(f"{base_url}/artist/watch/{SPOTIFY_ARTIST_ID}")
|
||||
assert response.status_code == 200
|
||||
assert "Artist added to watch list" in response.json()["message"]
|
||||
|
||||
# Verify it's in the watched list
|
||||
response = requests.get(f"{base_url}/artist/watch/list")
|
||||
assert response.status_code == 200
|
||||
watched_artists = response.json()
|
||||
assert any(a['spotify_id'] == SPOTIFY_ARTIST_ID for a in watched_artists)
|
||||
|
||||
def test_trigger_playlist_check(base_url):
|
||||
"""Tests the endpoint for manually triggering a check on a watched playlist."""
|
||||
# First, add the playlist to the watch list
|
||||
requests.put(f"{base_url}/playlist/watch/{SPOTIFY_PLAYLIST_ID}")
|
||||
|
||||
# Trigger the check
|
||||
response = requests.post(f"{base_url}/playlist/watch/trigger_check/{SPOTIFY_PLAYLIST_ID}")
|
||||
assert response.status_code == 200
|
||||
assert "Check triggered for playlist" in response.json()["message"]
|
||||
|
||||
# A full verification would require inspecting the database or new tasks,
|
||||
# but for an API test, confirming the trigger endpoint responds correctly is the key goal.
|
||||
print("Playlist check triggered. Note: This does not verify new downloads were queued.")
|
||||
|
||||
def test_trigger_artist_check(base_url):
|
||||
"""Tests the endpoint for manually triggering a check on a watched artist."""
|
||||
# First, add the artist to the watch list
|
||||
requests.put(f"{base_url}/artist/watch/{SPOTIFY_ARTIST_ID}")
|
||||
|
||||
# Trigger the check
|
||||
response = requests.post(f"{base_url}/artist/watch/trigger_check/{SPOTIFY_ARTIST_ID}")
|
||||
assert response.status_code == 200
|
||||
assert "Check triggered for artist" in response.json()["message"]
|
||||
print("Artist check triggered. Note: This does not verify new downloads were queued.")
|
||||
|
||||
def test_remove_playlist_from_watch(base_url):
|
||||
"""Tests removing a playlist from the watch list."""
|
||||
# Add the playlist first to ensure it exists
|
||||
requests.put(f"{base_url}/playlist/watch/{SPOTIFY_PLAYLIST_ID}")
|
||||
|
||||
# Now, remove it
|
||||
response = requests.delete(f"{base_url}/playlist/watch/{SPOTIFY_PLAYLIST_ID}")
|
||||
assert response.status_code == 200
|
||||
assert "Playlist removed from watch list" in response.json()["message"]
|
||||
|
||||
# Verify it's no longer in the list
|
||||
response = requests.get(f"{base_url}/playlist/watch/list")
|
||||
assert response.status_code == 200
|
||||
watched_playlists = response.json()
|
||||
assert not any(p['spotify_id'] == SPOTIFY_PLAYLIST_ID for p in watched_playlists)
|
||||
|
||||
def test_remove_artist_from_watch(base_url):
|
||||
"""Tests removing an artist from the watch list."""
|
||||
# Add the artist first to ensure it exists
|
||||
requests.put(f"{base_url}/artist/watch/{SPOTIFY_ARTIST_ID}")
|
||||
|
||||
# Now, remove it
|
||||
response = requests.delete(f"{base_url}/artist/watch/{SPOTIFY_ARTIST_ID}")
|
||||
assert response.status_code == 200
|
||||
assert "Artist removed from watch list" in response.json()["message"]
|
||||
|
||||
# Verify it's no longer in the list
|
||||
response = requests.get(f"{base_url}/artist/watch/list")
|
||||
assert response.status_code == 200
|
||||
watched_artists = response.json()
|
||||
assert not any(a['spotify_id'] == SPOTIFY_ARTIST_ID for a in watched_artists)
|
||||
Reference in New Issue
Block a user