187 lines
8.3 KiB
Python

from fastapi import APIRouter, Depends, HTTPException
from loguru import logger
import json
from typing import Optional, List, Union
from sqlalchemy.orm import Session
from pydantic import BaseModel, ConfigDict, field_validator
from datetime import datetime, timezone # Import timezone
import uuid
from config import settings
from langfuse import Langfuse
from database.crud import create_analysis_record, get_analysis_record, update_analysis_record
from llm.models import JiraWebhookPayload, AnalysisFlags
from llm.chains import analysis_chain, validate_response
from database.database import get_db_session
webhook_router = APIRouter()
class BadRequestError(HTTPException):
def __init__(self, detail: str):
super().__init__(status_code=400, detail=detail)
class RateLimitError(HTTPException):
def __init__(self, detail: str):
super().__init__(status_code=429, detail=detail)
class ValidationError(HTTPException):
def __init__(self, detail: str):
super().__init__(status_code=422, detail=detail)
class ValidationError(HTTPException):
def __init__(self, detail: str):
super().__init__(status_code=422, detail=detail)
class JiraWebhookHandler:
def __init__(self):
self.analysis_chain = analysis_chain
async def handle_webhook(self, payload: JiraWebhookPayload, db: Session):
try:
if not payload.issueKey:
raise BadRequestError("Missing required field: issueKey")
if not payload.summary:
raise BadRequestError("Missing required field: summary")
# Check for existing analysis record
existing_record = get_analysis_record(db, payload.issueKey)
if existing_record:
logger.info(f"Existing analysis record found for {payload.issueKey}. Skipping new analysis.")
return {"status": "skipped", "analysis_flags": existing_record.analysis_result}
# Create new analysis record with initial state
new_record = create_analysis_record(db=db, payload=payload)
update_analysis_record(db=db, record_id=new_record.id, status="processing")
logger.bind(
issue_key=payload.issueKey,
record_id=new_record.id,
timestamp=datetime.now(timezone.utc).isoformat()
).info(f"[{payload.issueKey}] Received webhook")
# Create Langfuse trace if enabled
trace = None
if settings.langfuse.enabled:
trace = settings.langfuse_client.start_span( # Use start_span
name="Jira Webhook",
input=payload.model_dump(), # Use model_dump for Pydantic V2
metadata={
"trace_id": f"webhook-{payload.issueKey}",
"issue_key": payload.issueKey,
"timestamp": datetime.now(timezone.utc).isoformat()
}
)
llm_input = {
"issueKey": payload.issueKey,
"summary": payload.summary,
"description": payload.description if payload.description else "No description provided.",
"status": payload.status if payload.status else "Unknown",
"labels": ", ".join(payload.labels) if payload.labels else "None",
"assignee": payload.assignee if payload.assignee else "Unassigned",
"updated": payload.updated if payload.updated else "Unknown",
"comment": payload.comment if payload.comment else "No new comment provided."
}
# Create Langfuse span for LLM processing if enabled
llm_span = None
if settings.langfuse.enabled and trace:
llm_span = trace.start_span(
name="LLM Processing",
input=llm_input,
metadata={
"model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model
}
)
try:
raw_llm_response = await self.analysis_chain.ainvoke(llm_input)
# Update Langfuse span with output if enabled
if settings.langfuse.enabled and llm_span:
llm_span.update(output=raw_llm_response)
llm_span.end()
# Validate LLM response
try:
# Validate using Pydantic model, extracting only relevant fields
AnalysisFlags(
hasMultipleEscalations=raw_llm_response.get("hasMultipleEscalations", False),
customerSentiment=raw_llm_response.get("customerSentiment", "neutral")
)
except Exception as e:
logger.error(f"[{payload.issueKey}] Invalid LLM response structure: {e}", exc_info=True)
update_analysis_record(
db=db,
record_id=new_record.id,
analysis_result={"hasMultipleEscalations": False, "customerSentiment": "neutral"},
raw_response=json.dumps(raw_llm_response), # Store as JSON string
status="validation_failed"
)
raise ValueError(f"Invalid LLM response format: {e}") from e
logger.debug(f"[{payload.issueKey}] LLM Analysis Result: {json.dumps(raw_llm_response, indent=2)}")
# Update record with final results
update_analysis_record(
db=db,
record_id=new_record.id,
analysis_result=raw_llm_response,
raw_response=str(raw_llm_response), # Store validated result as raw
status="completed"
)
return {"status": "success", "analysis_flags": raw_llm_response}
except Exception as e:
logger.error(f"[{payload.issueKey}] LLM processing failed: {str(e)}")
# Log error to Langfuse if enabled
if settings.langfuse.enabled and llm_span:
llm_span.end(status_message=str(e), status="ERROR")
update_analysis_record(
db=db,
record_id=new_record.id,
status="failed",
error_message=f"LLM processing failed: {str(e)}"
)
error_id = str(uuid.uuid4())
logger.error(f"[{payload.issueKey}] Error ID: {error_id}")
return {
"status": "error",
"error_id": error_id,
"analysis_flags": {
"hasMultipleEscalations": False,
"customerSentiment": "neutral"
},
"error": str(e)
}
except Exception as e:
issue_key = payload.issueKey if payload.issueKey else "N/A"
logger.error(f"[{issue_key}] Error processing webhook: {str(e)}")
import traceback
logger.error(f"[{issue_key}] Stack trace: {traceback.format_exc()}")
# Log error to Langfuse if enabled
if settings.langfuse.enabled and trace:
trace.end(status_message=str(e), status="ERROR")
raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")
# Initialize handler
webhook_handler = JiraWebhookHandler()
@webhook_router.post("/api/jira-webhook")
async def jira_webhook_endpoint(payload: JiraWebhookPayload, db: Session = Depends(get_db_session)):
"""Jira webhook endpoint"""
try:
result = await webhook_handler.handle_webhook(payload, db)
return result
except ValidationError as e:
raise
except BadRequestError as e:
raise ValidationError(detail=e.detail)
except Exception as e:
logger.error(f"Unexpected error in webhook endpoint: {str(e)}")
raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")