mirror of
https://github.com/comfyanonymous/ComfyUI.git
synced 2025-06-03 10:02:09 +08:00
rework client.py to be more robust, add logging of api requests (#7988)
* rework how errors are handled on the client side * add logging to /temp * fix ruff * fix rebase, stupid vscode gui
This commit is contained in:
parent
4a9014e201
commit
bab836d88d
@ -94,15 +94,18 @@ from __future__ import annotations
|
||||
import logging
|
||||
import time
|
||||
import io
|
||||
from typing import Dict, Type, Optional, Any, TypeVar, Generic, Callable
|
||||
import socket
|
||||
from typing import Dict, Type, Optional, Any, TypeVar, Generic, Callable, Tuple
|
||||
from enum import Enum
|
||||
import json
|
||||
import requests
|
||||
from urllib.parse import urljoin
|
||||
from urllib.parse import urljoin, urlparse
|
||||
from pydantic import BaseModel, Field
|
||||
import uuid # For generating unique operation IDs
|
||||
|
||||
from comfy.cli_args import args
|
||||
from comfy import utils
|
||||
from . import request_logger
|
||||
|
||||
T = TypeVar("T", bound=BaseModel)
|
||||
R = TypeVar("R", bound=BaseModel)
|
||||
@ -111,6 +114,21 @@ P = TypeVar("P", bound=BaseModel) # For poll response
|
||||
PROGRESS_BAR_MAX = 100
|
||||
|
||||
|
||||
class NetworkError(Exception):
|
||||
"""Base exception for network-related errors with diagnostic information."""
|
||||
pass
|
||||
|
||||
|
||||
class LocalNetworkError(NetworkError):
|
||||
"""Exception raised when local network connectivity issues are detected."""
|
||||
pass
|
||||
|
||||
|
||||
class ApiServerError(NetworkError):
|
||||
"""Exception raised when the API server is unreachable but internet is working."""
|
||||
pass
|
||||
|
||||
|
||||
class EmptyRequest(BaseModel):
|
||||
"""Base class for empty request bodies.
|
||||
For GET requests, fields will be sent as query parameters."""
|
||||
@ -141,7 +159,7 @@ class HttpMethod(str, Enum):
|
||||
|
||||
class ApiClient:
|
||||
"""
|
||||
Client for making HTTP requests to an API with authentication and error handling.
|
||||
Client for making HTTP requests to an API with authentication, error handling, and retry logic.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
@ -151,12 +169,26 @@ class ApiClient:
|
||||
comfy_api_key: Optional[str] = None,
|
||||
timeout: float = 3600.0,
|
||||
verify_ssl: bool = True,
|
||||
max_retries: int = 3,
|
||||
retry_delay: float = 1.0,
|
||||
retry_backoff_factor: float = 2.0,
|
||||
retry_status_codes: Optional[Tuple[int, ...]] = None,
|
||||
):
|
||||
self.base_url = base_url
|
||||
self.auth_token = auth_token
|
||||
self.comfy_api_key = comfy_api_key
|
||||
self.timeout = timeout
|
||||
self.verify_ssl = verify_ssl
|
||||
self.max_retries = max_retries
|
||||
self.retry_delay = retry_delay
|
||||
self.retry_backoff_factor = retry_backoff_factor
|
||||
# Default retry status codes: 408 (Request Timeout), 429 (Too Many Requests),
|
||||
# 500, 502, 503, 504 (Server Errors)
|
||||
self.retry_status_codes = retry_status_codes or (408, 429, 500, 502, 503, 504)
|
||||
|
||||
def _generate_operation_id(self, path: str) -> str:
|
||||
"""Generates a unique operation ID for logging."""
|
||||
return f"{path.strip('/').replace('/', '_')}_{uuid.uuid4().hex[:8]}"
|
||||
|
||||
def _create_json_payload_args(
|
||||
self,
|
||||
@ -211,6 +243,56 @@ class ApiClient:
|
||||
|
||||
return headers
|
||||
|
||||
def _check_connectivity(self, target_url: str) -> Dict[str, bool]:
|
||||
"""
|
||||
Check connectivity to determine if network issues are local or server-related.
|
||||
|
||||
Args:
|
||||
target_url: URL to check connectivity to
|
||||
|
||||
Returns:
|
||||
Dictionary with connectivity status details
|
||||
"""
|
||||
results = {
|
||||
"internet_accessible": False,
|
||||
"api_accessible": False,
|
||||
"is_local_issue": False,
|
||||
"is_api_issue": False
|
||||
}
|
||||
|
||||
# First check basic internet connectivity using a reliable external site
|
||||
try:
|
||||
# Use a reliable external domain for checking basic connectivity
|
||||
check_response = requests.get("https://www.google.com",
|
||||
timeout=5.0,
|
||||
verify=self.verify_ssl)
|
||||
if check_response.status_code < 500:
|
||||
results["internet_accessible"] = True
|
||||
except (requests.RequestException, socket.error):
|
||||
results["internet_accessible"] = False
|
||||
results["is_local_issue"] = True
|
||||
return results
|
||||
|
||||
# Now check API server connectivity
|
||||
try:
|
||||
# Extract domain from the target URL to do a simpler health check
|
||||
parsed_url = urlparse(target_url)
|
||||
api_base = f"{parsed_url.scheme}://{parsed_url.netloc}"
|
||||
|
||||
# Try to reach the API domain
|
||||
api_response = requests.get(f"{api_base}/health", timeout=5.0, verify=self.verify_ssl)
|
||||
if api_response.status_code < 500:
|
||||
results["api_accessible"] = True
|
||||
else:
|
||||
results["api_accessible"] = False
|
||||
results["is_api_issue"] = True
|
||||
except requests.RequestException:
|
||||
results["api_accessible"] = False
|
||||
# If we can reach the internet but not the API, it's an API issue
|
||||
results["is_api_issue"] = True
|
||||
|
||||
return results
|
||||
|
||||
def request(
|
||||
self,
|
||||
method: str,
|
||||
@ -221,9 +303,10 @@ class ApiClient:
|
||||
headers: Optional[Dict[str, str]] = None,
|
||||
content_type: str = "application/json",
|
||||
multipart_parser: Callable = None,
|
||||
retry_count: int = 0, # Used internally for tracking retries
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Make an HTTP request to the API
|
||||
Make an HTTP request to the API with automatic retries for transient errors.
|
||||
|
||||
Args:
|
||||
method: HTTP method (GET, POST, etc.)
|
||||
@ -233,12 +316,15 @@ class ApiClient:
|
||||
files: Files to upload
|
||||
headers: Additional headers
|
||||
content_type: Content type of the request. Defaults to application/json.
|
||||
retry_count: Internal parameter for tracking retries, do not set manually
|
||||
|
||||
Returns:
|
||||
Parsed JSON response
|
||||
|
||||
Raises:
|
||||
requests.RequestException: If the request fails
|
||||
LocalNetworkError: If local network connectivity issues are detected
|
||||
ApiServerError: If the API server is unreachable but internet is working
|
||||
Exception: For other request failures
|
||||
"""
|
||||
url = urljoin(self.base_url, path)
|
||||
self.check_auth(self.auth_token, self.comfy_api_key)
|
||||
@ -265,6 +351,16 @@ class ApiClient:
|
||||
else:
|
||||
payload_args = self._create_json_payload_args(data, request_headers)
|
||||
|
||||
operation_id = self._generate_operation_id(path)
|
||||
request_logger.log_request_response(
|
||||
operation_id=operation_id,
|
||||
request_method=method,
|
||||
request_url=url,
|
||||
request_headers=request_headers,
|
||||
request_params=params,
|
||||
request_data=data if content_type == "application/json" else "[form-data or other]"
|
||||
)
|
||||
|
||||
try:
|
||||
response = requests.request(
|
||||
method=method,
|
||||
@ -275,50 +371,228 @@ class ApiClient:
|
||||
**payload_args,
|
||||
)
|
||||
|
||||
# Check if we should retry based on status code
|
||||
if (response.status_code in self.retry_status_codes and
|
||||
retry_count < self.max_retries):
|
||||
|
||||
# Calculate delay with exponential backoff
|
||||
delay = self.retry_delay * (self.retry_backoff_factor ** retry_count)
|
||||
|
||||
logging.warning(
|
||||
f"Request failed with status {response.status_code}. "
|
||||
f"Retrying in {delay:.2f}s ({retry_count + 1}/{self.max_retries})"
|
||||
)
|
||||
|
||||
time.sleep(delay)
|
||||
return self.request(
|
||||
method=method,
|
||||
path=path,
|
||||
params=params,
|
||||
data=data,
|
||||
files=files,
|
||||
headers=headers,
|
||||
content_type=content_type,
|
||||
multipart_parser=multipart_parser,
|
||||
retry_count=retry_count + 1,
|
||||
)
|
||||
|
||||
# Raise exception for error status codes
|
||||
response.raise_for_status()
|
||||
except requests.ConnectionError:
|
||||
raise Exception(
|
||||
f"Unable to connect to the API server at {self.base_url}. Please check your internet connection or verify the service is available."
|
||||
|
||||
# Log successful response
|
||||
response_content_to_log = response.content
|
||||
try:
|
||||
# Attempt to parse JSON for prettier logging, fallback to raw content
|
||||
response_content_to_log = response.json()
|
||||
except json.JSONDecodeError:
|
||||
pass # Keep as bytes/str if not JSON
|
||||
|
||||
request_logger.log_request_response(
|
||||
operation_id=operation_id,
|
||||
request_method=method, # Pass request details again for context in log
|
||||
request_url=url,
|
||||
response_status_code=response.status_code,
|
||||
response_headers=dict(response.headers),
|
||||
response_content=response_content_to_log
|
||||
)
|
||||
|
||||
except requests.Timeout:
|
||||
raise Exception(
|
||||
f"Request timed out after {self.timeout} seconds. The server might be experiencing high load or the operation is taking longer than expected."
|
||||
except requests.ConnectionError as e:
|
||||
error_message = f"ConnectionError: {str(e)}"
|
||||
request_logger.log_request_response(
|
||||
operation_id=operation_id,
|
||||
request_method=method,
|
||||
request_url=url,
|
||||
error_message=error_message
|
||||
)
|
||||
# Only perform connectivity check if we've exhausted all retries
|
||||
if retry_count >= self.max_retries:
|
||||
# Check connectivity to determine if it's a local or API issue
|
||||
connectivity = self._check_connectivity(self.base_url)
|
||||
|
||||
if connectivity["is_local_issue"]:
|
||||
raise LocalNetworkError(
|
||||
"Unable to connect to the API server due to local network issues. "
|
||||
"Please check your internet connection and try again."
|
||||
) from e
|
||||
elif connectivity["is_api_issue"]:
|
||||
raise ApiServerError(
|
||||
f"The API server at {self.base_url} is currently unreachable. "
|
||||
f"The service may be experiencing issues. Please try again later."
|
||||
) from e
|
||||
|
||||
# If we haven't exhausted retries yet, retry the request
|
||||
if retry_count < self.max_retries:
|
||||
delay = self.retry_delay * (self.retry_backoff_factor ** retry_count)
|
||||
logging.warning(
|
||||
f"Connection error: {str(e)}. "
|
||||
f"Retrying in {delay:.2f}s ({retry_count + 1}/{self.max_retries})"
|
||||
)
|
||||
time.sleep(delay)
|
||||
return self.request(
|
||||
method=method,
|
||||
path=path,
|
||||
params=params,
|
||||
data=data,
|
||||
files=files,
|
||||
headers=headers,
|
||||
content_type=content_type,
|
||||
multipart_parser=multipart_parser,
|
||||
retry_count=retry_count + 1,
|
||||
)
|
||||
|
||||
# If we've exhausted retries and didn't identify the specific issue,
|
||||
# raise a generic exception
|
||||
final_error_message = (
|
||||
f"Unable to connect to the API server after {self.max_retries} attempts. "
|
||||
f"Please check your internet connection or try again later."
|
||||
)
|
||||
request_logger.log_request_response( # Log final failure
|
||||
operation_id=operation_id,
|
||||
request_method=method, request_url=url,
|
||||
error_message=final_error_message
|
||||
)
|
||||
raise Exception(final_error_message) from e
|
||||
|
||||
except requests.Timeout as e:
|
||||
error_message = f"Timeout: {str(e)}"
|
||||
request_logger.log_request_response(
|
||||
operation_id=operation_id,
|
||||
request_method=method, request_url=url,
|
||||
error_message=error_message
|
||||
)
|
||||
# Retry timeouts if we haven't exhausted retries
|
||||
if retry_count < self.max_retries:
|
||||
delay = self.retry_delay * (self.retry_backoff_factor ** retry_count)
|
||||
logging.warning(
|
||||
f"Request timed out. "
|
||||
f"Retrying in {delay:.2f}s ({retry_count + 1}/{self.max_retries})"
|
||||
)
|
||||
time.sleep(delay)
|
||||
return self.request(
|
||||
method=method,
|
||||
path=path,
|
||||
params=params,
|
||||
data=data,
|
||||
files=files,
|
||||
headers=headers,
|
||||
content_type=content_type,
|
||||
multipart_parser=multipart_parser,
|
||||
retry_count=retry_count + 1,
|
||||
)
|
||||
final_error_message = (
|
||||
f"Request timed out after {self.timeout} seconds and {self.max_retries} retry attempts. "
|
||||
f"The server might be experiencing high load or the operation is taking longer than expected."
|
||||
)
|
||||
request_logger.log_request_response( # Log final failure
|
||||
operation_id=operation_id,
|
||||
request_method=method, request_url=url,
|
||||
error_message=final_error_message
|
||||
)
|
||||
raise Exception(final_error_message) from e
|
||||
|
||||
except requests.HTTPError as e:
|
||||
status_code = e.response.status_code if hasattr(e, "response") else None
|
||||
error_message = f"HTTP Error: {str(e)}"
|
||||
original_error_message = f"HTTP Error: {str(e)}"
|
||||
error_content_for_log = None
|
||||
if hasattr(e, "response") and e.response is not None:
|
||||
error_content_for_log = e.response.content
|
||||
try:
|
||||
error_content_for_log = e.response.json()
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
|
||||
# Try to extract detailed error message from JSON response for user display
|
||||
# but log the full error content.
|
||||
user_display_error_message = original_error_message
|
||||
|
||||
# Try to extract detailed error message from JSON response
|
||||
try:
|
||||
if hasattr(e, "response") and e.response.content:
|
||||
if hasattr(e, "response") and e.response is not None and e.response.content:
|
||||
error_json = e.response.json()
|
||||
if "error" in error_json and "message" in error_json["error"]:
|
||||
error_message = f"API Error: {error_json['error']['message']}"
|
||||
user_display_error_message = f"API Error: {error_json['error']['message']}"
|
||||
if "type" in error_json["error"]:
|
||||
error_message += f" (Type: {error_json['error']['type']})"
|
||||
user_display_error_message += f" (Type: {error_json['error']['type']})"
|
||||
elif isinstance(error_json, dict): # Handle cases where error is just a JSON dict
|
||||
user_display_error_message = f"API Error: {json.dumps(error_json)}"
|
||||
else: # Non-dict JSON error
|
||||
user_display_error_message = f"API Error: {str(error_json)}"
|
||||
except json.JSONDecodeError:
|
||||
# If not JSON, use the raw content if it's not too long, or a summary
|
||||
if hasattr(e, "response") and e.response is not None and e.response.content:
|
||||
raw_content = e.response.content.decode(errors='ignore')
|
||||
if len(raw_content) < 200: # Arbitrary limit for display
|
||||
user_display_error_message = f"API Error (raw): {raw_content}"
|
||||
else:
|
||||
error_message = f"API Error: {error_json}"
|
||||
except Exception as json_error:
|
||||
# If we can't parse the JSON, fall back to the original error message
|
||||
logging.debug(
|
||||
f"[DEBUG] Failed to parse error response: {str(json_error)}"
|
||||
user_display_error_message = f"API Error (raw, status {status_code})"
|
||||
|
||||
request_logger.log_request_response(
|
||||
operation_id=operation_id,
|
||||
request_method=method, request_url=url,
|
||||
response_status_code=status_code,
|
||||
response_headers=dict(e.response.headers) if hasattr(e, "response") and e.response is not None else None,
|
||||
response_content=error_content_for_log,
|
||||
error_message=original_error_message # Log the original exception string as error
|
||||
)
|
||||
|
||||
logging.debug(f"[DEBUG] API Error: {user_display_error_message} (Status: {status_code})")
|
||||
if hasattr(e, "response") and e.response is not None and e.response.content:
|
||||
logging.debug(f"[DEBUG] Response content: {e.response.content}")
|
||||
|
||||
# Retry if the status code is in our retry list and we haven't exhausted retries
|
||||
if (status_code in self.retry_status_codes and
|
||||
retry_count < self.max_retries):
|
||||
|
||||
delay = self.retry_delay * (self.retry_backoff_factor ** retry_count)
|
||||
logging.warning(
|
||||
f"HTTP error {status_code}. "
|
||||
f"Retrying in {delay:.2f}s ({retry_count + 1}/{self.max_retries})"
|
||||
)
|
||||
time.sleep(delay)
|
||||
return self.request(
|
||||
method=method,
|
||||
path=path,
|
||||
params=params,
|
||||
data=data,
|
||||
files=files,
|
||||
headers=headers,
|
||||
content_type=content_type,
|
||||
multipart_parser=multipart_parser,
|
||||
retry_count=retry_count + 1,
|
||||
)
|
||||
|
||||
logging.debug(f"[DEBUG] API Error: {error_message} (Status: {status_code})")
|
||||
if hasattr(e, "response") and e.response.content:
|
||||
logging.debug(f"[DEBUG] Response content: {e.response.content}")
|
||||
# Specific error messages for common status codes for user display
|
||||
if status_code == 401:
|
||||
error_message = "Unauthorized: Please login first to use this node."
|
||||
if status_code == 402:
|
||||
error_message = "Payment Required: Please add credits to your account to use this node."
|
||||
if status_code == 409:
|
||||
error_message = "There is a problem with your account. Please contact support@comfy.org. "
|
||||
if status_code == 429:
|
||||
error_message = "Rate Limit Exceeded: Please try again later."
|
||||
raise Exception(error_message)
|
||||
user_display_error_message = "Unauthorized: Please login first to use this node."
|
||||
elif status_code == 402:
|
||||
user_display_error_message = "Payment Required: Please add credits to your account to use this node."
|
||||
elif status_code == 409:
|
||||
user_display_error_message = "There is a problem with your account. Please contact support@comfy.org."
|
||||
elif status_code == 429:
|
||||
user_display_error_message = "Rate Limit Exceeded: Please try again later."
|
||||
# else, user_display_error_message remains as parsed from response or original HTTPError string
|
||||
|
||||
raise Exception(user_display_error_message) # Raise with the user-friendly message
|
||||
|
||||
# Parse and return JSON response
|
||||
if response.content:
|
||||
@ -336,26 +610,126 @@ class ApiClient:
|
||||
upload_url: str,
|
||||
file: io.BytesIO | str,
|
||||
content_type: str | None = None,
|
||||
max_retries: int = 3,
|
||||
retry_delay: float = 1.0,
|
||||
retry_backoff_factor: float = 2.0,
|
||||
):
|
||||
"""Upload a file to the API. Make sure the file has a filename equal to what the url expects.
|
||||
"""Upload a file to the API with retry logic.
|
||||
|
||||
Args:
|
||||
upload_url: The URL to upload to
|
||||
file: Either a file path string, BytesIO object, or tuple of (file_path, filename)
|
||||
mime_type: Optional mime type to set for the upload
|
||||
content_type: Optional mime type to set for the upload
|
||||
max_retries: Maximum number of retry attempts
|
||||
retry_delay: Initial delay between retries in seconds
|
||||
retry_backoff_factor: Multiplier for the delay after each retry
|
||||
"""
|
||||
headers = {}
|
||||
if content_type:
|
||||
headers["Content-Type"] = content_type
|
||||
|
||||
# Prepare the file data
|
||||
if isinstance(file, io.BytesIO):
|
||||
file.seek(0) # Ensure we're at the start of the file
|
||||
data = file.read()
|
||||
return requests.put(upload_url, data=data, headers=headers)
|
||||
elif isinstance(file, str):
|
||||
with open(file, "rb") as f:
|
||||
data = f.read()
|
||||
return requests.put(upload_url, data=data, headers=headers)
|
||||
else:
|
||||
raise ValueError("File must be either a BytesIO object or a file path string")
|
||||
|
||||
# Try the upload with retries
|
||||
last_exception = None
|
||||
operation_id = f"upload_{upload_url.split('/')[-1]}_{uuid.uuid4().hex[:8]}" # Simplified ID for uploads
|
||||
|
||||
# Log initial attempt (without full file data for brevity)
|
||||
request_logger.log_request_response(
|
||||
operation_id=operation_id,
|
||||
request_method="PUT",
|
||||
request_url=upload_url,
|
||||
request_headers=headers,
|
||||
request_data=f"[File data of type {content_type or 'unknown'}, size {len(data)} bytes]"
|
||||
)
|
||||
|
||||
for retry_attempt in range(max_retries + 1):
|
||||
try:
|
||||
response = requests.put(upload_url, data=data, headers=headers)
|
||||
response.raise_for_status()
|
||||
request_logger.log_request_response(
|
||||
operation_id=operation_id,
|
||||
request_method="PUT", request_url=upload_url, # For context
|
||||
response_status_code=response.status_code,
|
||||
response_headers=dict(response.headers),
|
||||
response_content="File uploaded successfully." # Or response.text if available
|
||||
)
|
||||
return response
|
||||
|
||||
except (requests.ConnectionError, requests.Timeout, requests.HTTPError) as e:
|
||||
last_exception = e
|
||||
error_message_for_log = f"{type(e).__name__}: {str(e)}"
|
||||
response_content_for_log = None
|
||||
status_code_for_log = None
|
||||
headers_for_log = None
|
||||
|
||||
if hasattr(e, 'response') and e.response is not None:
|
||||
status_code_for_log = e.response.status_code
|
||||
headers_for_log = dict(e.response.headers)
|
||||
try:
|
||||
response_content_for_log = e.response.json()
|
||||
except json.JSONDecodeError:
|
||||
response_content_for_log = e.response.content
|
||||
|
||||
|
||||
request_logger.log_request_response(
|
||||
operation_id=operation_id,
|
||||
request_method="PUT", request_url=upload_url,
|
||||
response_status_code=status_code_for_log,
|
||||
response_headers=headers_for_log,
|
||||
response_content=response_content_for_log,
|
||||
error_message=error_message_for_log
|
||||
)
|
||||
|
||||
if retry_attempt < max_retries:
|
||||
delay = retry_delay * (retry_backoff_factor ** retry_attempt)
|
||||
logging.warning(
|
||||
f"File upload failed: {str(e)}. "
|
||||
f"Retrying in {delay:.2f}s ({retry_attempt + 1}/{max_retries})"
|
||||
)
|
||||
time.sleep(delay)
|
||||
else:
|
||||
break # Max retries reached
|
||||
|
||||
# If we've exhausted all retries, determine the final error type and raise
|
||||
final_error_message = f"Failed to upload file after {max_retries + 1} attempts. Error: {str(last_exception)}"
|
||||
try:
|
||||
# Check basic internet connectivity
|
||||
check_response = requests.get("https://www.google.com", timeout=5.0, verify=True) # Assuming verify=True is desired
|
||||
if check_response.status_code >= 500: # Google itself has an issue (rare)
|
||||
final_error_message = (f"Failed to upload file. Internet connectivity check to Google failed "
|
||||
f"(status {check_response.status_code}). Original error: {str(last_exception)}")
|
||||
# Not raising LocalNetworkError here as Google itself might be down.
|
||||
# If Google is reachable, the issue is likely with the upload server or a more specific local problem
|
||||
# not caught by a simple Google ping (e.g., DNS for the specific upload URL, firewall).
|
||||
# The original last_exception is probably most relevant.
|
||||
|
||||
except (requests.RequestException, socket.error) as conn_check_exc:
|
||||
# Could not reach Google, likely a local network issue
|
||||
final_error_message = (f"Failed to upload file due to network connectivity issues "
|
||||
f"(cannot reach Google: {str(conn_check_exc)}). "
|
||||
f"Original upload error: {str(last_exception)}")
|
||||
request_logger.log_request_response( # Log final failure reason
|
||||
operation_id=operation_id,
|
||||
request_method="PUT", request_url=upload_url,
|
||||
error_message=final_error_message
|
||||
)
|
||||
raise LocalNetworkError(final_error_message) from last_exception
|
||||
|
||||
request_logger.log_request_response( # Log final failure reason if not LocalNetworkError
|
||||
operation_id=operation_id,
|
||||
request_method="PUT", request_url=upload_url,
|
||||
error_message=final_error_message
|
||||
)
|
||||
raise Exception(final_error_message) from last_exception
|
||||
|
||||
|
||||
class ApiEndpoint(Generic[T, R]):
|
||||
@ -403,6 +777,9 @@ class SynchronousOperation(Generic[T, R]):
|
||||
verify_ssl: bool = True,
|
||||
content_type: str = "application/json",
|
||||
multipart_parser: Callable = None,
|
||||
max_retries: int = 3,
|
||||
retry_delay: float = 1.0,
|
||||
retry_backoff_factor: float = 2.0,
|
||||
):
|
||||
self.endpoint = endpoint
|
||||
self.request = request
|
||||
@ -419,8 +796,12 @@ class SynchronousOperation(Generic[T, R]):
|
||||
self.files = files
|
||||
self.content_type = content_type
|
||||
self.multipart_parser = multipart_parser
|
||||
self.max_retries = max_retries
|
||||
self.retry_delay = retry_delay
|
||||
self.retry_backoff_factor = retry_backoff_factor
|
||||
|
||||
def execute(self, client: Optional[ApiClient] = None) -> R:
|
||||
"""Execute the API operation using the provided client or create one"""
|
||||
"""Execute the API operation using the provided client or create one with retry support"""
|
||||
try:
|
||||
# Create client if not provided
|
||||
if client is None:
|
||||
@ -430,6 +811,9 @@ class SynchronousOperation(Generic[T, R]):
|
||||
comfy_api_key=self.comfy_api_key,
|
||||
timeout=self.timeout,
|
||||
verify_ssl=self.verify_ssl,
|
||||
max_retries=self.max_retries,
|
||||
retry_delay=self.retry_delay,
|
||||
retry_backoff_factor=self.retry_backoff_factor,
|
||||
)
|
||||
|
||||
# Convert request model to dict, but use None for EmptyRequest
|
||||
@ -443,11 +827,6 @@ class SynchronousOperation(Generic[T, R]):
|
||||
if isinstance(value, Enum):
|
||||
request_dict[key] = value.value
|
||||
|
||||
if request_dict:
|
||||
for key, value in request_dict.items():
|
||||
if isinstance(value, Enum):
|
||||
request_dict[key] = value.value
|
||||
|
||||
# Debug log for request
|
||||
logging.debug(
|
||||
f"[DEBUG] API Request: {self.endpoint.method.value} {self.endpoint.path}"
|
||||
@ -455,7 +834,7 @@ class SynchronousOperation(Generic[T, R]):
|
||||
logging.debug(f"[DEBUG] Request Data: {json.dumps(request_dict, indent=2)}")
|
||||
logging.debug(f"[DEBUG] Query Params: {self.endpoint.query_params}")
|
||||
|
||||
# Make the request
|
||||
# Make the request with built-in retry
|
||||
resp = client.request(
|
||||
method=self.endpoint.method.value,
|
||||
path=self.endpoint.path,
|
||||
@ -476,8 +855,18 @@ class SynchronousOperation(Generic[T, R]):
|
||||
# Parse and return the response
|
||||
return self._parse_response(resp)
|
||||
|
||||
except LocalNetworkError as e:
|
||||
# Propagate specific network error types
|
||||
logging.error(f"[ERROR] Local network error: {str(e)}")
|
||||
raise
|
||||
|
||||
except ApiServerError as e:
|
||||
# Propagate API server errors
|
||||
logging.error(f"[ERROR] API server error: {str(e)}")
|
||||
raise
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"[DEBUG] API Exception: {str(e)}")
|
||||
logging.error(f"[ERROR] API Exception: {str(e)}")
|
||||
raise Exception(str(e))
|
||||
|
||||
def _parse_response(self, resp):
|
||||
@ -517,6 +906,10 @@ class PollingOperation(Generic[T, R]):
|
||||
comfy_api_key: Optional[str] = None,
|
||||
auth_kwargs: Optional[Dict[str,str]] = None,
|
||||
poll_interval: float = 5.0,
|
||||
max_poll_attempts: int = 120, # Default max polling attempts (10 minutes with 5s interval)
|
||||
max_retries: int = 3, # Max retries per individual API call
|
||||
retry_delay: float = 1.0,
|
||||
retry_backoff_factor: float = 2.0,
|
||||
):
|
||||
self.poll_endpoint = poll_endpoint
|
||||
self.request = request
|
||||
@ -527,6 +920,10 @@ class PollingOperation(Generic[T, R]):
|
||||
self.auth_token = auth_kwargs.get("auth_token", self.auth_token)
|
||||
self.comfy_api_key = auth_kwargs.get("comfy_api_key", self.comfy_api_key)
|
||||
self.poll_interval = poll_interval
|
||||
self.max_poll_attempts = max_poll_attempts
|
||||
self.max_retries = max_retries
|
||||
self.retry_delay = retry_delay
|
||||
self.retry_backoff_factor = retry_backoff_factor
|
||||
|
||||
# Polling configuration
|
||||
self.status_extractor = status_extractor or (
|
||||
@ -548,8 +945,23 @@ class PollingOperation(Generic[T, R]):
|
||||
base_url=self.api_base,
|
||||
auth_token=self.auth_token,
|
||||
comfy_api_key=self.comfy_api_key,
|
||||
max_retries=self.max_retries,
|
||||
retry_delay=self.retry_delay,
|
||||
retry_backoff_factor=self.retry_backoff_factor,
|
||||
)
|
||||
return self._poll_until_complete(client)
|
||||
except LocalNetworkError as e:
|
||||
# Provide clear message for local network issues
|
||||
raise Exception(
|
||||
f"Polling failed due to local network issues. Please check your internet connection. "
|
||||
f"Details: {str(e)}"
|
||||
) from e
|
||||
except ApiServerError as e:
|
||||
# Provide clear message for API server issues
|
||||
raise Exception(
|
||||
f"Polling failed due to API server issues. The service may be experiencing problems. "
|
||||
f"Please try again later. Details: {str(e)}"
|
||||
) from e
|
||||
except Exception as e:
|
||||
raise Exception(f"Error during polling: {str(e)}")
|
||||
|
||||
@ -569,10 +981,13 @@ class PollingOperation(Generic[T, R]):
|
||||
def _poll_until_complete(self, client: ApiClient) -> R:
|
||||
"""Poll until the task is complete"""
|
||||
poll_count = 0
|
||||
consecutive_errors = 0
|
||||
max_consecutive_errors = min(5, self.max_retries * 2) # Limit consecutive errors
|
||||
|
||||
if self.progress_extractor:
|
||||
progress = utils.ProgressBar(PROGRESS_BAR_MAX)
|
||||
|
||||
while True:
|
||||
while poll_count < self.max_poll_attempts:
|
||||
try:
|
||||
poll_count += 1
|
||||
logging.debug(f"[DEBUG] Polling attempt #{poll_count}")
|
||||
@ -599,8 +1014,12 @@ class PollingOperation(Generic[T, R]):
|
||||
data=request_dict,
|
||||
)
|
||||
|
||||
# Successfully got a response, reset consecutive error count
|
||||
consecutive_errors = 0
|
||||
|
||||
# Parse response
|
||||
response_obj = self.poll_endpoint.response_model.model_validate(resp)
|
||||
|
||||
# Check if task is complete
|
||||
status = self._check_task_status(response_obj)
|
||||
logging.debug(f"[DEBUG] Task Status: {status}")
|
||||
@ -630,6 +1049,38 @@ class PollingOperation(Generic[T, R]):
|
||||
)
|
||||
time.sleep(self.poll_interval)
|
||||
|
||||
except (LocalNetworkError, ApiServerError) as e:
|
||||
# For network-related errors, increment error count and potentially abort
|
||||
consecutive_errors += 1
|
||||
if consecutive_errors >= max_consecutive_errors:
|
||||
raise Exception(
|
||||
f"Polling aborted after {consecutive_errors} consecutive network errors: {str(e)}"
|
||||
) from e
|
||||
|
||||
# Log the error but continue polling
|
||||
logging.warning(
|
||||
f"Network error during polling (attempt {poll_count}/{self.max_poll_attempts}): {str(e)}. "
|
||||
f"Will retry in {self.poll_interval} seconds."
|
||||
)
|
||||
time.sleep(self.poll_interval)
|
||||
|
||||
except Exception as e:
|
||||
# For other errors, increment count and potentially abort
|
||||
consecutive_errors += 1
|
||||
if consecutive_errors >= max_consecutive_errors:
|
||||
raise Exception(
|
||||
f"Polling aborted after {consecutive_errors} consecutive errors: {str(e)}"
|
||||
) from e
|
||||
|
||||
logging.error(f"[DEBUG] Polling error: {str(e)}")
|
||||
raise Exception(f"Error while polling: {str(e)}")
|
||||
logging.warning(
|
||||
f"Error during polling (attempt {poll_count}/{self.max_poll_attempts}): {str(e)}. "
|
||||
f"Will retry in {self.poll_interval} seconds."
|
||||
)
|
||||
time.sleep(self.poll_interval)
|
||||
|
||||
# If we've exhausted all polling attempts
|
||||
raise Exception(
|
||||
f"Polling timed out after {poll_count} attempts ({poll_count * self.poll_interval} seconds). "
|
||||
f"The operation may still be running on the server but is taking longer than expected."
|
||||
)
|
||||
|
125
comfy_api_nodes/apis/request_logger.py
Normal file
125
comfy_api_nodes/apis/request_logger.py
Normal file
@ -0,0 +1,125 @@
|
||||
import os
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import folder_paths
|
||||
|
||||
# Get the logger instance
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def get_log_directory():
|
||||
"""
|
||||
Ensures the API log directory exists within ComfyUI's temp directory
|
||||
and returns its path.
|
||||
"""
|
||||
base_temp_dir = folder_paths.get_temp_directory()
|
||||
log_dir = os.path.join(base_temp_dir, "api_logs")
|
||||
try:
|
||||
os.makedirs(log_dir, exist_ok=True)
|
||||
except Exception as e:
|
||||
logger.error(f"Error creating API log directory {log_dir}: {e}")
|
||||
# Fallback to base temp directory if sub-directory creation fails
|
||||
return base_temp_dir
|
||||
return log_dir
|
||||
|
||||
def _format_data_for_logging(data):
|
||||
"""Helper to format data (dict, str, bytes) for logging."""
|
||||
if isinstance(data, bytes):
|
||||
try:
|
||||
return data.decode('utf-8') # Try to decode as text
|
||||
except UnicodeDecodeError:
|
||||
return f"[Binary data of length {len(data)} bytes]"
|
||||
elif isinstance(data, (dict, list)):
|
||||
try:
|
||||
return json.dumps(data, indent=2, ensure_ascii=False)
|
||||
except TypeError:
|
||||
return str(data) # Fallback for non-serializable objects
|
||||
return str(data)
|
||||
|
||||
def log_request_response(
|
||||
operation_id: str,
|
||||
request_method: str,
|
||||
request_url: str,
|
||||
request_headers: dict | None = None,
|
||||
request_params: dict | None = None,
|
||||
request_data: any = None,
|
||||
response_status_code: int | None = None,
|
||||
response_headers: dict | None = None,
|
||||
response_content: any = None,
|
||||
error_message: str | None = None
|
||||
):
|
||||
"""
|
||||
Logs API request and response details to a file in the temp/api_logs directory.
|
||||
"""
|
||||
log_dir = get_log_directory()
|
||||
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
|
||||
filename = f"{timestamp}_{operation_id.replace('/', '_').replace(':', '_')}.log"
|
||||
filepath = os.path.join(log_dir, filename)
|
||||
|
||||
log_content = []
|
||||
|
||||
log_content.append(f"Timestamp: {datetime.datetime.now().isoformat()}")
|
||||
log_content.append(f"Operation ID: {operation_id}")
|
||||
log_content.append("-" * 30 + " REQUEST " + "-" * 30)
|
||||
log_content.append(f"Method: {request_method}")
|
||||
log_content.append(f"URL: {request_url}")
|
||||
if request_headers:
|
||||
log_content.append(f"Headers:\n{_format_data_for_logging(request_headers)}")
|
||||
if request_params:
|
||||
log_content.append(f"Params:\n{_format_data_for_logging(request_params)}")
|
||||
if request_data:
|
||||
log_content.append(f"Data/Body:\n{_format_data_for_logging(request_data)}")
|
||||
|
||||
log_content.append("\n" + "-" * 30 + " RESPONSE " + "-" * 30)
|
||||
if response_status_code is not None:
|
||||
log_content.append(f"Status Code: {response_status_code}")
|
||||
if response_headers:
|
||||
log_content.append(f"Headers:\n{_format_data_for_logging(response_headers)}")
|
||||
if response_content:
|
||||
log_content.append(f"Content:\n{_format_data_for_logging(response_content)}")
|
||||
if error_message:
|
||||
log_content.append(f"Error:\n{error_message}")
|
||||
|
||||
try:
|
||||
with open(filepath, "w", encoding="utf-8") as f:
|
||||
f.write("\n".join(log_content))
|
||||
logger.debug(f"API log saved to: {filepath}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error writing API log to {filepath}: {e}")
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Example usage (for testing the logger directly)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
# Mock folder_paths for direct execution if not running within ComfyUI full context
|
||||
if not hasattr(folder_paths, 'get_temp_directory'):
|
||||
class MockFolderPaths:
|
||||
def get_temp_directory(self):
|
||||
# Create a local temp dir for testing if needed
|
||||
p = os.path.join(os.path.dirname(__file__), 'temp_test_logs')
|
||||
os.makedirs(p, exist_ok=True)
|
||||
return p
|
||||
folder_paths = MockFolderPaths()
|
||||
|
||||
log_request_response(
|
||||
operation_id="test_operation_get",
|
||||
request_method="GET",
|
||||
request_url="https://api.example.com/test",
|
||||
request_headers={"Authorization": "Bearer testtoken"},
|
||||
request_params={"param1": "value1"},
|
||||
response_status_code=200,
|
||||
response_content={"message": "Success!"}
|
||||
)
|
||||
log_request_response(
|
||||
operation_id="test_operation_post_error",
|
||||
request_method="POST",
|
||||
request_url="https://api.example.com/submit",
|
||||
request_data={"key": "value", "nested": {"num": 123}},
|
||||
error_message="Connection timed out"
|
||||
)
|
||||
log_request_response(
|
||||
operation_id="test_binary_response",
|
||||
request_method="GET",
|
||||
request_url="https://api.example.com/image.png",
|
||||
response_status_code=200,
|
||||
response_content=b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR...' # Sample binary data
|
||||
)
|
Loading…
x
Reference in New Issue
Block a user