from fastapi import APIRouter, Depends, HTTPException from loguru import logger import json from typing import Optional, List, Union from sqlalchemy.orm import Session from pydantic import BaseModel, ConfigDict, field_validator from datetime import datetime import uuid from config import settings from langfuse import Langfuse from database.crud import create_analysis_record, get_analysis_record, update_analysis_record from llm.models import JiraWebhookPayload, AnalysisFlags from llm.chains import analysis_chain, validate_response from database.database import get_db_session webhook_router = APIRouter() class BadRequestError(HTTPException): def __init__(self, detail: str): super().__init__(status_code=400, detail=detail) class RateLimitError(HTTPException): def __init__(self, detail: str): super().__init__(status_code=429, detail=detail) class ValidationError(HTTPException): def __init__(self, detail: str): super().__init__(status_code=422, detail=detail) class ValidationError(HTTPException): def __init__(self, detail: str): super().__init__(status_code=422, detail=detail) class JiraWebhookHandler: def __init__(self): self.analysis_chain = analysis_chain async def handle_webhook(self, payload: JiraWebhookPayload, db: Session): try: if not payload.issueKey: raise BadRequestError("Missing required field: issueKey") if not payload.summary: raise BadRequestError("Missing required field: summary") # Check for existing analysis record existing_record = get_analysis_record(db, payload.issueKey) if existing_record: logger.info(f"Existing analysis record found for {payload.issueKey}. Skipping new analysis.") return {"status": "skipped", "analysis_flags": existing_record.analysis_result} # Create new analysis record with initial state new_record = create_analysis_record(db=db, payload=payload) update_analysis_record(db=db, record_id=new_record.id, status="processing") logger.bind( issue_key=payload.issueKey, record_id=new_record.id, timestamp=datetime.utcnow().isoformat() ).info(f"[{payload.issueKey}] Received webhook") # Create Langfuse trace if enabled trace = None if settings.langfuse.enabled: trace = settings.langfuse_client.start_span( name="Jira Webhook", input=payload.dict(), metadata={ "trace_id": f"webhook-{payload.issueKey}", "issue_key": payload.issueKey, "timestamp": datetime.utcnow().isoformat() } ) llm_input = { "issueKey": payload.issueKey, "summary": payload.summary, "description": payload.description if payload.description else "No description provided.", "status": payload.status if payload.status else "Unknown", "labels": ", ".join(payload.labels) if payload.labels else "None", "assignee": payload.assignee if payload.assignee else "Unassigned", "updated": payload.updated if payload.updated else "Unknown", "comment": payload.comment if payload.comment else "No new comment provided." } # Create Langfuse span for LLM processing if enabled llm_span = None if settings.langfuse.enabled and trace: llm_span = trace.start_span( name="LLM Processing", input=llm_input, metadata={ "model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model } ) try: raw_llm_response = await self.analysis_chain.ainvoke(llm_input) # Update Langfuse span with output if enabled if settings.langfuse.enabled and llm_span: llm_span.update(output=raw_llm_response) llm_span.end() # Validate LLM response try: # Validate using Pydantic model AnalysisFlags(**raw_llm_response) except Exception as e: logger.error(f"[{payload.issueKey}] Invalid LLM response structure: {str(e)}", exc_info=True) update_analysis_record( db=db, record_id=new_record.id, analysis_result={"hasMultipleEscalations": False, "customerSentiment": "neutral"}, raw_response=str(raw_llm_response), status="validation_failed" ) raise ValueError(f"Invalid LLM response format: {str(e)}") from e logger.debug(f"[{payload.issueKey}] LLM Analysis Result: {json.dumps(raw_llm_response, indent=2)}") # Update record with final results update_analysis_record( db=db, record_id=new_record.id, analysis_result=raw_llm_response, raw_response=str(raw_llm_response), # Store validated result as raw status="completed" ) return {"status": "success", "analysis_flags": raw_llm_response} except Exception as e: logger.error(f"[{payload.issueKey}] LLM processing failed: {str(e)}") # Log error to Langfuse if enabled if settings.langfuse.enabled and llm_span: llm_span.error(e) llm_span.end() update_analysis_record( db=db, record_id=new_record.id, status="failed", error_message=f"LLM processing failed: {str(e)}" ) error_id = str(uuid.uuid4()) logger.error(f"[{payload.issueKey}] Error ID: {error_id}") return { "status": "error", "error_id": error_id, "analysis_flags": { "hasMultipleEscalations": False, "customerSentiment": "neutral" }, "error": str(e) } except Exception as e: issue_key = payload.issueKey if payload.issueKey else "N/A" logger.error(f"[{issue_key}] Error processing webhook: {str(e)}") import traceback logger.error(f"[{issue_key}] Stack trace: {traceback.format_exc()}") # Log error to Langfuse if enabled if settings.langfuse.enabled and trace: trace.end(error=e) raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}") # Initialize handler webhook_handler = JiraWebhookHandler() @webhook_router.post("/jira-webhook") async def jira_webhook_endpoint(payload: JiraWebhookPayload, db: Session = Depends(get_db_session)): """Jira webhook endpoint""" try: result = await webhook_handler.handle_webhook(payload, db) return result except HTTPException: raise except Exception as e: logger.error(f"Unexpected error in webhook endpoint: {str(e)}") raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")