diff --git a/.gitignore b/.gitignore index 8dc6088b5a..79bba2ea0a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ /.idea/ /docs/build/ **/__pycache__/ -.venv \ No newline at end of file +.venv +.vscode/ diff --git a/build_stream/.env b/build_stream/.env new file mode 100644 index 0000000000..319f554191 --- /dev/null +++ b/build_stream/.env @@ -0,0 +1,21 @@ +# Example environment configuration for UAT tests +# Copy this file to .env and update with your values + +# Required: Registration password for OAuth client registration +BUILD_STREAM_AUTH_PASSWORD=dell1234 + +# Optional: Override server URL (defaults to http://localhost:8000) +BUILD_STREAM_BASE_URL=https://182.10.5.157:8010 + +# Optional: Override auth username (defaults to build_stream_registrar) +BUILD_STREAM_AUTH_USERNAME=admin + +# Optional: Override client name (defaults to uat-test-client) +# BUILD_STREAM_CLIENT_NAME=uat-test-client + +# Optional: Override client ID and secret (for existing clients) +BUILD_STREAM_CLIENT_ID=bld_57b2ac8e855da60ec06c2315b3e41d9d +BUILD_STREAM_CLIENT_SECRET=bld_s_XjsqZSNAmHTXNOkIALl7BPczeGVDs75ZlEg1rvoHHfQ + +# Optional: Override client scopes (comma-separated) +BUILD_STREAM_CLIENT_SCOPES=catalog:read,catalog:write,job:write diff --git a/build_stream/.env.test b/build_stream/.env.test index 7cee8820af..e32bbbdf66 100644 --- a/build_stream/.env.test +++ b/build_stream/.env.test @@ -1,5 +1,9 @@ +# Example environment configuration for Tests + +# This file can be used with: python -m pytest --env-file .env.test or +# Copy this file to .env and update with your values + # Test environment variables for PostgreSQL integration tests -# This file can be used with: python -m pytest --env-file .env.test # Database connection for integration tests TEST_DATABASE_URL=postgresql://admin:dell1234@localhost:5432/build_stream_db @@ -8,3 +12,24 @@ DATABASE_URL=postgresql://admin:dell1234@localhost:5432/build_stream_db # Alternative: Use environment-specific config # For production container: postgresql://{{ postgres_user }}:{{ postgres_password }}@localhost:5432/{{ postgres_db_name }} # For local development: postgresql://admin:dell1234@localhost:5432/build_stream_db + +# Example environment configuration for UAT tests + +# Optional: Override server URL (defaults to http://localhost:8000) +BUILD_STREAM_BASE_URL=https://182.10.0.100:8010 + +# Optional: Override auth username (defaults to build_stream_registrar) +BUILD_STREAM_AUTH_USERNAME=admin + +# Required: Registration password for OAuth client registration +BUILD_STREAM_AUTH_PASSWORD=Test@1234 + +# Optional: Override client name (defaults to uat-test-client) +# BUILD_STREAM_CLIENT_NAME=uat-test-client + +# Optional: Override client ID and secret (for existing clients) +BUILD_STREAM_CLIENT_ID= +BUILD_STREAM_CLIENT_SECRET= + +# Optional: Override client scopes (comma-separated) +# BUILD_STREAM_CLIENT_SCOPES=catalog:read,catalog:write,job:write \ No newline at end of file diff --git a/build_stream/pytest.ini b/build_stream/pytest.ini index 4be0ba0e39..05059d9b80 100644 --- a/build_stream/pytest.ini +++ b/build_stream/pytest.ini @@ -8,7 +8,8 @@ markers = unit: marks tests as unit tests integration: marks tests as integration tests e2e: marks tests as end-to-end tests + uat: marks tests as user acceptance tests (requires infra ready) env = ENV = dev TEST_DATABASE_URL = postgresql://admin:dell1234@localhost:5432/build_stream_db - DATABASE_URL = postgresql://admin:dell1234@localhost:5432/build_stream_db + DATABASE_URL = postgresql://admin:dell1234@localhost:5432/build_stream_db \ No newline at end of file diff --git a/build_stream/tests/uat/__init__.py b/build_stream/tests/uat/__init__.py new file mode 100644 index 0000000000..7ad07c7b1f --- /dev/null +++ b/build_stream/tests/uat/__init__.py @@ -0,0 +1,25 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""User Acceptance Tests (UAT) for Build Stream API. + +These tests assume the API server is already running and configured. +They focus on API contract validation and user workflows without infrastructure setup. + +To run UAT tests: + pytest tests/uat/ -v -m uat + +Required environment variable: + BUILD_STREAM_AUTH_PASSWORD - Password for OAuth registration +""" diff --git a/build_stream/tests/uat/conftest.py b/build_stream/tests/uat/conftest.py new file mode 100644 index 0000000000..0001e30365 --- /dev/null +++ b/build_stream/tests/uat/conftest.py @@ -0,0 +1,429 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Shared fixtures for User Acceptance Tests (UAT). + +These fixtures assume the Build Stream API server is already running and configured. +Tests focus on API contract validation and user workflows without infrastructure setup. +""" + +import os +import uuid +from typing import Dict, Generator +from unittest.mock import patch +from pathlib import Path + +import httpx +import pytest + +# UAT Configuration +# Direct mapping of environment variables to their default values +UAT_CONFIG = { + "BUILD_STREAM_BASE_URL": "http://localhost:8000", + "BUILD_STREAM_AUTH_USERNAME": "build_stream_registrar", + "BUILD_STREAM_CLIENT_NAME": "uat-test-client", + "BUILD_STREAM_CLIENT_SCOPES": ["catalog:read", "catalog:write", "job:read", "job:write"], + # Required variables (no defaults - must be set in environment) + "BUILD_STREAM_AUTH_PASSWORD": None, # Required + "BUILD_STREAM_CLIENT_ID": None, # Required + "BUILD_STREAM_CLIENT_SECRET": None, # Required +} + +# Load environment variables from .env file if it exists +try: + from dotenv import load_dotenv + + # Try multiple paths for .env file + env_paths = [ + # Relative to conftest.py (most reliable) + Path(__file__).parent.parent.parent / ".env", + # Current working directory + Path.cwd() / ".env", + # One level up from current directory + Path.cwd().parent / ".env" + ] + + env_loaded = False + for env_file in env_paths: + print(f"\nChecking for .env file at: {env_file}") + if env_file.exists(): + load_dotenv(env_file) + print(f"Loaded environment variables from {env_file}") + env_loaded = True + break + + if not env_loaded: + print("āš ļø No .env file found in any of the expected locations") + +except ImportError: + # python-dotenv not available, continue without it + print("āš ļø python-dotenv not available, environment variables not loaded from .env") + pass + +@pytest.fixture(scope="session") +def base_url() -> str: + """Get the API server base URL from environment variable or default. + + Returns: + Base URL for API requests. + """ + return os.getenv("BUILD_STREAM_BASE_URL", UAT_CONFIG["BUILD_STREAM_BASE_URL"]) + + +@pytest.fixture(scope="session") +def auth_username() -> str: + """Get the auth username from environment variable or default. + + Returns: + Username for OAuth registration. + """ + return os.getenv("BUILD_STREAM_AUTH_USERNAME", UAT_CONFIG["BUILD_STREAM_AUTH_USERNAME"]) + + +@pytest.fixture(scope="session") +def auth_password() -> str: + """Get the auth password from environment variable. + + Returns: + Password for OAuth registration. + + Raises: + ValueError: If BUILD_STREAM_AUTH_PASSWORD is not set. + """ + password = os.getenv("BUILD_STREAM_AUTH_PASSWORD") + if not password: + raise ValueError( + "BUILD_STREAM_AUTH_PASSWORD environment variable is required for UAT tests. " + "Set it to the registration password for the Build Stream API." + ) + return password + + +@pytest.fixture(scope="session") +def client_name() -> str: + """Get the client name from environment variable or use default with unique suffix. + + Returns: + Client name for OAuth registration. + """ + # Environment variable has highest priority + env_name = os.getenv("BUILD_STREAM_CLIENT_NAME") + if env_name: + return env_name + + # Use default with unique suffix to avoid conflicts + base_name = UAT_CONFIG["BUILD_STREAM_CLIENT_NAME"] + unique_suffix = str(uuid.uuid4())[:8] + return f"{base_name}-{unique_suffix}" + + +@pytest.fixture(scope="session") +def client_scopes() -> list: + """Get the client scopes from environment variable or use default. + + Returns: + List of scopes for OAuth registration. + """ + # Environment variable has highest priority + env_scopes = os.getenv("BUILD_STREAM_CLIENT_SCOPES") + if env_scopes: + return [scope.strip() for scope in env_scopes.split(',')] + + return UAT_CONFIG["BUILD_STREAM_CLIENT_SCOPES"] + + +@pytest.fixture(scope="session") +def client_id() -> str: + """Get the client ID from environment variable. + + Returns: + Client ID for OAuth authentication. + """ + client_id = os.getenv("BUILD_STREAM_CLIENT_ID") + if not client_id: + raise ValueError( + "BUILD_STREAM_CLIENT_ID environment variable is required for UAT tests. " + "Set it to the existing client ID or use client registration flow." + ) + return client_id + + +@pytest.fixture(scope="session") +def client_secret() -> str: + """Get the client secret from environment variable. + + Returns: + Client secret for OAuth authentication. + """ + client_secret = os.getenv("BUILD_STREAM_CLIENT_SECRET") + if not client_secret: + raise ValueError( + "BUILD_STREAM_CLIENT_SECRET environment variable is required for UAT tests. " + "Set it to the existing client secret or use client registration flow." + ) + return client_secret + + +@pytest.fixture(scope="session") +def http_client(base_url: str) -> Generator[httpx.Client, None, None]: + """Create httpx client configured for API requests. + + Args: + base_url: Base URL for API requests. + + Yields: + Configured httpx.Client instance. + """ + # Disable SSL verification for development environments with self-signed certificates + # For HTTPS URLs in development, we often have self-signed certificates + verify_ssl = not base_url.startswith("https://") # Disable SSL for HTTPS in dev + + with httpx.Client( + base_url=base_url, + timeout=30.0, + headers={"User-Agent": "UAT-Tests/1.0"}, + verify=verify_ssl + ) as client: + yield client + + +@pytest.fixture(scope="session") +def registered_client(http_client: httpx.Client, auth_username: str, auth_password: str, + client_name: str, client_scopes: list) -> Dict[str, str]: + """Get OAuth client credentials for UAT tests with registration fallback. + + This fixture first tries to use existing client credentials from environment variables. + If that fails, it attempts to register a new client using auth credentials. + + Args: + http_client: httpx client for API requests. + auth_username: Registration username. + auth_password: Registration password. + client_name: OAuth client name. + client_scopes: OAuth client scopes. + + Returns: + Dictionary containing client credentials and access token. + """ + import base64 + + # Try to use existing client credentials from environment first + env_client_id = os.getenv("BUILD_STREAM_CLIENT_ID") + env_client_secret = os.getenv("BUILD_STREAM_CLIENT_SECRET") + + if env_client_id and env_client_secret: + print("šŸ”‘ Using existing client credentials from environment") + try: + # Try to get token with existing credentials + token_data = { + "grant_type": "client_credentials", + "client_id": env_client_id, + "client_secret": env_client_secret, + "scope": " ".join(client_scopes), + } + + credentials = base64.b64encode(f"{env_client_id}:{env_client_secret}".encode()).decode() + token_headers = { + "Authorization": f"Basic {credentials}", + "Content-Type": "application/x-www-form-urlencoded", + } + + token_response = http_client.post( + "/api/v1/auth/token", + data=token_data, + headers=token_headers + ) + + if token_response.status_code == 200: + token_result = token_response.json() + print("āœ… Successfully authenticated with existing client") + return { + "client_id": env_client_id, + "client_secret": env_client_secret, + "access_token": token_result["access_token"], + "token_type": "Bearer", + } + else: + print(f"āš ļø Existing client authentication failed: {token_response.status_code}") + except Exception as e: + print(f"āš ļø Failed to use existing client: {e}") + + # Fallback: Register a new client + print("šŸ”„ Attempting to register new client...") + + registration_data = { + "client_name": client_name, + "allowed_scopes": client_scopes, + } + + # Basic auth for registration + reg_credentials = base64.b64encode(f"{auth_username}:{auth_password}".encode()).decode() + reg_headers = { + "Authorization": f"Basic {reg_credentials}", + "Content-Type": "application/json", + } + + try: + reg_response = http_client.post("/api/v1/auth/register", json=registration_data, headers=reg_headers) + if reg_response.status_code == 201: + # Registration successful + reg_result = reg_response.json() + client_id = reg_result["client_id"] + client_secret = reg_result["client_secret"] + print(f"āœ… Successfully registered new client: {client_id}") + elif reg_response.status_code == 409: + # Client already exists - this shouldn't happen with unique names + raise ValueError(f"Client '{client_name}' already exists. Try a different client name.") + else: + raise RuntimeError(f"Client registration failed: {reg_response.status_code} - {reg_response.text}") + except httpx.RequestError as e: + raise RuntimeError(f"Failed to connect to API server for registration: {e}") + + # Get access token with newly registered client + token_data = { + "grant_type": "client_credentials", + "client_id": client_id, + "client_secret": client_secret, + "scope": " ".join(client_scopes), + } + + token_headers = { + "Authorization": f"Basic {reg_credentials}", + "Content-Type": "application/x-www-form-urlencoded", + } + + try: + token_response = http_client.post( + "/api/v1/auth/token", + data=token_data, + headers=token_headers + ) + if token_response.status_code == 200: + token_result = token_response.json() + print("āœ… Successfully obtained access token") + else: + raise RuntimeError(f"Token generation failed: {token_response.status_code} - {token_response.text}") + except httpx.RequestError as e: + raise RuntimeError(f"Failed to get access token: {e}") + + return { + "client_id": client_id, + "client_secret": client_secret, + "access_token": token_result["access_token"], + "token_type": "Bearer", + } + + +@pytest.fixture(scope="function") +def auth_headers(registered_client: Dict[str, str]) -> Dict[str, str]: + """Get authorization headers with Bearer token. + + Args: + registered_client: Registered client credentials. + + Returns: + Dictionary with Authorization header. + """ + return { + "Authorization": f"Bearer {registered_client['access_token']}", + "Content-Type": "application/json", + } + + +@pytest.fixture(scope="function") +def unique_idempotency_key() -> str: + """Generate unique idempotency key for each test. + + Returns: + Unique idempotency key. + """ + return f"uat-test-{uuid.uuid4()}" + + +@pytest.fixture(scope="function") +def unique_correlation_id() -> str: + """Generate unique correlation ID for each test. + + Returns: + Unique correlation ID. + """ + return str(uuid.uuid4()) + + +@pytest.fixture(scope="function") +def auth_headers_with_ids(auth_headers: Dict[str, str], unique_idempotency_key: str, + unique_correlation_id: str) -> Dict[str, str]: + """Get authorization headers with unique IDs. + + Args: + auth_headers: Base authorization headers. + unique_idempotency_key: Unique idempotency key. + unique_correlation_id: Unique correlation ID. + + Returns: + Headers with Authorization, Idempotency-Key, and X-Correlation-Id. + """ + headers = auth_headers.copy() + headers["Idempotency-Key"] = unique_idempotency_key + headers["X-Correlation-Id"] = unique_correlation_id + return headers + + +@pytest.fixture(scope="function") +def invalid_job_id() -> str: + """Generate invalid job ID for testing error scenarios. + + Returns: + Invalid job ID that doesn't exist. + """ + return "00000000-0000-0000-0000-000000000000" + + +@pytest.fixture(scope="function") +def sample_catalog_content() -> bytes: + """Get sample catalog content for testing parse catalog. + + Returns: + Sample catalog JSON as bytes. + """ + import json + catalog_data = { + "metadata": { + "name": "test-catalog", + "version": "1.0.0", + "description": "Test catalog for UAT", + }, + "software": [ + { + "name": "test-package", + "version": "1.0.0", + "arch": "x86_64", + "repository": "test-repo", + } + ], + } + return json.dumps(catalog_data, indent=2).encode('utf-8') + + +@pytest.fixture(scope="session") +def real_catalog_content() -> bytes: + """Load real RHEL catalog content for testing. + + Returns: + Real catalog JSON content as bytes for file upload testing. + """ + from pathlib import Path + catalog_path = Path(__file__).parent.parent.parent.parent / "examples" / "catalog" / "catalog_rhel.json" + with open(catalog_path, "rb") as f: + return f.read() diff --git a/build_stream/tests/uat/debug_env.py b/build_stream/tests/uat/debug_env.py new file mode 100644 index 0000000000..5e1c47a0c6 --- /dev/null +++ b/build_stream/tests/uat/debug_env.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python3 +""" +Debug script to help identify IDE vs terminal environment differences. +Run this in both IDE and terminal to compare outputs. +""" + +import os +import sys +from pathlib import Path + +print("=== Environment Debug Script ===") +print(f"Python executable: {sys.executable}") +print(f"Python version: {sys.version}") +print(f"Current working directory: {Path.cwd()}") +print(f"Script location: {Path(__file__).parent}") + +print("\n=== Environment Variables ===") +env_vars = [ + "BUILD_STREAM_CLIENT_ID", + "BUILD_STREAM_CLIENT_SECRET", + "BUILD_STREAM_AUTH_PASSWORD", + "BUILD_STREAM_BASE_URL", + "BUILD_STREAM_AUTH_USERNAME", + "BUILD_STREAM_CLIENT_SCOPES" +] + +for var in env_vars: + value = os.getenv(var) + if value: + # Mask sensitive values + if "SECRET" in var or "PASSWORD" in var: + display_value = value[:4] + "*" * (len(value) - 4) + else: + display_value = value + print(f"āœ… {var}: {display_value}") + else: + print(f"āŒ {var}: Not set") + +print("\n=== .env File Search ===") +env_paths = [ + Path(__file__).parent.parent.parent / ".env", + Path.cwd() / ".env", + Path.cwd().parent / ".env", + Path("/opt/omnia/windsurf/build_stream_venu_oim/build_stream/.env"), +] + +for i, env_file in enumerate(env_paths, 1): + exists = env_file.exists() + print(f"{i}. {env_file} - {'āœ… EXISTS' if exists else 'āŒ NOT FOUND'}") + +print("\n=== python-dotenv Test ===") +try: + from dotenv import load_dotenv + print("āœ… python-dotenv is available") + + # Test loading from the first available .env + for env_file in env_paths: + if env_file.exists(): + print(f"Testing load from: {env_file}") + load_dotenv(env_file) + + # Check if variables are now loaded + client_id = os.getenv("BUILD_STREAM_CLIENT_ID") + if client_id: + print("āœ… Successfully loaded environment variables") + else: + print("āŒ Failed to load environment variables") + break +except ImportError: + print("āŒ python-dotenv not available") + +print("\n=== PYTHONPATH ===") +for path in sys.path[:5]: # Show first 5 paths + print(f" {path}") + +print("\n=== IDE vs Terminal Diagnosis ===") +print("If you see differences between IDE and terminal:") +print("1. Check if IDE uses different Python interpreter") +print("2. Check if IDE has different working directory") +print("3. Check if IDE has different PYTHONPATH") +print("4. Check if IDE has different environment variables") diff --git a/build_stream/tests/uat/test_00_auth_flow.py b/build_stream/tests/uat/test_00_auth_flow.py new file mode 100644 index 0000000000..df56516a74 --- /dev/null +++ b/build_stream/tests/uat/test_00_auth_flow.py @@ -0,0 +1,92 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""UAT tests for OAuth2 authentication flow.""" + +import base64 +import uuid +import httpx +import pytest + + +@pytest.mark.uat +class TestClientRegistration: + """Test OAuth2 client registration.""" + + def test_register_client_with_valid_credentials( + self, http_client: httpx.Client, auth_username: str, auth_password: str + ): + """Test client registration with valid credentials.""" + credentials = base64.b64encode(f"{auth_username}:{auth_password}".encode()).decode() + headers = { + "Authorization": f"Basic {credentials}", + "Content-Type": "application/json", + } + + payload = { + "client_name": f"uat-test-{uuid.uuid4()}", + "allowed_scopes": ["catalog:read", "catalog:write"], + } + + response = http_client.post("/api/v1/auth/register", json=payload, headers=headers) + + # May be 201 (new) or 409 (already exists) + assert response.status_code in [201, 409] + if response.status_code == 201: + data = response.json() + assert "client_id" in data + assert "client_secret" in data + assert data["client_id"].startswith("bld_") + assert data["client_secret"].startswith("bld_s_") + + def test_register_client_with_invalid_credentials_returns_401( + self, http_client: httpx.Client + ): + """Test client registration with invalid credentials returns 401.""" + credentials = base64.b64encode(b"invalid:invalid").decode() + headers = { + "Authorization": f"Basic {credentials}", + "Content-Type": "application/json", + } + + payload = { + "client_name": "test-client", + "allowed_scopes": ["catalog:read"], + } + + response = http_client.post("/api/v1/auth/register", json=payload, headers=headers) + + assert response.status_code == 401 + + +@pytest.mark.uat +class TestTokenGeneration: + """Test OAuth2 token generation.""" + + def test_generate_token_with_valid_credentials(self, registered_client: dict): + """Test token generation with valid client credentials.""" + assert "access_token" in registered_client + assert "token_type" in registered_client + assert registered_client["token_type"] == "Bearer" + assert len(registered_client["access_token"]) > 0 + + def test_token_contains_valid_structure(self, registered_client: dict): + """Test generated token has valid JWT structure.""" + token = registered_client["access_token"] + # JWT has 3 parts separated by dots + parts = token.split('.') + assert len(parts) == 3 + # Each part should be base64 encoded + for part in parts: + assert len(part) > 0 diff --git a/build_stream/tests/uat/test_00_config.py b/build_stream/tests/uat/test_00_config.py new file mode 100644 index 0000000000..c5a286137b --- /dev/null +++ b/build_stream/tests/uat/test_00_config.py @@ -0,0 +1,70 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""UAT tests for configuration loading.""" + +import pytest + + +@pytest.mark.uat +class TestConfiguration: + """Test UAT configuration is loaded correctly.""" + + def test_base_url_configuration(self, base_url: str): + """Test base URL is configured correctly.""" + assert base_url is not None + assert base_url.startswith(("http://", "https://")) + assert len(base_url) > 0 + print(f"āœ… Base URL configured: {base_url}") + + def test_auth_username_configuration(self, auth_username: str): + """Test auth username is configured correctly.""" + assert auth_username is not None + assert len(auth_username) > 0 + print(f"āœ… Auth username configured: {auth_username}") + + def test_auth_password_configuration(self, auth_password: str): + """Test auth password is loaded from environment.""" + assert auth_password is not None + assert len(auth_password) > 0 + print(f"āœ… Auth password loaded from environment: {'*' * len(auth_password)}") + + def test_client_name_configuration(self, client_name: str): + """Test client name is configured correctly.""" + assert client_name is not None + assert len(client_name) > 0 + print(f"āœ… Client name configured: {client_name}") + + def test_client_scopes_configuration(self, client_scopes: list): + """Test client scopes are configured correctly.""" + assert client_scopes is not None + assert len(client_scopes) > 0 + # Verify scopes contain expected basic permissions + assert all(isinstance(scope, str) for scope in client_scopes) + assert all(len(scope) > 0 for scope in client_scopes) + print(f"āœ… Client scopes configured: {client_scopes}") + + def test_client_id_configuration(self, client_id: str): + """Test client ID is configured correctly.""" + assert client_id is not None + assert len(client_id) > 0 + assert client_id.startswith("bld_") + print(f"āœ… Client ID configured: {client_id[:10]}...") + + def test_client_secret_configuration(self, client_secret: str): + """Test client secret is configured correctly.""" + assert client_secret is not None + assert len(client_secret) > 0 + assert client_secret.startswith("bld_s_") + print(f"āœ… Client secret configured: {client_secret[:10]}...") diff --git a/build_stream/tests/uat/test_00_health.py b/build_stream/tests/uat/test_00_health.py new file mode 100644 index 0000000000..6380664105 --- /dev/null +++ b/build_stream/tests/uat/test_00_health.py @@ -0,0 +1,64 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""UAT tests for health check and basic connectivity.""" + +import httpx +import pytest + + +@pytest.mark.uat +class TestHealthEndpoint: + """Test health check endpoint.""" + + def test_health_endpoint_returns_200(self, http_client: httpx.Client): + """Test health endpoint returns 200 OK.""" + response = http_client.get("/health") + + assert response.status_code == 200 + data = response.json() + assert "status" in data + assert data["status"] == "healthy" + + def test_health_endpoint_is_reachable(self, http_client: httpx.Client): + """Test server is reachable and responding.""" + try: + response = http_client.get("/health") + assert response.status_code == 200 + except httpx.RequestError as e: + pytest.fail(f"Server is not reachable: {e}") + + +@pytest.mark.uat +class TestRootEndpoint: + """Test root endpoint.""" + + def test_root_endpoint_returns_200(self, http_client: httpx.Client): + """Test root endpoint returns 200 OK.""" + response = http_client.get("/") + + assert response.status_code == 200 + data = response.json() + assert "message" in data + assert "docs" in data + assert "version" in data + + def test_root_endpoint_returns_welcome_message(self, http_client: httpx.Client): + """Test root endpoint returns welcome message.""" + response = http_client.get("/") + + assert response.status_code == 200 + data = response.json() + assert "Build Stream" in data["message"] + assert data["docs"] == "/docs" diff --git a/build_stream/tests/uat/test_01_job_lifecycle.py b/build_stream/tests/uat/test_01_job_lifecycle.py new file mode 100644 index 0000000000..080c6d294b --- /dev/null +++ b/build_stream/tests/uat/test_01_job_lifecycle.py @@ -0,0 +1,184 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""UAT tests for complete job lifecycle (create, get, delete).""" + +import uuid +import httpx +import pytest + + +@pytest.mark.uat +class TestJobLifecycle: + """Test complete job lifecycle operations.""" + + def test_create_job_success(self, http_client: httpx.Client, auth_headers_with_ids: dict): + """Test successful job creation.""" + payload = { + "client_id": "uat-lifecycle-client", + "client_name": "UAT Lifecycle Test", + } + + response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + + assert response.status_code == 201 + data = response.json() + assert "job_id" in data + assert "correlation_id" in data + assert "job_state" in data + assert data["job_state"] == "CREATED" + assert "created_at" in data + assert "stages" in data + assert len(data["stages"]) == 6 + + def test_get_job_after_creation(self, http_client: httpx.Client, auth_headers_with_ids: dict): + """Test retrieving job immediately after creation.""" + payload = { + "client_id": "uat-lifecycle-client", + "client_name": "UAT Lifecycle Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + get_response = http_client.get(f"/api/v1/jobs/{job_id}", headers=auth_headers_with_ids) + + assert get_response.status_code == 200 + data = get_response.json() + assert data["job_id"] == job_id + assert "job_state" in data + assert "stages" in data + + def test_get_nonexistent_job_returns_404( + self, http_client: httpx.Client, auth_headers: dict + ): + """Test getting nonexistent job returns 404.""" + nonexistent_id = str(uuid.uuid4()) + + response = http_client.get(f"/api/v1/jobs/{nonexistent_id}", headers=auth_headers) + + assert response.status_code == 404 + data = response.json() + assert "detail" in data + + def test_get_job_with_invalid_uuid_returns_400( + self, http_client: httpx.Client, auth_headers: dict + ): + """Test getting job with invalid UUID format returns 400.""" + invalid_id = "not-a-valid-uuid" + + response = http_client.get(f"/api/v1/jobs/{invalid_id}", headers=auth_headers) + + assert response.status_code == 400 + + @pytest.mark.skip(reason="DELETE endpoint not implemented yet") + def test_delete_job_success(self, http_client: httpx.Client, auth_headers_with_ids: dict): + """Test successful job deletion.""" + payload = { + "client_id": "uat-lifecycle-client", + "client_name": "UAT Lifecycle Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + delete_response = http_client.delete(f"/api/v1/jobs/{job_id}", headers=auth_headers_with_ids) + + assert delete_response.status_code == 204 + + @pytest.mark.skip(reason="DELETE endpoint not implemented yet") + def test_get_deleted_job_returns_404(self, http_client: httpx.Client, auth_headers_with_ids: dict): + """Test getting deleted job returns 404.""" + payload = { + "client_id": "uat-lifecycle-client", + "client_name": "UAT Lifecycle Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + delete_response = http_client.delete(f"/api/v1/jobs/{job_id}", headers=auth_headers_with_ids) + assert delete_response.status_code == 204 + + get_response = http_client.get(f"/api/v1/jobs/{job_id}", headers=auth_headers_with_ids) + assert get_response.status_code == 404 + + def test_delete_nonexistent_job_returns_404( + self, http_client: httpx.Client, auth_headers: dict + ): + """Test deleting nonexistent job returns 404.""" + nonexistent_id = str(uuid.uuid4()) + + response = http_client.delete(f"/api/v1/jobs/{nonexistent_id}", headers=auth_headers) + + assert response.status_code == 404 + + def test_create_job_without_auth_returns_401(self, http_client: httpx.Client): + """Test job creation without authentication returns 401.""" + payload = { + "client_id": "uat-lifecycle-client", + "client_name": "UAT Lifecycle Test", + } + + response = http_client.post("/api/v1/jobs", json=payload) + + assert response.status_code == 401 + + def test_create_job_with_invalid_token_returns_401(self, http_client: httpx.Client): + """Test job creation with invalid token returns 401.""" + headers = { + "Authorization": "Bearer invalid-token", + "Content-Type": "application/json", + } + payload = { + "client_id": "uat-lifecycle-client", + "client_name": "UAT Lifecycle Test", + } + + response = http_client.post("/api/v1/jobs", json=payload, headers=headers) + + assert response.status_code == 401 + + def test_create_job_with_missing_required_fields_returns_422( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test job creation with missing required fields returns 422.""" + payload = { + "client_name": "UAT Lifecycle Test", + } + + response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + + assert response.status_code == 422 + + def test_job_id_is_valid_uuid_v4(self, http_client: httpx.Client, auth_headers_with_ids: dict): + """Test created job ID is valid UUID v4 format.""" + payload = { + "client_id": "uat-lifecycle-client", + "client_name": "UAT Lifecycle Test", + } + + response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + + assert response.status_code == 201 + job_id = response.json()["job_id"] + + try: + parsed_uuid = uuid.UUID(job_id) + assert parsed_uuid.version == 4 + except (ValueError, AttributeError): + pytest.fail(f"Job ID is not a valid UUID v4: {job_id}") diff --git a/build_stream/tests/uat/test_02_parse_catalog_stage.py b/build_stream/tests/uat/test_02_parse_catalog_stage.py new file mode 100644 index 0000000000..ae23f56f24 --- /dev/null +++ b/build_stream/tests/uat/test_02_parse_catalog_stage.py @@ -0,0 +1,267 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""UAT tests for parse-catalog stage.""" + +import json +import time +import httpx +import pytest + + +@pytest.mark.uat +class TestParseCatalogStageSuccess: + """Test parse-catalog stage success scenarios.""" + + def test_parse_catalog_with_valid_file( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test parse-catalog stage with valid catalog file.""" + payload = { + "client_id": "uat-parse-catalog-client", + "client_name": "UAT Parse Catalog Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + + stage_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + + assert stage_response.status_code in [200, 202] + data = stage_response.json() + assert "status" in data + assert data["status"] == "success" + + def test_parse_catalog_stage_transitions_to_running( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test parse-catalog stage transitions from PENDING to RUNNING.""" + payload = { + "client_id": "uat-parse-catalog-client", + "client_name": "UAT Parse Catalog Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + get_response = http_client.get(f"/api/v1/jobs/{job_id}", headers=auth_headers_with_ids) + assert get_response.status_code == 200 + stages = get_response.json()["stages"] + parse_stage = next(s for s in stages if s["stage_name"] == "parse-catalog") + assert parse_stage["stage_state"] == "PENDING" + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + + stage_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + + assert stage_response.status_code in [200, 202] + data = stage_response.json() + assert "status" in data + assert data["status"] == "success" + + def test_parse_catalog_stage_eventually_completes( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test parse-catalog stage eventually completes successfully.""" + payload = { + "client_id": "uat-parse-catalog-client", + "client_name": "UAT Parse Catalog Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + + stage_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + + assert stage_response.status_code in [200, 202] + + max_attempts = 30 + for _ in range(max_attempts): + time.sleep(2) + get_response = http_client.get(f"/api/v1/jobs/{job_id}", headers=auth_headers_with_ids) + assert get_response.status_code == 200 + stages = get_response.json()["stages"] + parse_stage = next(s for s in stages if s["stage_name"] == "parse-catalog") + + if parse_stage["stage_state"] in ["COMPLETED", "FAILED"]: + assert parse_stage["stage_state"] == "COMPLETED" + assert parse_stage["started_at"] is not None + assert parse_stage["ended_at"] is not None + return + + pytest.fail("parse-catalog stage did not complete within timeout") + + +@pytest.mark.uat +class TestParseCatalogStageFailure: + """Test parse-catalog stage failure scenarios.""" + + def test_parse_catalog_without_file_returns_400( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test parse-catalog without catalog file returns 400.""" + payload = { + "client_id": "uat-parse-catalog-client", + "client_name": "UAT Parse Catalog Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + stage_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + headers=auth_headers_with_ids + ) + + assert stage_response.status_code == 422 + + def test_parse_catalog_with_invalid_json_returns_400( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test parse-catalog with invalid JSON returns 400.""" + payload = { + "client_id": "uat-parse-catalog-client", + "client_name": "UAT Parse Catalog Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + invalid_content = b"{ invalid json content" + files = { + "catalog": ("catalog.json", invalid_content, "application/json") + } + + stage_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + + assert stage_response.status_code in [400, 422] + + + def test_parse_catalog_for_nonexistent_job_returns_404( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test parse-catalog for nonexistent job returns 404.""" + import uuid + nonexistent_job_id = str(uuid.uuid4()) + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + + stage_response = http_client.post( + f"/api/v1/jobs/{nonexistent_job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + + assert stage_response.status_code == 404 + + def test_parse_catalog_without_auth_returns_401( + self, http_client: httpx.Client, real_catalog_content: bytes + ): + """Test parse-catalog without authentication returns 401.""" + import uuid + job_id = str(uuid.uuid4()) + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + + stage_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files + ) + + assert stage_response.status_code == 401 + + def test_parse_catalog_with_wrong_content_type_returns_400( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test parse-catalog with wrong content type returns 400.""" + payload = { + "client_id": "uat-parse-catalog-client", + "client_name": "UAT Parse Catalog Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.txt", b"not a json file", "text/plain") + } + + stage_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + + assert stage_response.status_code in [400, 422] + + def test_parse_catalog_with_empty_file_returns_400( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test parse-catalog with empty file returns 400.""" + payload = { + "client_id": "uat-parse-catalog-client", + "client_name": "UAT Parse Catalog Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "catalog": ("catalog.json", b"", "application/json") + } + + stage_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + + assert stage_response.status_code in [400, 422] diff --git a/build_stream/tests/uat/test_03_generate_input_files_stage.py b/build_stream/tests/uat/test_03_generate_input_files_stage.py new file mode 100644 index 0000000000..0087aa9673 --- /dev/null +++ b/build_stream/tests/uat/test_03_generate_input_files_stage.py @@ -0,0 +1,244 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""UAT tests for generate-input-files stage.""" + +import time +import httpx +import pytest + + +@pytest.mark.uat +class TestGenerateInputFilesStageSuccess: + """Test generate-input-files stage success scenarios.""" + + def test_generate_input_files_after_parse_catalog( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test generate-input-files stage after successful parse-catalog.""" + payload = { + "client_id": "uat-generate-input-client", + "client_name": "UAT Generate Input Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + generate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + + assert generate_response.status_code in [200, 202] + data = generate_response.json() + assert "stage_state" in data + assert data["stage_state"] in ["RUNNING", "COMPLETED"] + + def test_generate_input_files_stage_transitions_to_running( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test generate-input-files stage transitions from PENDING to RUNNING.""" + payload = { + "client_id": "uat-generate-input-client", + "client_name": "UAT Generate Input Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + get_response = http_client.get(f"/api/v1/jobs/{job_id}", headers=auth_headers_with_ids) + assert get_response.status_code == 200 + stages = get_response.json()["stages"] + generate_stage = next(s for s in stages if s["stage_name"] == "generate-input-files") + assert generate_stage["stage_state"] == "PENDING" + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + generate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + + assert generate_response.status_code in [200, 202] + data = generate_response.json() + assert data["stage_state"] in ["RUNNING", "COMPLETED"] + + def test_generate_input_files_eventually_completes( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test generate-input-files stage eventually completes successfully.""" + payload = { + "client_id": "uat-generate-input-client", + "client_name": "UAT Generate Input Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + generate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + assert generate_response.status_code in [200, 202] + + max_attempts = 30 + for _ in range(max_attempts): + time.sleep(2) + get_response = http_client.get(f"/api/v1/jobs/{job_id}", headers=auth_headers_with_ids) + assert get_response.status_code == 200 + stages = get_response.json()["stages"] + generate_stage = next(s for s in stages if s["stage_name"] == "generate-input-files") + + if generate_stage["stage_state"] in ["COMPLETED", "FAILED"]: + assert generate_stage["stage_state"] == "COMPLETED" + assert generate_stage["started_at"] is not None + assert generate_stage["ended_at"] is not None + return + + pytest.fail("generate-input-files stage did not complete within timeout") + + +@pytest.mark.uat +class TestGenerateInputFilesStageFailure: + """Test generate-input-files stage failure scenarios.""" + + def test_generate_input_files_without_parse_catalog_returns_400( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test generate-input-files without parse-catalog returns 400.""" + payload = { + "client_id": "uat-generate-input-client", + "client_name": "UAT Generate Input Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + generate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + + assert generate_response.status_code == 412 + + def test_generate_input_files_for_nonexistent_job_returns_404( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test generate-input-files for nonexistent job returns 404.""" + import uuid + nonexistent_job_id = str(uuid.uuid4()) + + generate_response = http_client.post( + f"/api/v1/jobs/{nonexistent_job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + + assert generate_response.status_code == 404 + + def test_generate_input_files_without_auth_returns_401(self, http_client: httpx.Client): + """Test generate-input-files without authentication returns 401.""" + import uuid + job_id = str(uuid.uuid4()) + + generate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files" + ) + + assert generate_response.status_code == 401 + + def test_generate_input_files_with_invalid_job_id_returns_422( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test generate-input-files with invalid job ID format returns 422.""" + invalid_job_id = "not-a-valid-uuid" + + generate_response = http_client.post( + f"/api/v1/jobs/{invalid_job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + + assert generate_response.status_code == 400 + + def test_generate_input_files_twice_returns_409( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test generate-input-files executed twice returns 409.""" + payload = { + "client_id": "uat-generate-input-client", + "client_name": "UAT Generate Input Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + time.sleep(2) + + first_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + assert first_response.status_code in [200, 202] + + time.sleep(2) + + second_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + + assert second_response.status_code == 409 diff --git a/build_stream/tests/uat/test_04_create_local_repository_stage.py b/build_stream/tests/uat/test_04_create_local_repository_stage.py new file mode 100644 index 0000000000..bfb8786ca7 --- /dev/null +++ b/build_stream/tests/uat/test_04_create_local_repository_stage.py @@ -0,0 +1,282 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""UAT tests for create-local-repository stage.""" + +import time +import httpx +import pytest + + +@pytest.mark.uat +class TestCreateLocalRepositoryStageSuccess: + """Test create-local-repository stage success scenarios.""" + + def test_create_local_repository_after_generate_input_files( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test create-local-repository stage after successful generate-input-files.""" + payload = { + "client_id": "uat-create-repo-client", + "client_name": "UAT Create Repo Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + + + generate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + assert generate_response.status_code in [200, 202] + + + + repo_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/create-local-repository", + headers=auth_headers_with_ids + ) + + assert repo_response.status_code in [200, 202] + data = repo_response.json() + assert "status" in data + assert data["status"] == "accepted" + + def test_create_local_repository_stage_transitions_to_running( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test create-local-repository stage transitions from PENDING to RUNNING.""" + payload = { + "client_id": "uat-create-repo-client", + "client_name": "UAT Create Repo Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + get_response = http_client.get(f"/api/v1/jobs/{job_id}", headers=auth_headers_with_ids) + assert get_response.status_code == 200 + stages = get_response.json()["stages"] + repo_stage = next(s for s in stages if s["stage_name"] == "create-local-repository") + assert repo_stage["stage_state"] == "PENDING" + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + + + generate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + assert generate_response.status_code in [200, 202] + + + + repo_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/create-local-repository", + headers=auth_headers_with_ids + ) + + assert repo_response.status_code in [200, 202] + data = repo_response.json() + assert "status" in data + assert data["status"] == "accepted" + + def test_create_local_repository_eventually_completes( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test create-local-repository stage eventually completes successfully.""" + payload = { + "client_id": "uat-create-repo-client", + "client_name": "UAT Create Repo Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + + + generate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + assert generate_response.status_code in [200, 202] + + + + repo_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/create-local-repository", + headers=auth_headers_with_ids + ) + assert repo_response.status_code in [200, 202] + + max_attempts = 60 + for _ in range(max_attempts): + time.sleep(5) + get_response = http_client.get(f"/api/v1/jobs/{job_id}", headers=auth_headers_with_ids) + assert get_response.status_code == 200 + stages = get_response.json()["stages"] + repo_stage = next(s for s in stages if s["stage_name"] == "create-local-repository") + + if repo_stage["stage_state"] in ["COMPLETED", "FAILED"]: + assert repo_stage["stage_state"] == "COMPLETED" + assert repo_stage["started_at"] is not None + assert repo_stage["ended_at"] is not None + return + + pytest.fail("create-local-repository stage did not complete within timeout") + + +@pytest.mark.uat +class TestCreateLocalRepositoryStageFailure: + """Test create-local-repository stage failure scenarios.""" + + def test_create_local_repository_without_generate_input_files_returns_400( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test create-local-repository without generate-input-files returns 400.""" + payload = { + "client_id": "uat-create-repo-client", + "client_name": "UAT Create Repo Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + repo_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/create-local-repository", + headers=auth_headers_with_ids + ) + + assert repo_response.status_code == 412 + + def test_create_local_repository_for_nonexistent_job_returns_404( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test create-local-repository for nonexistent job returns 404.""" + import uuid + nonexistent_job_id = str(uuid.uuid4()) + + repo_response = http_client.post( + f"/api/v1/jobs/{nonexistent_job_id}/stages/create-local-repository", + headers=auth_headers_with_ids + ) + + assert repo_response.status_code == 404 + + def test_create_local_repository_without_auth_returns_401(self, http_client: httpx.Client): + """Test create-local-repository without authentication returns 401.""" + import uuid + job_id = str(uuid.uuid4()) + + repo_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/create-local-repository" + ) + + assert repo_response.status_code == 401 + + def test_create_local_repository_with_invalid_job_id_returns_422( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test create-local-repository with invalid job ID format returns 422.""" + invalid_job_id = "not-a-valid-uuid" + + repo_response = http_client.post( + f"/api/v1/jobs/{invalid_job_id}/stages/create-local-repository", + headers=auth_headers_with_ids + ) + + assert repo_response.status_code == 400 + + def test_create_local_repository_twice_returns_409( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test create-local-repository executed twice returns 409.""" + payload = { + "client_id": "uat-create-repo-client", + "client_name": "UAT Create Repo Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + + + generate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + assert generate_response.status_code in [200, 202] + + + + first_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/create-local-repository", + headers=auth_headers_with_ids + ) + assert first_response.status_code in [200, 202] + + time.sleep(2) + + second_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/create-local-repository", + headers=auth_headers_with_ids + ) + + assert second_response.status_code == 409 diff --git a/build_stream/tests/uat/test_05_build_image_stage.py b/build_stream/tests/uat/test_05_build_image_stage.py new file mode 100644 index 0000000000..bef182db89 --- /dev/null +++ b/build_stream/tests/uat/test_05_build_image_stage.py @@ -0,0 +1,335 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""UAT tests for build-image stage (both x86_64 and aarch64).""" + +import time +import httpx +import pytest + + +@pytest.mark.uat +class TestBuildImageStageSuccess: + """Test build-image stage success scenarios.""" + + def test_build_image_after_create_local_repository( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test build-image-x86_64 stage after successful create-local-repository.""" + payload = { + "client_id": "uat-build-image-client", + "client_name": "UAT Build Image Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + generate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + assert generate_response.status_code in [200, 202] + + repo_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/create-local-repository", + headers=auth_headers_with_ids + ) + assert repo_response.status_code in [200, 202] + + time.sleep(10) + + build_request_body = { + "architecture": "x86_64", + "image_key": "test-image-key" + } + build_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/build-image", + json=build_request_body, + headers=auth_headers_with_ids + ) + + assert build_response.status_code in [200, 202] + data = build_response.json() + assert "status" in data + assert data["status"] == "accepted" + + def test_build_image_stage_transitions_to_running( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test build-image stage transitions from PENDING to RUNNING.""" + payload = { + "client_id": "uat-build-image-client", + "client_name": "UAT Build Image Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + get_response = http_client.get(f"/api/v1/jobs/{job_id}", headers=auth_headers_with_ids) + assert get_response.status_code == 200 + stages = get_response.json()["stages"] + build_stage = next(s for s in stages if s["stage_name"] == "build-image-x86_64") + assert build_stage["stage_state"] == "PENDING" + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + + + generate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + assert generate_response.status_code in [200, 202] + + + + repo_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/create-local-repository", + headers=auth_headers_with_ids + ) + assert repo_response.status_code in [200, 202] + + time.sleep(10) + + build_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/build-image", + headers=auth_headers_with_ids + ) + + assert build_response.status_code in [200, 202] + data = build_response.json() + assert "status" in data + assert data["status"] == "accepted" + + def test_build_image_eventually_completes( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test build-image stage eventually completes successfully.""" + payload = { + "client_id": "uat-build-image-client", + "client_name": "UAT Build Image Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + + + generate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + assert generate_response.status_code in [200, 202] + + + + repo_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/create-local-repository", + headers=auth_headers_with_ids + ) + assert repo_response.status_code in [200, 202] + + time.sleep(10) + + build_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/build-image", + headers=auth_headers_with_ids + ) + assert build_response.status_code in [200, 202] + + max_attempts = 120 + for _ in range(max_attempts): + time.sleep(6) + get_response = http_client.get(f"/api/v1/jobs/{job_id}", headers=auth_headers_with_ids) + assert get_response.status_code == 200 + stages = get_response.json()["stages"] + build_stage = next(s for s in stages if s["stage_name"] == "build-image-x86_64") + + if build_stage["stage_state"] in ["COMPLETED", "FAILED"]: + assert build_stage["stage_state"] == "COMPLETED" + assert build_stage["started_at"] is not None + assert build_stage["ended_at"] is not None + return + + pytest.fail("build-image-x86_64 stage did not complete within timeout") + + +@pytest.mark.uat +class TestBuildImageStageFailure: + """Test build-image stage failure scenarios.""" + + def test_build_image_without_create_local_repository_returns_400( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test build-image without create-local-repository returns 400.""" + payload = { + "client_id": "uat-build-image-client", + "client_name": "UAT Build Image Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + build_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/build-image-x86_64", + headers=auth_headers_with_ids + ) + + assert build_response.status_code == 404 + + def test_build_image_for_nonexistent_job_returns_404( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test build-image for nonexistent job returns 404.""" + import uuid + nonexistent_job_id = str(uuid.uuid4()) + + build_response = http_client.post( + f"/api/v1/jobs/{nonexistent_job_id}/stages/build-image-x86_64", + headers=auth_headers_with_ids + ) + + assert build_response.status_code == 404 + + def test_build_image_without_auth_returns_401(self, http_client: httpx.Client): + """Test build-image without authentication returns 401.""" + import uuid + job_id = str(uuid.uuid4()) + + build_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/build-image-x86_64" + ) + + assert build_response.status_code == 401 + + def test_build_image_with_invalid_job_id_returns_422( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test build-image with invalid job ID format returns 422.""" + invalid_job_id = "not-a-valid-uuid" + + build_response = http_client.post( + f"/api/v1/jobs/{invalid_job_id}/stages/build-image-x86_64", + headers=auth_headers_with_ids + ) + + assert build_response.status_code == 404 + + def test_build_image_twice_returns_409( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test build-image executed twice returns 409.""" + payload = { + "client_id": "uat-build-image-client", + "client_name": "UAT Build Image Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + + + generate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + assert generate_response.status_code in [200, 202] + + + + repo_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/create-local-repository", + headers=auth_headers_with_ids + ) + assert repo_response.status_code in [200, 202] + + time.sleep(10) + + first_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/build-image-x86_64", + headers=auth_headers_with_ids + ) + assert first_response.status_code in [200, 202] + + time.sleep(2) + + second_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/build-image-x86_64", + headers=auth_headers_with_ids + ) + + assert second_response.status_code == 409 + + def test_build_image_with_invalid_architecture_returns_404( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test build-image with invalid architecture returns 404.""" + payload = { + "client_id": "uat-build-image-client", + "client_name": "UAT Build Image Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + build_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/build-image-invalid-arch", + headers=auth_headers_with_ids + ) + + assert build_response.status_code == 404 diff --git a/build_stream/tests/uat/test_06_validate_image_on_test_stage.py b/build_stream/tests/uat/test_06_validate_image_on_test_stage.py new file mode 100644 index 0000000000..fe22a7b65a --- /dev/null +++ b/build_stream/tests/uat/test_06_validate_image_on_test_stage.py @@ -0,0 +1,345 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""UAT tests for validate-image-on-test stage.""" + +import time +import httpx +import pytest + + +@pytest.mark.uat +class TestValidateImageOnTestStageSuccess: + """Test validate-image-on-test stage success scenarios.""" + + def test_validate_image_after_build_image( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test validate-image-on-test stage after successful build-image.""" + payload = { + "client_id": "uat-validate-image-client", + "client_name": "UAT Validate Image Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", sample_catalog_content, "application/json") + } + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + time.sleep(5) + + generate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + assert generate_response.status_code in [200, 202] + + time.sleep(5) + + repo_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/create-local-repository", + headers=auth_headers_with_ids + ) + assert repo_response.status_code in [200, 202] + + time.sleep(10) + + build_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/build-image-x86_64", + headers=auth_headers_with_ids + ) + assert build_response.status_code in [200, 202] + + time.sleep(15) + + validate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/validate-image-on-test", + headers=auth_headers_with_ids + ) + + assert validate_response.status_code in [200, 202] + data = validate_response.json() + assert "stage_state" in data + assert data["stage_state"] in ["RUNNING", "COMPLETED"] + + def test_validate_image_stage_transitions_to_running( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test validate-image-on-test stage transitions from PENDING to RUNNING.""" + payload = { + "client_id": "uat-validate-image-client", + "client_name": "UAT Validate Image Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + get_response = http_client.get(f"/api/v1/jobs/{job_id}", headers=auth_headers_with_ids) + assert get_response.status_code == 200 + stages = get_response.json()["stages"] + validate_stage = next(s for s in stages if s["stage_name"] == "validate-image-on-test") + assert validate_stage["stage_state"] == "PENDING" + + files = { + "file": ("catalog.json", sample_catalog_content, "application/json") + } + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + time.sleep(5) + + generate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + assert generate_response.status_code in [200, 202] + + time.sleep(5) + + repo_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/create-local-repository", + headers=auth_headers_with_ids + ) + assert repo_response.status_code in [200, 202] + + time.sleep(10) + + build_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/build-image-x86_64", + headers=auth_headers_with_ids + ) + assert build_response.status_code in [200, 202] + + time.sleep(15) + + validate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/validate-image-on-test", + headers=auth_headers_with_ids + ) + + assert validate_response.status_code in [200, 202] + data = validate_response.json() + assert data["stage_state"] in ["RUNNING", "COMPLETED"] + + def test_validate_image_eventually_completes( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test validate-image-on-test stage eventually completes successfully.""" + payload = { + "client_id": "uat-validate-image-client", + "client_name": "UAT Validate Image Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", sample_catalog_content, "application/json") + } + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + time.sleep(5) + + generate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + assert generate_response.status_code in [200, 202] + + time.sleep(5) + + repo_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/create-local-repository", + headers=auth_headers_with_ids + ) + assert repo_response.status_code in [200, 202] + + time.sleep(10) + + build_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/build-image-x86_64", + headers=auth_headers_with_ids + ) + assert build_response.status_code in [200, 202] + + time.sleep(15) + + validate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/validate-image-on-test", + headers=auth_headers_with_ids + ) + assert validate_response.status_code in [200, 202] + + max_attempts = 60 + for _ in range(max_attempts): + time.sleep(5) + get_response = http_client.get(f"/api/v1/jobs/{job_id}", headers=auth_headers_with_ids) + assert get_response.status_code == 200 + stages = get_response.json()["stages"] + validate_stage = next(s for s in stages if s["stage_name"] == "validate-image-on-test") + + if validate_stage["stage_state"] in ["COMPLETED", "FAILED"]: + assert validate_stage["stage_state"] == "COMPLETED" + assert validate_stage["started_at"] is not None + assert validate_stage["ended_at"] is not None + return + + pytest.fail("validate-image-on-test stage did not complete within timeout") + + +@pytest.mark.uat +class TestValidateImageOnTestStageFailure: + """Test validate-image-on-test stage failure scenarios.""" + + def test_validate_image_without_build_image_returns_400( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test validate-image-on-test without build-image returns 400.""" + payload = { + "client_id": "uat-validate-image-client", + "client_name": "UAT Validate Image Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + validate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/validate-image-on-test", + headers=auth_headers_with_ids + ) + + assert validate_response.status_code == 422 + + def test_validate_image_for_nonexistent_job_returns_404( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test validate-image-on-test for nonexistent job returns 404.""" + import uuid + nonexistent_job_id = str(uuid.uuid4()) + + validate_response = http_client.post( + f"/api/v1/jobs/{nonexistent_job_id}/stages/validate-image-on-test", + headers=auth_headers_with_ids + ) + + assert validate_response.status_code == 404 + + def test_validate_image_without_auth_returns_401(self, http_client: httpx.Client): + """Test validate-image-on-test without authentication returns 401.""" + import uuid + job_id = str(uuid.uuid4()) + + validate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/validate-image-on-test" + ) + + assert validate_response.status_code == 401 + + def test_validate_image_with_invalid_job_id_returns_422( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test validate-image-on-test with invalid job ID format returns 422.""" + invalid_job_id = "not-a-valid-uuid" + + validate_response = http_client.post( + f"/api/v1/jobs/{invalid_job_id}/stages/validate-image-on-test", + headers=auth_headers_with_ids + ) + + assert validate_response.status_code == 422 + + def test_validate_image_twice_returns_409( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test validate-image-on-test executed twice returns 409.""" + payload = { + "client_id": "uat-validate-image-client", + "client_name": "UAT Validate Image Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", sample_catalog_content, "application/json") + } + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + time.sleep(5) + + generate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + assert generate_response.status_code in [200, 202] + + time.sleep(5) + + repo_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/create-local-repository", + headers=auth_headers_with_ids + ) + assert repo_response.status_code in [200, 202] + + time.sleep(10) + + build_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/build-image-x86_64", + headers=auth_headers_with_ids + ) + assert build_response.status_code in [200, 202] + + time.sleep(15) + + first_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/validate-image-on-test", + headers=auth_headers_with_ids + ) + assert first_response.status_code in [200, 202] + + time.sleep(2) + + second_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/validate-image-on-test", + headers=auth_headers_with_ids + ) + + assert second_response.status_code == 409 diff --git a/build_stream/tests/uat/test_07_catalog_query_endpoints.py b/build_stream/tests/uat/test_07_catalog_query_endpoints.py new file mode 100644 index 0000000000..5f7616ed8a --- /dev/null +++ b/build_stream/tests/uat/test_07_catalog_query_endpoints.py @@ -0,0 +1,260 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""UAT tests for catalog query endpoints.""" + +import json +import time +import httpx +import pytest + + +@pytest.mark.uat +class TestCatalogQuerySuccess: + """Test catalog query endpoints success scenarios.""" + + def test_query_catalog_after_parse( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test querying catalog after successful parse-catalog.""" + payload = { + "client_id": "uat-catalog-query-client", + "client_name": "UAT Catalog Query Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + time.sleep(10) + + query_response = http_client.get( + f"/api/v1/jobs/{job_id}/catalog", + headers=auth_headers_with_ids + ) + + assert query_response.status_code == 200 + data = query_response.json() + assert "metadata" in data or "software" in data or isinstance(data, dict) + + def test_query_catalog_returns_parsed_data( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test catalog query returns parsed catalog data.""" + catalog_data = { + "metadata": { + "name": "query-test-catalog", + "version": "1.0.0", + "description": "Test catalog for query", + }, + "software": [ + { + "name": "test-package", + "version": "1.0.0", + "arch": "x86_64", + "repository": "test-repo", + } + ], + } + catalog_content = json.dumps(catalog_data, indent=2).encode('utf-8') + + payload = { + "client_id": "uat-catalog-query-client", + "client_name": "UAT Catalog Query Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", catalog_content, "application/json") + } + + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + time.sleep(10) + + query_response = http_client.get( + f"/api/v1/jobs/{job_id}/catalog", + headers=auth_headers_with_ids + ) + + assert query_response.status_code == 200 + data = query_response.json() + assert isinstance(data, dict) + + def test_query_catalog_with_filters( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test catalog query with filter parameters.""" + payload = { + "client_id": "uat-catalog-query-client", + "client_name": "UAT Catalog Query Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + time.sleep(10) + + query_response = http_client.get( + f"/api/v1/jobs/{job_id}/catalog?arch=x86_64", + headers=auth_headers_with_ids + ) + + assert query_response.status_code in [200, 404] + + def test_list_all_catalogs( + self, http_client: httpx.Client, auth_headers: dict + ): + """Test listing all catalogs.""" + list_response = http_client.get( + "/api/v1/catalog", + headers=auth_headers + ) + + assert list_response.status_code == 200 + data = list_response.json() + assert isinstance(data, (list, dict)) + + +@pytest.mark.uat +class TestCatalogQueryFailure: + """Test catalog query endpoints failure scenarios.""" + + def test_query_catalog_before_parse_returns_404( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test querying catalog before parse-catalog returns 404.""" + payload = { + "client_id": "uat-catalog-query-client", + "client_name": "UAT Catalog Query Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + query_response = http_client.get( + f"/api/v1/jobs/{job_id}/catalog", + headers=auth_headers_with_ids + ) + + assert query_response.status_code == 404 + + def test_query_catalog_for_nonexistent_job_returns_404( + self, http_client: httpx.Client, auth_headers: dict + ): + """Test querying catalog for nonexistent job returns 404.""" + import uuid + nonexistent_job_id = str(uuid.uuid4()) + + query_response = http_client.get( + f"/api/v1/jobs/{nonexistent_job_id}/catalog", + headers=auth_headers + ) + + assert query_response.status_code == 404 + + def test_query_catalog_without_auth_returns_401(self, http_client: httpx.Client): + """Test querying catalog without authentication returns 401.""" + import uuid + job_id = str(uuid.uuid4()) + + query_response = http_client.get( + f"/api/v1/jobs/{job_id}/catalog" + ) + + assert query_response.status_code == 401 + + def test_query_catalog_with_invalid_job_id_returns_422( + self, http_client: httpx.Client, auth_headers: dict + ): + """Test querying catalog with invalid job ID format returns 422.""" + invalid_job_id = "not-a-valid-uuid" + + query_response = http_client.get( + f"/api/v1/jobs/{invalid_job_id}/catalog", + headers=auth_headers + ) + + assert query_response.status_code == 422 + + def test_query_catalog_with_invalid_filter_returns_400( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test catalog query with invalid filter parameters returns 400.""" + payload = { + "client_id": "uat-catalog-query-client", + "client_name": "UAT Catalog Query Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + time.sleep(10) + + query_response = http_client.get( + f"/api/v1/jobs/{job_id}/catalog?invalid_param=value", + headers=auth_headers_with_ids + ) + + assert query_response.status_code in [200, 400, 422] + + def test_list_catalogs_without_auth_returns_401(self, http_client: httpx.Client): + """Test listing catalogs without authentication returns 401.""" + list_response = http_client.get("/api/v1/catalog") + + assert list_response.status_code == 401 diff --git a/build_stream/tests/uat/test_08_end_to_end_workflow.py b/build_stream/tests/uat/test_08_end_to_end_workflow.py new file mode 100644 index 0000000000..ec38250794 --- /dev/null +++ b/build_stream/tests/uat/test_08_end_to_end_workflow.py @@ -0,0 +1,398 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""UAT tests for end-to-end workflow (sequential stage execution).""" + +import json +import time +import httpx +import pytest + + +@pytest.mark.uat +@pytest.mark.slow +class TestEndToEndWorkflow: + """Test complete end-to-end workflow execution.""" + + def test_complete_workflow_sequential_execution( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test complete workflow from job creation to final validation.""" + catalog_data = { + "metadata": { + "name": "e2e-test-catalog", + "version": "1.0.0", + "description": "End-to-end test catalog", + }, + "software": [ + { + "name": "test-package", + "version": "1.0.0", + "arch": "x86_64", + "repository": "test-repo", + } + ], + } + catalog_content = json.dumps(catalog_data, indent=2).encode('utf-8') + + payload = { + "client_id": "uat-e2e-client", + "client_name": "UAT E2E Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + print(f"\nāœ“ Job created: {job_id}") + + files = { + "file": ("catalog.json", catalog_content, "application/json") + } + + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + print("āœ“ Parse-catalog stage initiated") + + self._wait_for_stage_completion(http_client, job_id, "parse-catalog", auth_headers_with_ids, max_wait=60) + print("āœ“ Parse-catalog stage completed") + + generate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + assert generate_response.status_code in [200, 202] + print("āœ“ Generate-input-files stage initiated") + + self._wait_for_stage_completion(http_client, job_id, "generate-input-files", auth_headers_with_ids, max_wait=60) + print("āœ“ Generate-input-files stage completed") + + repo_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/create-local-repository", + headers=auth_headers_with_ids + ) + assert repo_response.status_code in [200, 202] + print("āœ“ Create-local-repository stage initiated") + + self._wait_for_stage_completion(http_client, job_id, "create-local-repository", auth_headers_with_ids, max_wait=300) + print("āœ“ Create-local-repository stage completed") + + build_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/build-image-x86_64", + headers=auth_headers_with_ids + ) + assert build_response.status_code in [200, 202] + print("āœ“ Build-image-x86_64 stage initiated") + + self._wait_for_stage_completion(http_client, job_id, "build-image-x86_64", auth_headers_with_ids, max_wait=600) + print("āœ“ Build-image-x86_64 stage completed") + + validate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/validate-image-on-test", + headers=auth_headers_with_ids + ) + assert validate_response.status_code in [200, 202] + print("āœ“ Validate-image-on-test stage initiated") + + self._wait_for_stage_completion(http_client, job_id, "validate-image-on-test", auth_headers_with_ids, max_wait=300) + print("āœ“ Validate-image-on-test stage completed") + + final_response = http_client.get(f"/api/v1/jobs/{job_id}", headers=auth_headers_with_ids) + assert final_response.status_code == 200 + final_data = final_response.json() + + assert final_data["job_state"] in ["COMPLETED", "RUNNING"] + print(f"āœ“ Final job state: {final_data['job_state']}") + + stages = final_data["stages"] + completed_stages = [s for s in stages if s["stage_state"] == "COMPLETED"] + assert len(completed_stages) >= 5 + print(f"āœ“ Completed stages: {len(completed_stages)}/6") + + def test_workflow_with_both_architectures( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test workflow executing both x86_64 and aarch64 build stages.""" + catalog_data = { + "metadata": { + "name": "multi-arch-catalog", + "version": "1.0.0", + "description": "Multi-architecture test catalog", + }, + "software": [ + { + "name": "test-package-x86", + "version": "1.0.0", + "arch": "x86_64", + "repository": "test-repo", + }, + { + "name": "test-package-arm", + "version": "1.0.0", + "arch": "aarch64", + "repository": "test-repo", + } + ], + } + catalog_content = json.dumps(catalog_data, indent=2).encode('utf-8') + + payload = { + "client_id": "uat-e2e-multi-arch-client", + "client_name": "UAT E2E Multi-Arch Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + print(f"\nāœ“ Job created: {job_id}") + + files = { + "file": ("catalog.json", catalog_content, "application/json") + } + + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + self._wait_for_stage_completion(http_client, job_id, "parse-catalog", auth_headers_with_ids, max_wait=60) + + generate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + assert generate_response.status_code in [200, 202] + + self._wait_for_stage_completion(http_client, job_id, "generate-input-files", auth_headers_with_ids, max_wait=60) + + repo_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/create-local-repository", + headers=auth_headers_with_ids + ) + assert repo_response.status_code in [200, 202] + + self._wait_for_stage_completion(http_client, job_id, "create-local-repository", auth_headers_with_ids, max_wait=300) + + build_x86_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/build-image-x86_64", + headers=auth_headers_with_ids + ) + assert build_x86_response.status_code in [200, 202] + print("āœ“ Build-image-x86_64 stage initiated") + + build_arm_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/build-image-aarch64", + headers=auth_headers_with_ids + ) + assert build_arm_response.status_code in [200, 202] + print("āœ“ Build-image-aarch64 stage initiated") + + self._wait_for_stage_completion(http_client, job_id, "build-image-x86_64", auth_headers_with_ids, max_wait=600) + print("āœ“ Build-image-x86_64 stage completed") + + self._wait_for_stage_completion(http_client, job_id, "build-image-aarch64", auth_headers_with_ids, max_wait=600) + print("āœ“ Build-image-aarch64 stage completed") + + def test_workflow_stage_dependencies( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test that stages enforce proper dependencies.""" + payload = { + "client_id": "uat-e2e-dependencies-client", + "client_name": "UAT E2E Dependencies Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + generate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/generate-input-files", + headers=auth_headers_with_ids + ) + assert generate_response.status_code == 412 + print("āœ“ Generate-input-files correctly rejected without parse-catalog") + + repo_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/create-local-repository", + headers=auth_headers_with_ids + ) + assert repo_response.status_code == 412 + print("āœ“ Create-local-repository correctly rejected without generate-input-files") + + build_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/build-image-x86_64", + headers=auth_headers_with_ids + ) + assert build_response.status_code == 404 + print("āœ“ Build-image correctly rejected without create-local-repository") + + validate_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/validate-image-on-test", + headers=auth_headers_with_ids + ) + assert validate_response.status_code == 422 + print("āœ“ Validate-image correctly rejected without build-image") + + def test_workflow_with_catalog_query( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test workflow with catalog query after parse stage.""" + catalog_data = { + "metadata": { + "name": "query-workflow-catalog", + "version": "1.0.0", + "description": "Catalog for query workflow test", + }, + "software": [ + { + "name": "queryable-package", + "version": "1.0.0", + "arch": "x86_64", + "repository": "test-repo", + } + ], + } + catalog_content = json.dumps(catalog_data, indent=2).encode('utf-8') + + payload = { + "client_id": "uat-e2e-query-client", + "client_name": "UAT E2E Query Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", catalog_content, "application/json") + } + + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + self._wait_for_stage_completion(http_client, job_id, "parse-catalog", auth_headers_with_ids, max_wait=60) + + query_response = http_client.get( + f"/api/v1/jobs/{job_id}/catalog", + headers=auth_headers_with_ids + ) + assert query_response.status_code == 200 + print("āœ“ Catalog query successful after parse-catalog") + + query_data = query_response.json() + assert isinstance(query_data, dict) + print(f"āœ“ Catalog data retrieved: {len(str(query_data))} bytes") + + def _wait_for_stage_completion( + self, http_client: httpx.Client, job_id: str, stage_name: str, + auth_headers: dict, max_wait: int = 60 + ): + """Wait for a stage to complete with timeout.""" + attempts = max_wait // 5 + for attempt in range(attempts): + time.sleep(5) + get_response = http_client.get(f"/api/v1/jobs/{job_id}", headers=auth_headers) + if get_response.status_code != 200: + continue + + stages = get_response.json()["stages"] + stage = next((s for s in stages if s["stage_name"] == stage_name), None) + + if stage and stage["stage_state"] == "COMPLETED": + return + + if stage and stage["stage_state"] == "FAILED": + pytest.fail(f"Stage {stage_name} failed") + + if attempt % 6 == 0: + print(f" Waiting for {stage_name}... ({attempt * 5}s elapsed)") + + pytest.fail(f"Stage {stage_name} did not complete within {max_wait}s") + + +@pytest.mark.uat +@pytest.mark.slow +class TestEndToEndWorkflowFailures: + """Test end-to-end workflow failure scenarios.""" + + def test_workflow_fails_with_invalid_catalog( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test workflow fails gracefully with invalid catalog.""" + invalid_catalog = b"{ invalid json" + + payload = { + "client_id": "uat-e2e-failure-client", + "client_name": "UAT E2E Failure Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", invalid_catalog, "application/json") + } + + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + + assert parse_response.status_code in [400, 422] + print("āœ“ Workflow correctly rejected invalid catalog") + + def test_workflow_cannot_skip_stages( + self, http_client: httpx.Client, auth_headers_with_ids: dict, real_catalog_content: bytes + ): + """Test workflow enforces sequential stage execution.""" + payload = { + "client_id": "uat-e2e-skip-client", + "client_name": "UAT E2E Skip Test", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + files = { + "file": ("catalog.json", real_catalog_content, "application/json") + } + + parse_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/parse-catalog", + files=files, + headers={"Authorization": auth_headers_with_ids["Authorization"]} + ) + assert parse_response.status_code in [200, 202] + + time.sleep(10) + + repo_response = http_client.post( + f"/api/v1/jobs/{job_id}/stages/create-local-repository", + headers=auth_headers_with_ids + ) + assert repo_response.status_code in [400, 409] + print("āœ“ Cannot skip generate-input-files stage") diff --git a/build_stream/tests/uat/test_cross_api_errors.py b/build_stream/tests/uat/test_cross_api_errors.py new file mode 100644 index 0000000000..8cf63bf1b0 --- /dev/null +++ b/build_stream/tests/uat/test_cross_api_errors.py @@ -0,0 +1,133 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""UAT tests for cross-API error scenarios. + +These tests validate consistent error handling across all API endpoints. +""" + +import httpx +import pytest + + +@pytest.mark.uat +class TestCrossAPIErrors: + """Test common error scenarios across all APIs.""" + + @pytest.mark.parametrize("endpoint,method", [ + ("/api/v1/jobs/00000000-0000-0000-0000-000000000000", "GET"), + ("/api/v1/jobs/00000000-0000-0000-0000-000000000000", "DELETE"), + ("/api/v1/jobs/00000000-0000-0000-0000-000000000000/stages/parse-catalog", "POST"), + ("/api/v1/jobs/00000000-0000-0000-0000-000000000000/stages/generate-input-files", "POST"), + ("/api/v1/jobs/00000000-0000-0000-0000-000000000000/stages/create-local-repository", "POST"), + ("/api/v1/jobs/00000000-0000-0000-0000-000000000000/stages/build-image", "POST"), + ("/api/v1/jobs/00000000-0000-0000-0000-000000000000/stages/validate-image-on-test", "POST"), + ("/api/v1/jobs/00000000-0000-0000-0000-000000000000/catalog/roles", "GET"), + ]) + def test_all_apis_with_invalid_job_returns_404( + self, http_client: httpx.Client, auth_headers: dict, endpoint: str, method: str + ): + """Test all APIs return 404 for nonexistent job ID. + + Note: Some endpoints validate request body before checking job existence, + so they may return 422 (validation error) instead of 404. + """ + if method == "GET": + response = http_client.get(endpoint, headers=auth_headers) + assert response.status_code == 404 + elif method == "DELETE": + response = http_client.delete(endpoint, headers=auth_headers) + assert response.status_code == 404 + else: # POST + # Parse-catalog requires file upload, not JSON + if "/stages/parse-catalog" in endpoint: + files = {"file": ("test.json", b"{}", "application/json")} + response = http_client.post(endpoint, headers=auth_headers, files=files) + else: + response = http_client.post(endpoint, headers=auth_headers, json={}) + + # Stage endpoints may validate request body first (422) or check job existence first (404) + assert response.status_code in [404, 422] + + @pytest.mark.parametrize("endpoint,method", [ + ("/api/v1/jobs", "POST"), + ("/api/v1/jobs/test-id", "GET"), + ("/api/v1/jobs/test-id", "DELETE"), + ("/api/v1/jobs/test-id/stages/parse-catalog", "POST"), + ("/api/v1/jobs/test-id/stages/generate-input-files", "POST"), + ("/api/v1/jobs/test-id/stages/create-local-repository", "POST"), + ("/api/v1/jobs/test-id/stages/build-image", "POST"), + ("/api/v1/jobs/test-id/stages/validate-image-on-test", "POST"), + ("/api/v1/jobs/test-id/catalog/roles", "GET"), + ]) + def test_all_apis_without_authentication_returns_401( + self, http_client: httpx.Client, endpoint: str, method: str + ): + """Test all APIs return 401 without authentication.""" + if method == "GET": + response = http_client.get(endpoint) + elif method == "DELETE": + response = http_client.delete(endpoint) + else: # POST + # Parse-catalog requires file upload, not JSON + if "/stages/parse-catalog" in endpoint: + files = {"file": ("test.json", b"{}", "application/json")} + response = http_client.post(endpoint, files=files) + else: + response = http_client.post(endpoint, json={}) + + assert response.status_code == 401 + + @pytest.mark.parametrize("endpoint,method", [ + ("/api/v1/jobs", "POST"), + ("/api/v1/jobs/test-id", "GET"), + ("/api/v1/jobs/test-id/stages/parse-catalog", "POST"), + ]) + def test_all_apis_with_invalid_token_returns_401( + self, http_client: httpx.Client, endpoint: str, method: str + ): + """Test all APIs return 401 with invalid token.""" + headers = { + "Authorization": "Bearer invalid-token-12345", + "Content-Type": "application/json", + } + + if method == "GET": + response = http_client.get(endpoint, headers=headers) + elif method == "DELETE": + response = http_client.delete(endpoint, headers=headers) + else: # POST + # Parse-catalog requires file upload, not JSON + if "/stages/parse-catalog" in endpoint: + files = {"file": ("test.json", b"{}", "application/json")} + response = http_client.post(endpoint, files=files, headers=headers) + else: + response = http_client.post(endpoint, json={}, headers=headers) + + assert response.status_code == 401 + + @pytest.mark.parametrize("endpoint", [ + "/api/v1/jobs/invalid-uuid-format", + "/api/v1/jobs/invalid-uuid-format/stages/parse-catalog", + "/api/v1/jobs/not-a-uuid/catalog/roles", + ]) + def test_all_apis_with_invalid_job_id_format_returns_400( + self, http_client: httpx.Client, auth_headers: dict, endpoint: str + ): + """Test all APIs return 400 for invalid job ID format.""" + # Try GET first (works for most endpoints) + response = http_client.get(endpoint, headers=auth_headers) + + # 400 for invalid format, 404 if validates format first, 405 if method not allowed, 422 for validation + assert response.status_code in [400, 404, 405, 422] diff --git a/build_stream/tests/uat/test_jobs_api.py b/build_stream/tests/uat/test_jobs_api.py new file mode 100644 index 0000000000..407f99bf9a --- /dev/null +++ b/build_stream/tests/uat/test_jobs_api.py @@ -0,0 +1,202 @@ +# Copyright 2026 Dell Inc. or its subsidiaries. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""UAT tests for Jobs API.""" + +import uuid +import httpx +import pytest + + +@pytest.mark.uat +class TestCreateJob: + """Test job creation endpoint.""" + + def test_create_job_returns_201(self, http_client: httpx.Client, auth_headers_with_ids: dict): + """Test job creation returns 201 with valid request.""" + payload = { + "client_id": "uat-test-client", + "client_name": "UAT Test Client", + } + + response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + + assert response.status_code == 201 + data = response.json() + assert "job_id" in data + assert "correlation_id" in data + assert "job_state" in data + assert "created_at" in data + assert "stages" in data + + def test_create_job_returns_valid_job_id(self, http_client: httpx.Client, auth_headers_with_ids: dict): + """Test created job has valid UUID job_id.""" + payload = { + "client_id": "uat-test-client", + "client_name": "UAT Test Client", + } + + response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + + assert response.status_code == 201 + job_id = response.json()["job_id"] + + # Validate UUID format + try: + uuid.UUID(job_id) + except ValueError: + pytest.fail(f"Invalid UUID format: {job_id}") + + def test_create_job_creates_all_stages(self, http_client: httpx.Client, auth_headers_with_ids: dict): + """Test job creation creates all expected stages.""" + payload = { + "client_id": "uat-test-client", + "client_name": "UAT Test Client", + } + + response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + + assert response.status_code == 201 + stages = response.json()["stages"] + assert len(stages) == 6 + + expected_stages = { + "build-image-aarch64", + "build-image-x86_64", + "create-local-repository", + "generate-input-files", + "parse-catalog", + "validate-image-on-test", + } + + # Check all expected stages are present (order doesn't matter) + stage_names = {s["stage_name"] for s in stages} + assert stage_names == expected_stages + + # Verify all stages are in PENDING state initially + for stage in stages: + assert stage["stage_state"] == "PENDING" + assert stage["started_at"] is None + assert stage["ended_at"] is None + + def test_create_job_with_missing_client_id_returns_400( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test job creation without client_id returns 400.""" + payload = { + "client_name": "UAT Test Client", + } + + response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + + assert response.status_code == 422 # FastAPI validation error + + @pytest.mark.skip(reason="Idempotency not implemented in API yet - returns 201 instead of 200") + def test_idempotency_key_prevents_duplicate_jobs( + self, http_client: httpx.Client, auth_headers: dict, unique_idempotency_key: str, + unique_correlation_id: str + ): + """Test idempotency key prevents duplicate job creation.""" + headers = { + **auth_headers, + "Idempotency-Key": unique_idempotency_key, + "X-Correlation-Id": unique_correlation_id, + } + + payload = { + "client_id": "uat-test-client", + "client_name": "UAT Test Client", + } + + # First request + response1 = http_client.post("/api/v1/jobs", json=payload, headers=headers) + assert response1.status_code == 201 + job_id1 = response1.json()["job_id"] + + # Second request with same idempotency key + response2 = http_client.post("/api/v1/jobs", json=payload, headers=headers) + assert response2.status_code == 200 # Returns existing job + job_id2 = response2.json()["job_id"] + + # Should return same job + assert job_id1 == job_id2 + + +@pytest.mark.uat +class TestGetJob: + """Test get job endpoint.""" + + def test_get_job_returns_200_with_valid_id( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test getting job with valid ID returns 200.""" + # First create a job + payload = { + "client_id": "uat-test-client", + "client_name": "UAT Test Client", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + # Then get the job + get_response = http_client.get(f"/api/v1/jobs/{job_id}", headers=auth_headers_with_ids) + + assert get_response.status_code == 200 + data = get_response.json() + assert data["job_id"] == job_id + assert "job_state" in data + assert "stages" in data + + def test_get_job_returns_404_with_invalid_id( + self, http_client: httpx.Client, auth_headers: dict, invalid_job_id: str + ): + """Test getting job with invalid ID returns 404.""" + response = http_client.get(f"/api/v1/jobs/{invalid_job_id}", headers=auth_headers) + + assert response.status_code == 404 + + +@pytest.mark.uat +class TestDeleteJob: + """Test delete job endpoint.""" + + @pytest.mark.skip(reason="Not implemented") + def test_delete_job_returns_204( + self, http_client: httpx.Client, auth_headers_with_ids: dict + ): + """Test deleting job returns 204.""" + # First create a job + payload = { + "client_id": "uat-test-client", + "client_name": "UAT Test Client", + } + + create_response = http_client.post("/api/v1/jobs", json=payload, headers=auth_headers_with_ids) + assert create_response.status_code == 201 + job_id = create_response.json()["job_id"] + + # Then delete the job + delete_response = http_client.delete(f"/api/v1/jobs/{job_id}", headers=auth_headers_with_ids) + + assert delete_response.status_code == 204 + + def test_delete_nonexistent_job_returns_404( + self, http_client: httpx.Client, auth_headers: dict, invalid_job_id: str + ): + """Test deleting nonexistent job returns 404.""" + response = http_client.delete(f"/api/v1/jobs/{invalid_job_id}", headers=auth_headers) + + assert response.status_code == 404