222 lines
7.1 KiB
Python
222 lines
7.1 KiB
Python
import os
|
|
from dotenv import load_dotenv
|
|
load_dotenv()
|
|
|
|
from fastapi import FastAPI, Request, HTTPException
|
|
from pydantic import BaseModel
|
|
from fastapi.responses import JSONResponse
|
|
from loguru import logger
|
|
import uuid
|
|
import sys
|
|
from typing import Optional
|
|
from datetime import datetime
|
|
import asyncio
|
|
from functools import wraps
|
|
import redis.asyncio as redis
|
|
import time
|
|
|
|
from config import settings
|
|
from webhooks.handlers import JiraWebhookHandler
|
|
from llm.models import JiraWebhookPayload
|
|
from logging_config import configure_logging
|
|
|
|
# Initialize logging first, before the Redis client and FastAPI app below emit
# their startup messages.
# NOTE(review): the level is hard-coded to DEBUG — confirm this is intended
# outside of development.
configure_logging(log_level="DEBUG")
|
|
|
|
import signal
|
|
|
|
# Initialize the optional Redis client, the FastAPI application, and the
# shutdown hooks. Any failure during this phase is fatal for the process.
redis_client = None
try:
    if settings.redis.enabled:
        redis_client = redis.from_url(settings.redis.url)
        logger.info("Redis client initialized")
    else:
        logger.info("Redis is disabled in configuration")

    app = FastAPI()
    logger.info("FastAPI application initialized")

    # Include dashboard router (imported here, after the app exists).
    from dashboard import router as dashboard_router
    app.include_router(dashboard_router, prefix="/monitoring")

    @app.on_event("shutdown")
    async def shutdown_event():
        """Release external resources when the application shuts down.

        Closes the Langfuse handler (when ``settings`` exposes one with a
        ``close`` method) and the Redis client (when one was created).
        Failures while closing either resource are logged as warnings so
        that one failing cleanup step does not prevent the other.
        """
        logger.info("Shutting down application...")
        try:
            # Cleanup Langfuse client if exists
            if hasattr(settings, 'langfuse_handler') and hasattr(settings.langfuse_handler, 'close'):
                try:
                    await settings.langfuse_handler.close()
                except Exception as e:
                    logger.warning(f"Error closing handler: {str(e)}")

            # Close the Redis client so its connections are not leaked.
            if redis_client is not None:
                try:
                    # redis-py >= 5.0.1 exposes ``aclose``; older releases
                    # only provide ``close``.
                    close_redis = getattr(redis_client, "aclose", None) or redis_client.close
                    await close_redis()
                except Exception as e:
                    logger.warning(f"Error closing Redis client: {str(e)}")

            logger.info("Cleanup completed successfully")
        except Exception as e:
            logger.error(f"Error during shutdown: {str(e)}")
            raise

    def handle_shutdown_signal(signum, frame):
        """Terminate the process when SIGTERM or SIGINT is received.

        Args:
            signum: Number of the received signal.
            frame: Current stack frame (unused).
        """
        logger.info(f"Received signal {signum}, initiating shutdown...")
        # NOTE(review): os._exit() ends the process immediately; it does not
        # run ``shutdown_event`` or any other cleanup handler. Confirm this
        # hard exit is intended rather than a graceful stop.
        os._exit(0)

    # Register signal handlers
    signal.signal(signal.SIGTERM, handle_shutdown_signal)
    signal.signal(signal.SIGINT, handle_shutdown_signal)

except Exception as e:
    logger.critical(f"Failed to initialize FastAPI: {str(e)}")
    logger.warning("Application cannot continue without FastAPI initialization")
    sys.exit(1)
|
|
|
|
def retry(max_retries: int = 3, delay: float = 1.0):
    """Decorator that retries a failing coroutine function.

    Args:
        max_retries: Total number of attempts to make. Values below 1 are
            treated as 1, so the wrapped coroutine always runs at least once.
        delay: Base delay in seconds. The wait before the next attempt is
            ``delay * attempt_number`` (linear back-off).

    Returns:
        A decorator that wraps an ``async`` function. The wrapper returns the
        first successful result, or re-raises the exception of the final
        attempt once all attempts are exhausted.
    """
    # Guard against max_retries <= 0: without it the loop body would never
    # run and there would be no exception to re-raise.
    attempts = max(1, max_retries)

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    logger.warning(f"Attempt {attempt} failed: {str(e)}")
                    if attempt == attempts:
                        # Out of attempts: propagate the last failure with
                        # its original traceback.
                        raise
                    await asyncio.sleep(delay * attempt)

        return wrapper

    return decorator
|
|
|
|
class HealthCheckResponse(BaseModel):
    """Response body returned by the aggregate ``/health`` endpoint."""

    # Overall service status label, e.g. "healthy".
    status: str
    # ISO-8601 timestamp of when the check was performed.
    timestamp: str
    # Optional per-component results; the /health endpoint fills in the
    # "redis" and "workers" keys.
    details: Optional[dict] = None
|
|
|
|
class RedisHealthResponse(BaseModel):
    """Result of a Redis connectivity check."""

    # True when the Redis ping succeeded.
    connected: bool
    # Round-trip time of the ping in milliseconds; set only on success.
    latency_ms: Optional[float] = None
    # Reason the check did not succeed (Redis disabled, or the ping error text).
    error: Optional[str] = None
|
|
|
|
class WorkerStatusResponse(BaseModel):
    """Status report for the worker processes."""

    # Whether the workers are running.
    running: bool
    # Number of workers.
    workers: int
    # Number of tasks currently being processed.
    active_tasks: int
    # Number of tasks waiting in the queue.
    queue_depth: int
|
|
|
|
class ErrorResponse(BaseModel):
    """JSON error body produced by the HTTP error-handling middleware."""

    # Identifier of the failed request; the middleware uses the per-request UUID.
    error_id: str
    # ISO-8601 timestamp of when the error response was built.
    timestamp: str
    # HTTP status code sent with the response.
    status_code: int
    # Short error message.
    message: str
    # Optional extra detail; the middleware fills it with the exception text.
    details: Optional[str] = None
|
|
|
|
def _build_error_response(request_id: str, status_code: int, message: str, details: str) -> JSONResponse:
    """Wrap an ``ErrorResponse`` body in a JSON response with the given status code."""
    body = ErrorResponse(
        error_id=request_id,
        timestamp=datetime.utcnow().isoformat(),
        status_code=status_code,
        message=message,
        details=details,
    )
    return JSONResponse(status_code=status_code, content=body.model_dump())


@app.middleware("http")
async def error_handling_middleware(request: Request, call_next):
    """Tag each request with an ID and convert escaping errors to JSON.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that forwards the request down the stack.

    Returns:
        The downstream response, or a JSON ``ErrorResponse`` whose
        ``error_id`` is the request ID when an exception escapes.
    """
    request_id = str(uuid.uuid4())
    # Bind once so every log line for this request carries the same ID that
    # is returned to the client as ``error_id``.
    log = logger.bind(request_id=request_id)
    log.info(f"Request started: {request.method} {request.url}")

    try:
        return await call_next(request)
    except HTTPException as e:
        log.error(f"HTTP Error: {e.status_code} - {e.detail}")
        # ``detail`` may be a non-string (e.g. dict); ErrorResponse.message
        # is declared as str, so coerce it explicitly.
        return _build_error_response(request_id, e.status_code, str(e.detail), str(e))
    except Exception as e:
        # ``exception`` records the traceback alongside the message.
        log.exception(f"Unexpected error: {str(e)}")
        return _build_error_response(request_id, 500, "Internal Server Error", str(e))
|
|
# Shared handler instance used by the /jira-webhook and /test-llm endpoints.
webhook_handler = JiraWebhookHandler()
|
|
|
|
@app.post("/jira-webhook")
async def jira_webhook_handler(payload: JiraWebhookPayload):
    """Process an incoming Jira webhook.

    Args:
        payload: The validated webhook body.

    Returns:
        Whatever ``webhook_handler.handle_webhook`` returns for the payload.

    Raises:
        HTTPException: Re-raised unchanged when the handler raises one;
            otherwise a 500 carrying the error text for any other failure.
    """
    logger.info(f"Received webhook payload: {payload.model_dump()}")
    try:
        response = await webhook_handler.handle_webhook(payload)
    except HTTPException:
        # Keep the status code and detail chosen by the handler instead of
        # flattening every failure into a 500.
        raise
    except Exception as e:
        logger.exception(f"Error processing webhook: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e

    logger.info("Webhook processed successfully")
    return response
|
|
|
|
@app.post("/test-llm")
async def test_llm():
    """Run a fixed sample payload through the webhook handler.

    Returns:
        The result of ``webhook_handler.handle_webhook`` for the sample issue.
    """
    # Canned issue used to exercise the LLM integration.
    sample_fields = {
        "issueKey": "TEST-123",
        "summary": "Test issue",
        "description": "This is a test issue for LLM integration",
        "comment": "Testing OpenAI integration with Langfuse",
        "labels": ["test"],
        "status": "Open",
        "assignee": "Tester",
        "updated": "2025-07-04T21:40:00Z",
    }
    sample_payload = JiraWebhookPayload(**sample_fields)
    result = await webhook_handler.handle_webhook(sample_payload)
    return result
|
|
async def check_redis_health() -> RedisHealthResponse:
    """Ping Redis and report connectivity and round-trip latency.

    Returns:
        ``connected=False`` with an error message when no Redis client was
        created or the ping fails; otherwise ``connected=True`` with the
        ping latency in milliseconds.
    """
    if redis_client is None:
        return RedisHealthResponse(connected=False, error="Redis is disabled")

    try:
        # perf_counter is monotonic, so the measured latency cannot be
        # skewed by wall-clock adjustments during the ping.
        started = time.perf_counter()
        await redis_client.ping()
        latency_ms = (time.perf_counter() - started) * 1000
    except Exception as e:
        return RedisHealthResponse(connected=False, error=str(e))

    return RedisHealthResponse(connected=True, latency_ms=latency_ms)
|
|
|
|
async def get_worker_status() -> WorkerStatusResponse:
    """Report worker status and queue depth.

    Returns:
        A fixed placeholder report (one running worker, no active tasks,
        empty queue) until real status checking is implemented.
    """
    # TODO: Implement actual worker status checking
    placeholder = WorkerStatusResponse(running=True, workers=1, active_tasks=0, queue_depth=0)
    return placeholder
|
|
|
|
@app.get("/health")
async def health_check():
    """Aggregate service health check endpoint.

    Returns:
        A ``HealthCheckResponse`` with the Redis and worker results under
        ``details``. ``status`` is "healthy" unless Redis is enabled but
        unreachable, or the workers are not running, in which case it is
        "degraded".
    """
    redis_health = await check_redis_health()
    worker_status = await get_worker_status()

    # A disabled Redis (no client) is a valid configuration and must not
    # count against overall health; only a failed ping on an enabled client does.
    redis_failed = redis_client is not None and not redis_health.connected
    overall_status = "degraded" if redis_failed or not worker_status.running else "healthy"

    return HealthCheckResponse(
        status=overall_status,
        timestamp=datetime.utcnow().isoformat(),
        details={
            "redis": redis_health.model_dump(),
            "workers": worker_status.model_dump(),
        },
    )
|
|
|
|
@app.get("/health/redis")
async def redis_health_check():
    """Expose the Redis connectivity check as its own endpoint."""
    redis_report = await check_redis_health()
    return redis_report
|
|
|
|
@app.get("/health/workers")
async def workers_health_check():
    """Expose the worker status report as its own endpoint."""
    worker_report = await get_worker_status()
    return worker_report
|
|
|
|
# Uncomment to start the server by running this module directly:
# if __name__ == "__main__":
#     import uvicorn
#     uvicorn.run(app, host="0.0.0.0", port=8000)
|