import json
import time
import traceback
import uuid
from datetime import datetime, timedelta
from typing import Optional, List, Union

import redis
from fastapi import HTTPException, Request
from langfuse import Langfuse
from loguru import logger
from pydantic import BaseModel, ConfigDict, field_validator

from config import settings
from llm.models import JiraWebhookPayload, AnalysisFlags
from llm.chains import analysis_chain, validate_response


class BadRequestError(HTTPException):
    """HTTP 400 error raised when a required payload field is missing."""

    def __init__(self, detail: str):
        super().__init__(status_code=400, detail=detail)


class RateLimitError(HTTPException):
    """HTTP 429 error raised when a client exceeds the rate limit."""

    def __init__(self, detail: str):
        super().__init__(status_code=429, detail=detail)


class ValidationError(HTTPException):
    """HTTP 422 error; not raised anywhere in this module."""

    def __init__(self, detail: str):
        super().__init__(status_code=422, detail=detail)


def _default_analysis_flags() -> dict:
    """Return a fresh copy of the fallback flags used when the LLM output is unusable."""
    return {
        "hasMultipleEscalations": False,
        "customerSentiment": "neutral"
    }


class JiraWebhookHandler:
    """Handle Jira webhook calls: rate-limit, validate, and run the LLM analysis chain.

    Attributes are set in ``__init__``:
    ``analysis_chain`` is the imported chain object, ``redis`` is a Redis
    client (or ``None`` when ``settings.redis.enabled`` is falsy), and
    ``metrics`` is an in-process dict of simple counters.
    """

    def __init__(self):
        self.analysis_chain = analysis_chain
        self.redis = None
        self.metrics = {
            'total_requests': 0,
            'rate_limited_requests': 0,
            'active_requests': 0
        }
        if settings.redis.enabled:
            self.redis = redis.Redis.from_url(settings.redis.url)

    async def check_rate_limit(self, ip: str) -> bool:
        """Check if request is within rate limit using sliding window algorithm.

        Args:
            ip: Identifier of the client; used directly as the Redis key of
                the sorted set holding that client's request timestamps.

        Returns:
            ``True`` when the request is allowed (or no Redis client is
            configured), ``False`` when the window already holds
            ``max_requests`` entries.
        """
        if not self.redis:
            return True

        # NOTE(review): ``self.redis`` is the synchronous client, so these calls
        # block inside this coroutine — consider ``redis.asyncio`` if that matters.
        current_time = time.time()
        window_size = settings.redis.rate_limit.window
        max_requests = settings.redis.rate_limit.max_requests

        # Remove old requests outside the window
        self.redis.zremrangebyscore(ip, 0, current_time - window_size)

        # Count requests in current window
        request_count = self.redis.zcard(ip)
        self.metrics['active_requests'] = request_count

        if request_count >= max_requests:
            self.metrics['rate_limited_requests'] += 1
            return False

        # Add current request to sorted set. The member carries a random
        # suffix so that two requests sharing the same timestamp are stored
        # as two entries instead of overwriting each other.
        member = f"{current_time}:{uuid.uuid4().hex}"
        self.redis.zadd(ip, {member: current_time})
        self.redis.expire(ip, window_size)
        return True

    async def handle_webhook(self, payload: JiraWebhookPayload, request: Request):
        """Process one webhook call and return the analysis result.

        Args:
            payload: Parsed webhook body; ``issueKey`` and ``summary`` are
                required, the remaining fields fall back to placeholder text.
            request: Incoming request, used only to obtain the client host
                for rate limiting.

        Returns:
            ``{"status": "success", "analysis_flags": ...}`` on success, or
            ``{"status": "error", "analysis_flags": <defaults>, "error": ...}``
            when the LLM call itself fails.

        Raises:
            RateLimitError: The client exceeded the configured rate limit.
            BadRequestError: ``issueKey`` or ``summary`` is missing/empty.
            HTTPException: Status 500 for any other unexpected failure.
        """
        # Bound before the ``try`` so the error handler below can always read it.
        trace = None
        try:
            self.metrics['total_requests'] += 1

            # Check rate limit
            if settings.redis.enabled:
                # ``request.client`` can be None; such requests share one bucket.
                ip = request.client.host if request.client else "unknown"
                if not await self.check_rate_limit(ip):
                    raise RateLimitError(
                        f"Too many requests. Limit is {settings.redis.rate_limit.max_requests} "
                        f"requests per {settings.redis.rate_limit.window} seconds"
                    )

            if not payload.issueKey:
                raise BadRequestError("Missing required field: issueKey")

            if not payload.summary:
                raise BadRequestError("Missing required field: summary")

            logger.bind(
                issue_key=payload.issueKey,
                timestamp=datetime.utcnow().isoformat()
            ).info("Received webhook")

            # Create Langfuse trace if enabled
            if settings.langfuse.enabled:
                trace = settings.langfuse_client.start_span(
                    name="Jira Webhook",
                    input=payload.dict(),
                    metadata={
                        "trace_id": f"webhook-{payload.issueKey}",
                        "issue_key": payload.issueKey,
                        "timestamp": datetime.utcnow().isoformat()
                    }
                )

            llm_input = {
                "issueKey": payload.issueKey,
                "summary": payload.summary,
                "description": payload.description if payload.description else "No description provided.",
                "status": payload.status if payload.status else "Unknown",
                "labels": ", ".join(payload.labels) if payload.labels else "None",
                "assignee": payload.assignee if payload.assignee else "Unassigned",
                "updated": payload.updated if payload.updated else "Unknown",
                "comment": payload.comment if payload.comment else "No new comment provided."
            }

            # Create Langfuse span for LLM processing if enabled
            llm_span = None
            if settings.langfuse.enabled and trace:
                llm_span = trace.start_span(
                    name="LLM Processing",
                    input=llm_input,
                    metadata={
                        "model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model
                    }
                )

            try:
                analysis_result = await self.analysis_chain.ainvoke(llm_input)

                # Update Langfuse span with output if enabled
                if settings.langfuse.enabled and llm_span:
                    llm_span.update(output=analysis_result)
                    llm_span.end()

                # Validate LLM response
                if not validate_response(analysis_result):
                    logger.warning(f"Invalid LLM response format for {payload.issueKey}")
                    analysis_result = _default_analysis_flags()

                logger.debug(f"LLM Analysis Result for {payload.issueKey}: {json.dumps(analysis_result, indent=2)}")
                response = {"status": "success", "analysis_flags": analysis_result}

            except Exception as e:
                logger.error(f"LLM processing failed for {payload.issueKey}: {str(e)}")

                # Log error to Langfuse if enabled
                # NOTE(review): confirm the installed Langfuse SDK exposes
                # ``span.error(...)``; if not, this call raises and the request
                # ends in the 500 handler below.
                if settings.langfuse.enabled and llm_span:
                    llm_span.error(e)
                    llm_span.end()

                response = {
                    "status": "error",
                    "analysis_flags": _default_analysis_flags(),
                    "error": str(e)
                }

            # Close the root span on the normal return paths too; previously it
            # was only ended when the outer error handler ran.
            if settings.langfuse.enabled and trace:
                trace.end()
            return response

        except HTTPException:
            # Deliberate 4xx responses (rate limit, bad request) must reach the
            # client with their own status code, not be rewrapped as a 500.
            raise

        except Exception as e:
            logger.error(f"Error processing webhook: {str(e)}")
            logger.error(f"Stack trace: {traceback.format_exc()}")

            # Log error to Langfuse if enabled
            # NOTE(review): confirm ``end()`` accepts an ``error`` keyword in the
            # installed Langfuse SDK.
            if settings.langfuse.enabled and trace:
                trace.end(error=e)

            raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")