205 lines
7.5 KiB
Python
205 lines
7.5 KiB
Python
import os
|
|
from dotenv import load_dotenv
|
|
load_dotenv()
|
|
|
|
from fastapi import FastAPI, Request, HTTPException
|
|
from pydantic import BaseModel
|
|
from fastapi.responses import JSONResponse
|
|
from loguru import logger
|
|
import uuid
|
|
import sys
|
|
from typing import Optional
|
|
from datetime import datetime
|
|
import asyncio
|
|
from functools import wraps, partial
|
|
|
|
from config import settings
|
|
from webhooks.handlers import JiraWebhookHandler
|
|
from llm.models import JiraWebhookPayload
|
|
from logging_config import configure_logging
|
|
|
|
# Initialize logging first
|
|
configure_logging(log_level="DEBUG")
|
|
|
|
import signal
|
|
|
|
from contextlib import asynccontextmanager
|
|
|
|
|
|
|
|
# Setup async-compatible signal handling
|
|
def handle_shutdown_signal(signum, loop):
|
|
"""Graceful shutdown signal handler"""
|
|
logger.info(f"Received signal {signum}, initiating shutdown...")
|
|
# Set shutdown flag and remove signal handlers to prevent reentrancy
|
|
if not hasattr(loop, '_shutdown'):
|
|
loop._shutdown = True
|
|
|
|
# Prevent further signal handling
|
|
for sig in (signal.SIGTERM, signal.SIGINT):
|
|
loop.remove_signal_handler(sig)
|
|
@asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
"""Handle startup and shutdown events"""
|
|
# Startup
|
|
try:
|
|
logger.info("Initializing application...")
|
|
|
|
# Initialize event loop
|
|
loop = asyncio.get_running_loop()
|
|
logger.debug("Event loop initialized")
|
|
|
|
# Setup signal handlers
|
|
for sig in (signal.SIGTERM, signal.SIGINT):
|
|
loop.add_signal_handler(sig, partial(handle_shutdown_signal, sig, loop))
|
|
logger.info("Signal handlers configured successfully")
|
|
|
|
# Verify critical components
|
|
if not hasattr(settings, 'langfuse_handler'):
|
|
logger.error("Langfuse handler not found in settings")
|
|
raise RuntimeError("Langfuse handler not initialized")
|
|
|
|
logger.info("Application initialized successfully")
|
|
yield
|
|
|
|
# Check shutdown flag before cleanup
|
|
loop = asyncio.get_running_loop()
|
|
if hasattr(loop, '_shutdown'):
|
|
logger.info("Shutdown initiated, starting cleanup...")
|
|
except Exception as e:
|
|
logger.critical(f"Application initialization failed: {str(e)}")
|
|
raise
|
|
finally:
|
|
# Shutdown
|
|
logger.info("Shutting down application...")
|
|
try:
|
|
# Cleanup sequence with async safety
|
|
cleanup_tasks = []
|
|
shutdown_success = True
|
|
|
|
# Close langfuse with retry
|
|
if hasattr(settings, 'langfuse_handler') and hasattr(settings.langfuse_handler, 'close'):
|
|
async def close_langfuse():
|
|
try:
|
|
await asyncio.wait_for(settings.langfuse_handler.close(), timeout=5.0)
|
|
logger.info("Langfuse client closed successfully")
|
|
except asyncio.TimeoutError:
|
|
logger.warning("Timeout while closing Langfuse client")
|
|
except Exception as e:
|
|
logger.error(f"Error closing Langfuse client: {str(e)}")
|
|
cleanup_tasks.append(close_langfuse())
|
|
|
|
# Remove confirm_shutdown entirely
|
|
# Execute all cleanup tasks with timeout
|
|
try:
|
|
await asyncio.wait_for(asyncio.gather(*cleanup_tasks), timeout=10.0)
|
|
except asyncio.TimeoutError:
|
|
logger.warning("Timeout during cleanup sequence")
|
|
loop.stop() # Explicit loop stop after cleanup
|
|
# Cancel all pending tasks
|
|
async def cancel_pending_tasks():
|
|
try:
|
|
pending = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
|
|
for task in pending:
|
|
task.cancel()
|
|
await asyncio.gather(*pending, return_exceptions=True)
|
|
logger.info("Pending tasks cancelled successfully")
|
|
except Exception as e:
|
|
logger.error(f"Error cancelling pending tasks: {str(e)}")
|
|
cleanup_tasks.append(cancel_pending_tasks())
|
|
|
|
# Execute all cleanup tasks with timeout
|
|
try:
|
|
await asyncio.wait_for(asyncio.gather(*cleanup_tasks), timeout=10.0)
|
|
except asyncio.TimeoutError:
|
|
logger.warning("Timeout during cleanup sequence")
|
|
loop.stop() # Add explicit loop stop after cleanup
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error during shutdown: {str(e)}")
|
|
raise
|
|
|
|
# Initialize FastAPI app after lifespan definition
|
|
app = FastAPI(lifespan=lifespan)
|
|
|
|
def retry(max_retries: int = 3, delay: float = 1.0):
|
|
"""Decorator for retrying failed operations"""
|
|
def decorator(func):
|
|
@wraps(func)
|
|
async def wrapper(*args, **kwargs):
|
|
last_error = None
|
|
for attempt in range(max_retries):
|
|
try:
|
|
return await func(*args, **kwargs)
|
|
except Exception as e:
|
|
last_error = e
|
|
logger.warning(f"Attempt {attempt + 1} failed: {str(e)}")
|
|
if attempt < max_retries - 1:
|
|
await asyncio.sleep(delay * (attempt + 1))
|
|
raise last_error
|
|
return wrapper
|
|
return decorator
|
|
|
|
class ErrorResponse(BaseModel):
|
|
error_id: str
|
|
timestamp: str
|
|
status_code: int
|
|
message: str
|
|
details: Optional[str] = None
|
|
|
|
@app.middleware("http")
|
|
async def error_handling_middleware(request: Request, call_next):
|
|
request_id = str(uuid.uuid4())
|
|
logger.bind(request_id=request_id).info(f"Request started: {request.method} {request.url}")
|
|
|
|
try:
|
|
response = await call_next(request)
|
|
return response
|
|
except HTTPException as e:
|
|
logger.error(f"HTTP Error: {e.status_code} - {e.detail}")
|
|
error_response = ErrorResponse(
|
|
error_id=request_id,
|
|
timestamp=datetime.utcnow().isoformat(),
|
|
status_code=e.status_code,
|
|
message=e.detail,
|
|
details=str(e)
|
|
)
|
|
return JSONResponse(status_code=e.status_code, content=error_response.model_dump())
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error: {str(e)}")
|
|
error_response = ErrorResponse(
|
|
error_id=request_id,
|
|
timestamp=datetime.utcnow().isoformat(),
|
|
status_code=500,
|
|
message="Internal Server Error",
|
|
details=str(e)
|
|
)
|
|
return JSONResponse(status_code=500, content=error_response.model_dump())
|
|
webhook_handler = JiraWebhookHandler()
|
|
|
|
@app.post("/jira-webhook")
|
|
async def jira_webhook_handler(payload: JiraWebhookPayload):
|
|
logger.info(f"Received webhook payload: {payload.model_dump()}")
|
|
try:
|
|
response = await webhook_handler.handle_webhook(payload)
|
|
logger.info(f"Webhook processed successfully")
|
|
return response
|
|
except Exception as e:
|
|
logger.error(f"Error processing webhook: {str(e)}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@app.post("/test-llm")
|
|
async def test_llm():
|
|
"""Test endpoint for LLM integration"""
|
|
test_payload = JiraWebhookPayload(
|
|
issueKey="TEST-123",
|
|
summary="Test issue",
|
|
description="This is a test issue for LLM integration",
|
|
comment="Testing OpenAI integration with Langfuse",
|
|
labels=["test"],
|
|
status="Open",
|
|
assignee="Tester",
|
|
updated="2025-07-04T21:40:00Z"
|
|
)
|
|
return await webhook_handler.handle_webhook(test_payload)
|