import os
import json
from dotenv import load_dotenv

# Runs before the imports below; presumably so project modules (e.g. config)
# see values from .env at import time -- confirm before reordering.
load_dotenv()

from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
from fastapi.responses import JSONResponse
from http import HTTPStatus
from loguru import logger
import uuid
from database.database import init_db
import sys
from typing import Optional
from datetime import datetime, timezone
import asyncio
from functools import wraps, partial
from concurrent.futures import ThreadPoolExecutor
from database.database import init_db, get_db
from database.crud import create_analysis_record, update_analysis_record
from llm.models import JiraWebhookPayload
from llm.chains import analysis_chain, validate_response
from api.handlers import router  # Correct variable name
from webhooks.handlers import webhook_router
from database.crud import get_all_analysis_records, delete_all_analysis_records, get_analysis_by_id, get_analysis_record
from logging_config import configure_logging  # Initialize logging as early as possible
from config import settings
import signal
from contextlib import asynccontextmanager

# Awaitables gathered during shutdown cleanup (see ``lifespan``).
cleanup_tasks = []  # Initialize cleanup_tasks globally


class ErrorResponse(BaseModel):
    """JSON body returned by the error-handling middleware."""

    error_id: str  # request id generated by the middleware
    timestamp: str  # ISO-8601 timestamp in UTC
    status_code: int
    message: str
    details: Optional[str] = None


def _status_phrase(status_code: int) -> str:
    """Return the standard reason phrase for *status_code*.

    ``HTTPStatus(code)`` raises ``ValueError`` for codes that are not
    registered (e.g. 499 or 599); fall back to a generic phrase so a
    non-standard upstream status is not turned into a 500 by the middleware.
    """
    try:
        return HTTPStatus(status_code).phrase
    except ValueError:
        return "Unknown Status"


# Setup async-compatible signal handling
def handle_shutdown_signal(signum: int, loop: asyncio.AbstractEventLoop) -> None:
    """Record on *loop* that a shutdown signal was received.

    The first call sets ``loop._shutdown`` and removes the SIGTERM/SIGINT
    handlers from the loop so this function is not invoked again.

    Args:
        signum: Number of the signal that was received (logged only).
        loop: Event loop on which the handlers were registered.
    """
    logger.info(f"Received signal {signum}, initiating shutdown...")

    # Set shutdown flag and remove signal handlers to prevent reentrancy
    if not hasattr(loop, '_shutdown'):
        loop._shutdown = True

        # Prevent further signal handling
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.remove_signal_handler(sig)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Context manager for managing the lifespan of the FastAPI application.

    Startup: creates a ``ThreadPoolExecutor`` (stored on
    ``app.state.executor``), initializes the database, registers
    SIGTERM/SIGINT handlers on the running loop and checks that
    ``settings.langfuse_handler`` exists.

    Shutdown: if startup succeeded and the shutdown flag was set by
    ``handle_shutdown_signal``, closes the Langfuse handler (5 second
    timeout) and awaits everything in ``cleanup_tasks``. The executor is
    always shut down.

    NOTE(review): a startup failure is logged but not re-raised, so the
    application still starts serving requests -- confirm this is intended.
    """
    # Initialize ThreadPoolExecutor for background tasks
    executor = ThreadPoolExecutor(max_workers=settings.thread_pool_max_workers)
    app.state.executor = executor

    # Flag to track if initialization succeeded
    init_success = False

    try:
        logger.info("Initializing application...")
        init_db()  # Initialize the database

        # Setup signal handlers
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, partial(handle_shutdown_signal, sig, loop))
        logger.info("Signal handlers configured successfully")

        # Verify critical components
        if not hasattr(settings, 'langfuse_handler'):
            logger.error("Langfuse handler not found in settings")
            raise RuntimeError("Langfuse handler not initialized")

        logger.info("Application initialized successfully")
        init_success = True
    except Exception as e:
        logger.critical(f"Application initialization failed: {str(e)}. Exiting.")
        # Don't re-raise to allow finally block to execute cleanup

    try:
        # Yield control to the application
        yield
    finally:
        try:
            # Cleanup logic runs after application finishes
            if init_success:
                # Check shutdown flag before cleanup
                loop = asyncio.get_running_loop()
                if hasattr(loop, '_shutdown'):
                    logger.info("Shutdown initiated, starting cleanup...")

                    # Close langfuse with retry
                    if hasattr(settings, 'langfuse_handler') and hasattr(settings.langfuse_handler, 'close'):
                        try:
                            await asyncio.wait_for(settings.langfuse_handler.close(), timeout=5.0)
                            logger.info("Langfuse client closed successfully")
                        except asyncio.TimeoutError:
                            logger.warning("Timeout while closing Langfuse client")
                        except Exception as e:
                            logger.error(f"Error closing Langfuse client: {str(e)}")

                    # Execute any other cleanup tasks
                    if cleanup_tasks:
                        try:
                            await asyncio.gather(*cleanup_tasks)
                        except Exception as e:
                            logger.error(f"Error during additional cleanup tasks: {str(e)}")
        finally:
            # Release the worker threads created at startup. wait=False so
            # the event loop is not blocked on in-flight work during shutdown.
            executor.shutdown(wait=False)


def create_app() -> FastAPI:
    """Factory function to create FastAPI app instance.

    Configures logging, builds the app with ``lifespan``, mounts the webhook
    and API routers, and registers a ``/health`` endpoint plus an HTTP
    middleware that converts error responses and exceptions into
    ``ErrorResponse`` JSON bodies.
    """
    configure_logging(log_level="DEBUG")
    _app = FastAPI(lifespan=lifespan)

    # Include routers without prefixes to match test expectations
    _app.include_router(webhook_router)
    _app.include_router(router)

    # Add health check endpoint
    @_app.get("/health")
    async def health_check():
        return {"status": "healthy"}

    # Add error handling middleware
    @_app.middleware("http")
    async def error_handling_middleware(request: Request, call_next):
        request_id = str(uuid.uuid4())
        # Bind once so every log line for this request carries the id.
        log = logger.bind(request_id=request_id)
        log.info(f"Request started: {request.method} {request.url}")

        def build_error(status_code: int, message: str, details: str) -> JSONResponse:
            """Wrap the given fields in an ``ErrorResponse`` JSON response."""
            error_response = ErrorResponse(
                error_id=request_id,
                timestamp=datetime.now(timezone.utc).isoformat(),
                status_code=status_code,
                message=message,
                details=details,
            )
            return JSONResponse(status_code=status_code, content=error_response.model_dump())

        try:
            response = await call_next(request)
            if response.status_code >= 400:
                # The downstream body is replaced by a uniform error payload.
                return build_error(
                    response.status_code,
                    _status_phrase(response.status_code),
                    "Endpoint not found or invalid request",
                )
            return response
        except HTTPException as e:
            log.error(f"HTTP Error: {e.status_code} - {e.detail}")
            # ``detail`` may be any JSON-able object; ``message`` must be a str.
            return build_error(e.status_code, str(e.detail), str(e))
        except Exception as e:
            log.error(f"Unexpected error: {str(e)}")
            # NOTE(review): ``details`` exposes the raw exception text to the
            # client -- confirm this is acceptable outside development.
            return build_error(500, "Internal Server Error", str(e))

    return _app


from api.handlers import test_llm_endpoint

app = create_app()


async def process_jira_webhook_background(record_id: int, payload: JiraWebhookPayload):
    """
    Background task to process Jira webhook and perform LLM analysis.

    Marks the analysis record as "processing", invokes ``analysis_chain``
    with the payload fields (missing optional fields are replaced by
    placeholder strings), and stores the outcome:

    * valid response   -> status "completed" with the analysis result
    * invalid response -> status "failed" with a neutral fallback result
    * any exception    -> status "failed" with the exception text

    Args:
        record_id: Identifier of the analysis record to update.
        payload: Webhook payload describing the Jira issue.
    """
    try:
        logger.info(f"Starting background processing for record ID: {record_id}, Issue Key: {payload.issueKey}")
    except Exception as e:
        logger.error(f"Failed to start background processing for record {record_id}: {str(e)}")
        raise

    with get_db() as db:
        try:
            update_analysis_record(db, record_id, "processing")

            llm_input = {
                "issueKey": payload.issueKey,
                "summary": payload.summary,
                "description": payload.description or "No description provided.",
                "status": payload.status or "Unknown",
                "labels": ", ".join(payload.labels) if payload.labels else "None",
                "assignee": payload.assignee or "Unassigned",
                "updated": payload.updated or "Unknown",
                "comment": payload.comment or "No new comment provided.",
            }

            analysis_result = await analysis_chain.ainvoke(llm_input)

            if not validate_response(analysis_result):
                logger.warning(f"Invalid LLM response format for {payload.issueKey}")
                # Persist a neutral fallback alongside the failure status.
                analysis_result = {
                    "hasMultipleEscalations": False,
                    "customerSentiment": "neutral"
                }
                update_analysis_record(
                    db,
                    record_id,
                    "failed",
                    analysis_result=analysis_result,
                    error_message="Invalid LLM response format",
                )
                logger.error(f"LLM processing failed for {payload.issueKey}: Invalid response format")
                return

            update_analysis_record(db, record_id, "completed", analysis_result=analysis_result)
            logger.info(f"Background processing completed for record ID: {record_id}, Issue Key: {payload.issueKey}")
        except Exception as e:
            logger.error(f"Error during background processing for record ID {record_id}, Issue Key {payload.issueKey}: {str(e)}")
            update_analysis_record(db, record_id, "failed", error_message=str(e))


def retry(max_retries: int = 3, delay: float = 1.0):
    """Decorator for retrying failed async operations.

    The wrapped coroutine function is awaited up to *max_retries* times.
    After a failed attempt (except the last) the wrapper sleeps for
    ``delay * attempt_number`` seconds, so the wait grows linearly.

    Args:
        max_retries: Total number of attempts; must be at least 1.
        delay: Base delay in seconds between attempts.

    Raises:
        ValueError: If *max_retries* is less than 1. Previously this case
            surfaced on every call as ``TypeError`` from ``raise None``.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_error = e
                    logger.warning(f"Attempt {attempt + 1} failed: {str(e)}")
                    if attempt < max_retries - 1:
                        await asyncio.sleep(delay * (attempt + 1))
            # Every attempt failed; surface the most recent error.
            raise last_error
        return wrapper
    return decorator