239 lines
		
	
	
		
			9.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			239 lines
		
	
	
		
			9.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os
 | |
| import json
 | |
| from dotenv import load_dotenv
 | |
| load_dotenv()
 | |
| 
 | |
| from fastapi import FastAPI, Request, HTTPException
 | |
| from pydantic import BaseModel
 | |
| from fastapi.responses import JSONResponse
 | |
| from http import HTTPStatus
 | |
| from loguru import logger
 | |
| import uuid
 | |
| from database.database import init_db
 | |
| import sys
 | |
| from typing import Optional
 | |
| from datetime import datetime, timezone
 | |
| import asyncio
 | |
| from functools import wraps, partial
 | |
| from concurrent.futures import ThreadPoolExecutor
 | |
| from database.database import init_db, get_db
 | |
| from database.crud import create_analysis_record, update_analysis_record
 | |
| from llm.models import JiraWebhookPayload
 | |
| from llm.chains import analysis_chain, validate_response
 | |
| from api.handlers import router  # Correct variable name
 | |
| from webhooks.handlers import webhook_router
 | |
| from database.crud import get_all_analysis_records, delete_all_analysis_records, get_analysis_by_id, get_analysis_record
 | |
| from logging_config import configure_logging
 | |
| 
 | |
# Load application settings (logging itself is configured in create_app())
 | |
| from config import settings
 | |
| 
 | |
| import signal
 | |
| 
 | |
| from contextlib import asynccontextmanager
 | |
| 
 | |
| cleanup_tasks = [] # Initialize cleanup_tasks globally
 | |
| 
 | |
| # Setup async-compatible signal handling
 | |
def handle_shutdown_signal(signum, loop):
    """Record a shutdown request on the event loop when a signal arrives.

    Args:
        signum: Number of the signal that was delivered.
        loop: Event loop the handlers were registered on. A ``_shutdown``
            attribute is set on it to mark that shutdown was requested.
    """
    logger.info(f"Received signal {signum}, initiating shutdown...")

    # The marker already being present means an earlier signal did the work;
    # bailing out here keeps the handler safe against repeated delivery.
    if hasattr(loop, '_shutdown'):
        return

    loop._shutdown = True

    # Unregister the handler for both signals so it cannot fire again.
    for handled_signal in (signal.SIGTERM, signal.SIGINT):
        loop.remove_signal_handler(handled_signal)
 | |
|     
 | |
async def _close_langfuse_handler():
    """Close ``settings.langfuse_handler`` if it exposes ``close``; errors are logged, not raised."""
    import inspect  # local import: only needed during shutdown

    handler = getattr(settings, 'langfuse_handler', None)
    close = getattr(handler, 'close', None)
    if close is None:
        return

    try:
        result = close()
        # close() may be synchronous; only await (with a timeout) when it
        # actually handed back something awaitable.
        if inspect.isawaitable(result):
            await asyncio.wait_for(result, timeout=5.0)
        logger.info("Langfuse client closed successfully")
    except asyncio.TimeoutError:
        logger.warning("Timeout while closing Langfuse client")
    except Exception as e:
        logger.error(f"Error closing Langfuse client: {str(e)}")


async def _run_cleanup_tasks():
    """Await every awaitable registered in ``cleanup_tasks``; errors are logged, not raised."""
    if not cleanup_tasks:
        return
    try:
        await asyncio.gather(*cleanup_tasks)
    except Exception as e:
        logger.error(f"Error during additional cleanup tasks: {str(e)}")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Manage startup and shutdown of the FastAPI application.

    Startup creates the background ThreadPoolExecutor (exposed as
    ``app.state.executor``), initializes the database, registers
    SIGTERM/SIGINT handlers where the platform allows it, and verifies that
    ``settings.langfuse_handler`` exists. Shutdown closes the Langfuse
    handler, awaits everything in ``cleanup_tasks`` and shuts the executor
    down.

    Raises:
        Exception: Any startup failure is logged and re-raised so the server
            does not start serving a half-initialized application.
    """
    executor = ThreadPoolExecutor(max_workers=settings.thread_pool_max_workers)
    app.state.executor = executor

    # Tracks whether startup completed; only used for the shutdown log line.
    init_success = False

    # A single try/finally spans startup AND the serving phase, so cleanup
    # runs whether startup fails, the app raises, or shutdown is normal.
    try:
        try:
            logger.info("Initializing application...")
            init_db()

            # NOTE(review): these handlers replace any SIGTERM/SIGINT handlers
            # the ASGI server installed on this loop - confirm the server
            # still shuts down on the first signal.
            loop = asyncio.get_running_loop()
            try:
                for sig in (signal.SIGTERM, signal.SIGINT):
                    loop.add_signal_handler(sig, partial(handle_shutdown_signal, sig, loop))
                logger.info("Signal handlers configured successfully")
            except (NotImplementedError, RuntimeError, ValueError) as e:
                # Unsupported on Windows event loops and outside the main
                # thread; the application can run without them.
                logger.warning(f"Signal handlers not installed: {str(e)}")

            # Verify critical components
            if not hasattr(settings, 'langfuse_handler'):
                logger.error("Langfuse handler not found in settings")
                raise RuntimeError("Langfuse handler not initialized")

            logger.info("Application initialized successfully")
            init_success = True
        except Exception as e:
            logger.critical(f"Application initialization failed: {str(e)}. Exiting.")
            # Re-raise so startup is reported as failed; the outer finally
            # still performs cleanup.
            raise

        # Yield control to the application
        yield
    finally:
        try:
            if init_success and hasattr(asyncio.get_running_loop(), '_shutdown'):
                logger.info("Shutdown initiated, starting cleanup...")

            await _close_langfuse_handler()
            await _run_cleanup_tasks()
        finally:
            # Release the worker threads without blocking the event loop on
            # work that is still running.
            executor.shutdown(wait=False)
 | |
def _status_phrase(status_code: int) -> str:
    """Return the standard reason phrase for *status_code*, or a generic fallback."""
    try:
        return HTTPStatus(status_code).phrase
    except ValueError:
        # Codes that are not registered in http.HTTPStatus have no phrase.
        return "Unknown Status"


def _error_json(request_id, status_code, message, details):
    """Build the JSON error envelope (an ``ErrorResponse``) for a failed request."""
    payload = ErrorResponse(
        error_id=request_id,
        timestamp=datetime.now(timezone.utc).isoformat(),
        status_code=status_code,
        message=message,
        details=details,
    )
    return JSONResponse(status_code=status_code, content=payload.model_dump())


def create_app():
    """Factory function to create the FastAPI app instance.

    Configures logging, attaches the ``lifespan`` handler, mounts the webhook
    and API routers without prefixes, and registers a ``/health`` endpoint
    plus an HTTP middleware that returns errors in the ``ErrorResponse``
    format.

    Returns:
        The configured FastAPI application.
    """
    configure_logging(log_level="DEBUG")
    _app = FastAPI(lifespan=lifespan)

    # Include routers without prefixes to match test expectations
    _app.include_router(webhook_router)
    _app.include_router(router)

    # Add health check endpoint
    @_app.get("/health")
    async def health_check():
        return {"status": "healthy"}

    # Add error handling middleware
    @_app.middleware("http")
    async def error_handling_middleware(request: Request, call_next):
        request_id = str(uuid.uuid4())
        # Bind once so every log line of this request carries the request id.
        log = logger.bind(request_id=request_id)
        log.info(f"Request started: {request.method} {request.url}")

        try:
            response = await call_next(request)
        except HTTPException as e:
            log.error(f"HTTP Error: {e.status_code} - {e.detail}")
            # detail may be a dict or list; the envelope's message must be a string.
            return _error_json(request_id, e.status_code, str(e.detail), str(e))
        except Exception as e:
            log.error(f"Unexpected error: {str(e)}")
            return _error_json(request_id, 500, "Internal Server Error", str(e))

        if response.status_code >= 400:
            # NOTE(review): this replaces the body the route produced (for
            # example validation details) with a generic message - confirm
            # that is intended.
            return _error_json(
                request_id,
                response.status_code,
                _status_phrase(response.status_code),
                "Endpoint not found or invalid request",
            )
        return response

    return _app
 | |
| 
 | |
| from api.handlers import test_llm_endpoint
 | |
| 
 | |
| app = create_app()
 | |
| 
 | |
|  
 | |
async def process_jira_webhook_background(record_id: int, payload: JiraWebhookPayload):
    """
    Background task to process a Jira webhook and perform LLM analysis.

    Marks the analysis record as "processing", invokes ``analysis_chain``
    with the payload fields (missing optional fields are replaced by
    placeholder text), and stores the outcome as "completed" or "failed".

    Args:
        record_id: ID of the analysis record to update.
        payload: Webhook payload describing the Jira issue.
    """
    try:
        logger.info(f"Starting background processing for record ID: {record_id}, Issue Key: {payload.issueKey}")
    except Exception as e:
        logger.error(f"Failed to start background processing for record {record_id}: {str(e)}")
        raise

    with get_db() as db:
        try:
            update_analysis_record(db, record_id, "processing")

            llm_input = {
                "issueKey": payload.issueKey,
                "summary": payload.summary,
                "description": payload.description or "No description provided.",
                "status": payload.status or "Unknown",
                "labels": ", ".join(payload.labels) if payload.labels else "None",
                "assignee": payload.assignee or "Unassigned",
                "updated": payload.updated or "Unknown",
                "comment": payload.comment or "No new comment provided.",
            }

            analysis_result = await analysis_chain.ainvoke(llm_input)

            if not validate_response(analysis_result):
                logger.warning(f"Invalid LLM response format for {payload.issueKey}")
                # Store a neutral placeholder result alongside the failure.
                analysis_result = {
                    "hasMultipleEscalations": False,
                    "customerSentiment": "neutral"
                }
                update_analysis_record(db, record_id, "failed", analysis_result=analysis_result, error_message="Invalid LLM response format")
                logger.error(f"LLM processing failed for {payload.issueKey}: Invalid response format")
                return

            update_analysis_record(db, record_id, "completed", analysis_result=analysis_result)
            logger.info(f"Background processing completed for record ID: {record_id}, Issue Key: {payload.issueKey}")

        except Exception as e:
            logger.error(f"Error during background processing for record ID {record_id}, Issue Key {payload.issueKey}: {str(e)}")
            # Recording the failure can itself fail (for example when the
            # database caused the original error); log that instead of letting
            # a second exception escape the background task.
            try:
                update_analysis_record(db, record_id, "failed", error_message=str(e))
            except Exception as update_error:
                logger.error(f"Failed to mark record ID {record_id} as failed: {str(update_error)}")
 | |
| 
 | |
def retry(max_retries: int = 3, delay: float = 1.0):
    """Decorator for retrying a failed async operation.

    Args:
        max_retries: Total number of attempts to make (must be at least 1).
        delay: Base wait in seconds; the wait after attempt ``k`` is
            ``delay * k``.

    Returns:
        A decorator that wraps a coroutine function.

    Raises:
        ValueError: If ``max_retries`` is less than 1. Without this check the
            wrapped function would never be called and the wrapper would
            fail with an unrelated ``TypeError``.
    """
    if max_retries < 1:
        raise ValueError(f"max_retries must be at least 1, got {max_retries}")

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    logger.warning(f"Attempt {attempt} failed: {str(e)}")
                    if attempt == max_retries:
                        # Out of attempts: propagate the last error with its
                        # original traceback.
                        raise
                    await asyncio.sleep(delay * attempt)
        return wrapper
    return decorator
 | |
| 
 | |
class ErrorResponse(BaseModel):
    """JSON body returned to the client when a request fails."""

    # Per-request identifier; the HTTP middleware fills it with a UUID string.
    error_id: str
    # Time the error response was built, as an ISO-8601 string.
    timestamp: str
    # HTTP status code sent with the response.
    status_code: int
    # Short description of the failure.
    message: str
    # Optional extra information about the failure.
    details: Optional[str] = None
 | |
| 
 |