jira-webhook-llm/jira_webhook_llm.py
Ireneusz Bachanowicz 935a8a49ae
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
Almost stable tests
2025-07-17 02:21:56 +02:00

251 lines
10 KiB
Python

import os
import json
from dotenv import load_dotenv
load_dotenv()
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
from fastapi.responses import JSONResponse
from http import HTTPStatus
from loguru import logger
import uuid
from database.database import init_db
import sys
from typing import Optional
from datetime import datetime, timezone
import asyncio
from functools import wraps, partial
from concurrent.futures import ThreadPoolExecutor
from database.database import init_db, get_db
from database.crud import create_analysis_record, update_analysis_record
from llm.models import JiraWebhookPayload
from llm.chains import analysis_chain, validate_response
from api.handlers import router # Correct variable name
from webhooks.handlers import webhook_router
from database.crud import get_all_analysis_records, delete_all_analysis_records, get_analysis_by_id, get_analysis_record
from logging_config import configure_logging
# Initialize logging as early as possible
from config import settings
import signal
from contextlib import asynccontextmanager
cleanup_tasks = []  # Module-level list of cleanup tasks consumed by lifespan() during application shutdown
# Setup async-compatible signal handling
def handle_shutdown_signal(signum, loop):
    """Record that a shutdown signal arrived and detach the signal handlers.

    The first call marks ``loop`` with a ``_shutdown`` attribute and removes
    the SIGTERM/SIGINT handlers from it; later calls only log the signal.

    Args:
        signum: The signal number that was received.
        loop: The event loop the handlers were registered on.
    """
    logger.info(f"Received signal {signum}, initiating shutdown...")

    if hasattr(loop, '_shutdown'):
        # Already flagged by an earlier signal; nothing more to do.
        return

    loop._shutdown = True
    # Detach both handlers so a second signal cannot re-enter this function.
    for handled in (signal.SIGTERM, signal.SIGINT):
        loop.remove_signal_handler(handled)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Context manager for managing the lifespan of the FastAPI application.

    Startup: creates the background ThreadPoolExecutor (exposed as
    ``app.state.executor``), initializes the database, installs
    SIGTERM/SIGINT handlers on the running loop on a best-effort basis and
    verifies that ``settings`` exposes a ``langfuse_handler``.

    Shutdown: runs every entry of the module-level ``cleanup_tasks`` list,
    closes the Langfuse handler when it offers ``close()``, and shuts the
    executor down. Cleanup is awaited directly on the running loop; the loop
    itself is never stopped here because it belongs to the server.

    Raises:
        RuntimeError: If ``settings`` has no ``langfuse_handler`` attribute.
        Exception: Any other startup failure is logged and re-raised so the
            server reports a failed startup instead of
            "generator didn't yield".
    """
    # Initialize ThreadPoolExecutor for background tasks
    executor = ThreadPoolExecutor(max_workers=settings.thread_pool_max_workers)
    app.state.executor = executor
    # Bound before the try block so it is always available afterwards.
    loop = asyncio.get_running_loop()

    try:
        try:
            logger.info("Initializing application...")
            init_db()  # Initialize the database
            logger.debug("Event loop initialized")
            _install_signal_handlers(loop)

            # Verify critical components
            if not hasattr(settings, 'langfuse_handler'):
                logger.error("Langfuse handler not found in settings")
                raise RuntimeError("Langfuse handler not initialized")

            logger.info("Application initialized successfully")
        except Exception as e:
            logger.critical(f"Application initialization failed: {str(e)}. Exiting.")
            # Re-raise: an async context manager that swallows a startup
            # error never reaches ``yield`` and fails with an unrelated error.
            raise

        yield

        if hasattr(loop, '_shutdown'):
            logger.info("Shutdown initiated, starting cleanup...")
    finally:
        await _run_cleanup_tasks()
        await _close_langfuse()
        # wait=False: do not block the event loop on worker threads.
        executor.shutdown(wait=False)


def _install_signal_handlers(loop):
    """Register handle_shutdown_signal for SIGTERM/SIGINT where the loop allows it."""
    # NOTE(review): these handlers replace any SIGTERM/SIGINT handlers that
    # were registered on this loop earlier, and handle_shutdown_signal only
    # sets a flag - confirm the hosting server still exits on Ctrl+C.
    try:
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, partial(handle_shutdown_signal, sig, loop))
    except (NotImplementedError, RuntimeError, ValueError) as e:
        # Not supported on every platform/loop, nor outside the main thread.
        logger.warning(f"Signal handlers not installed: {str(e)}")
    else:
        logger.info("Signal handlers configured successfully")


async def _run_cleanup_tasks():
    """Run each registered cleanup task once, bounded by a 10 second timeout."""
    import inspect

    pending = []
    for task in list(cleanup_tasks):
        try:
            # Entries may be callables (sync or async) or ready-made awaitables.
            outcome = task() if callable(task) else task
        except Exception as e:
            logger.error(f"Cleanup failed: {str(e)}")
            continue
        if inspect.isawaitable(outcome):
            pending.append(outcome)

    if not pending:
        return

    try:
        results = await asyncio.wait_for(
            asyncio.gather(*pending, return_exceptions=True), timeout=10.0
        )
    except asyncio.TimeoutError:
        logger.warning("Timeout during cleanup sequence")
        return

    for result in results:
        if isinstance(result, Exception):
            logger.error(f"Cleanup failed: {str(result)}")


async def _close_langfuse():
    """Close the Langfuse handler if it exposes ``close()``; never raises."""
    import inspect

    handler = getattr(settings, 'langfuse_handler', None)
    close = getattr(handler, 'close', None)
    if not callable(close):
        return

    try:
        outcome = close()
        # close() may be synchronous; only await when it returns an awaitable.
        if inspect.isawaitable(outcome):
            await asyncio.wait_for(outcome, timeout=5.0)
        logger.info("Langfuse client closed successfully")
    except asyncio.TimeoutError:
        logger.warning("Timeout while closing Langfuse client")
    except Exception as e:
        logger.error(f"Error closing Langfuse client: {str(e)}")
def create_app():
    """Factory function to create FastAPI app instance.

    Configures logging at DEBUG level, builds the app with the module's
    ``lifespan`` manager, mounts ``webhook_router`` and ``router`` without
    prefixes, and registers a ``/health`` endpoint plus an HTTP middleware
    that reshapes error responses and uncaught exceptions into the
    ``ErrorResponse`` JSON body.

    Returns:
        The configured FastAPI application.
    """
    configure_logging(log_level="DEBUG")
    _app = FastAPI(lifespan=lifespan)

    # Include routers without prefixes to match test expectations
    _app.include_router(webhook_router)
    _app.include_router(router)

    # Add health check endpoint
    @_app.get("/health")
    async def health_check():
        return {"status": "healthy"}

    def _status_phrase(status_code):
        """Return the standard reason phrase, tolerating non-standard codes."""
        try:
            return HTTPStatus(status_code).phrase
        except ValueError:
            # HTTPStatus() rejects codes outside the registry (e.g. 499).
            return "Unknown Status"

    def _error_json(request_id, status_code, message, details):
        """Build the uniform JSON error response used by the middleware."""
        body = ErrorResponse(
            error_id=request_id,
            timestamp=datetime.now(timezone.utc).isoformat(),
            status_code=status_code,
            message=message,
            details=details,
        )
        return JSONResponse(status_code=status_code, content=body.model_dump())

    # Add error handling middleware
    @_app.middleware("http")
    async def error_handling_middleware(request: Request, call_next):
        request_id = str(uuid.uuid4())
        logger.bind(request_id=request_id).info(f"Request started: {request.method} {request.url}")

        try:
            response = await call_next(request)
        except HTTPException as e:
            logger.error(f"HTTP Error: {e.status_code} - {e.detail}")
            # detail may be a dict/list; ErrorResponse.message must be a str.
            return _error_json(request_id, e.status_code, str(e.detail), str(e))
        except Exception as e:
            logger.error(f"Unexpected error: {str(e)}")
            # NOTE(review): str(e) is returned to the client and may expose
            # internal details - confirm this is intended.
            return _error_json(request_id, 500, "Internal Server Error", str(e))

        if response.status_code >= 400:
            # Every error response is replaced by the uniform error body.
            return _error_json(
                request_id,
                response.status_code,
                _status_phrase(response.status_code),
                "Endpoint not found or invalid request",
            )
        return response

    return _app
# Module-level application instance, built once at import time by create_app().
app = create_app()
async def process_jira_webhook_background(record_id: int, payload: JiraWebhookPayload):
    """
    Background task to process Jira webhook and perform LLM analysis.

    Marks the analysis record as "processing", invokes ``analysis_chain``
    with fields taken from ``payload`` (missing optional fields are replaced
    by placeholder text), then stores the outcome:

    * "completed" with the analysis result when ``validate_response`` accepts it;
    * "failed" with a neutral fallback result when it does not;
    * "failed" with the error message when any step raises.

    Args:
        record_id: Identifier of the analysis record to update.
        payload: The webhook payload describing the Jira issue.

    Raises:
        Exception: Only if the initial log statement fails (for example when
            ``payload`` lacks ``issueKey``); processing errors are recorded
            on the analysis record instead of being raised.
    """
    try:
        logger.info(f"Starting background processing for record ID: {record_id}, Issue Key: {payload.issueKey}")
    except Exception as e:
        logger.error(f"Failed to start background processing for record {record_id}: {str(e)}")
        raise

    with get_db() as db:
        try:
            update_analysis_record(db, record_id, "processing")

            llm_input = {
                "issueKey": payload.issueKey,
                "summary": payload.summary,
                "description": payload.description or "No description provided.",
                "status": payload.status or "Unknown",
                "labels": ", ".join(payload.labels) if payload.labels else "None",
                "assignee": payload.assignee or "Unassigned",
                "updated": payload.updated or "Unknown",
                "comment": payload.comment or "No new comment provided.",
            }

            analysis_result = await analysis_chain.ainvoke(llm_input)

            if not validate_response(analysis_result):
                logger.warning(f"Invalid LLM response format for {payload.issueKey}")
                # Store a neutral fallback so the record still carries a result.
                analysis_result = {
                    "hasMultipleEscalations": False,
                    "customerSentiment": "neutral"
                }
                update_analysis_record(db, record_id, "failed", analysis_result=analysis_result, error_message="Invalid LLM response format")
                logger.error(f"LLM processing failed for {payload.issueKey}: Invalid response format")
                return

            update_analysis_record(db, record_id, "completed", analysis_result=analysis_result)
            logger.info(f"Background processing completed for record ID: {record_id}, Issue Key: {payload.issueKey}")
        except Exception as e:
            logger.error(f"Error during background processing for record ID {record_id}, Issue Key {payload.issueKey}: {str(e)}")
            try:
                update_analysis_record(db, record_id, "failed", error_message=str(e))
            except Exception as update_error:
                # The failure may itself be a database error; do not let the
                # status write raise out of the background task.
                logger.error(f"Could not mark record {record_id} as failed: {str(update_error)}")
def retry(max_retries: int = 3, delay: float = 1.0):
    """Decorator for retrying failed async operations with linear back-off.

    Args:
        max_retries: Total number of attempts. Values below 1 are treated as
            1, so the wrapped coroutine function is always called at least
            once.
        delay: Base delay in seconds; the pause after the k-th failed attempt
            is ``delay * k``.

    Returns:
        A decorator that wraps a coroutine function with the retry loop.

    Raises:
        Exception: The exception of the final failed attempt is re-raised by
            the wrapped function.
    """
    # Guard against max_retries <= 0, which would otherwise skip the call
    # entirely and end in ``raise None``.
    attempts = max(1, max_retries)

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    logger.warning(f"Attempt {attempt} failed: {str(e)}")
                    if attempt == attempts:
                        # Out of attempts: surface the last error unchanged.
                        raise
                    await asyncio.sleep(delay * attempt)
        return wrapper
    return decorator
class ErrorResponse(BaseModel):
    """JSON body returned by the error-handling middleware for failed requests."""

    # Identifier of the failed request (the middleware passes a UUID4 string).
    error_id: str
    # When the error was produced (the middleware passes a UTC ISO-8601 string).
    timestamp: str
    # HTTP status code sent with the response.
    status_code: int
    # Short description of the error.
    message: str
    # Optional additional information about the error.
    details: Optional[str] = None