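"""Webhook handler for Jira issue events.

Validates incoming payloads, applies a Redis-backed sliding-window rate
limit per client IP, runs the LLM analysis chain over the issue fields,
and optionally records traces in Langfuse.
"""
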
import json
import time
import traceback
from datetime import datetime

import redis
from fastapi import HTTPException, Request
from loguru import logger

from config import settings
from llm.chains import analysis_chain, validate_response
from llm.models import JiraWebhookPayload


class BadRequestError(HTTPException):
    def __init__(self, detail: str):
        super().__init__(status_code=400, detail=detail)


class RateLimitError(HTTPException):
    def __init__(self, detail: str):
        super().__init__(status_code=429, detail=detail)


class ValidationError(HTTPException):
    def __init__(self, detail: str):
        super().__init__(status_code=422, detail=detail)


class JiraWebhookHandler:
    def __init__(self):
        self.analysis_chain = analysis_chain
        self.redis = None
        self.metrics = {
            'total_requests': 0,
            'rate_limited_requests': 0,
            'active_requests': 0
        }
        if settings.redis.enabled:
            self.redis = redis.Redis.from_url(settings.redis.url)

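    # Configuration consumed by this handler (names taken from usage in this
    # module; the authoritative schema lives in config.py):
    #   settings.redis.enabled, settings.redis.url
    #   settings.redis.rate_limit.window, settings.redis.rate_limit.max_requests
    #   settings.langfuse.enabled, settings.langfuse_client
    #   settings.llm.mode, settings.llm.model, settings.llm.ollama_model
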
    async def check_rate_limit(self, ip: str) -> bool:
        """Check whether a request from `ip` is within the rate limit,
        using a sliding-window algorithm over a Redis sorted set keyed
        by client IP (members and scores are request timestamps)."""
        if not self.redis:
            return True

        current_time = time.time()
        window_size = settings.redis.rate_limit.window
        max_requests = settings.redis.rate_limit.max_requests

        # Drop entries that have aged out of the window
        self.redis.zremrangebyscore(ip, 0, current_time - window_size)

        # Count requests remaining in the current window
        request_count = self.redis.zcard(ip)
        self.metrics['active_requests'] = request_count

        if request_count >= max_requests:
            self.metrics['rate_limited_requests'] += 1
            return False

        # Record this request; the timestamp doubles as member and score so
        # entries can be pruned by score above
        self.redis.zadd(ip, {str(current_time): current_time})
        self.redis.expire(ip, window_size)
        return True

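    # A possible hardening sketch (an assumption, not part of the original
    # handler): the Redis calls above are issued one round trip at a time;
    # redis-py's pipeline() can batch the prune and count, e.g.:
    #
    #   pipe = self.redis.pipeline()
    #   pipe.zremrangebyscore(ip, 0, current_time - window_size)
    #   pipe.zcard(ip)
    #   _, request_count = pipe.execute()
    #
    # Note the check-then-zadd sequence is still not atomic; a Lua script
    # would be needed for a strict limit under concurrent requests.
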
    async def handle_webhook(self, payload: JiraWebhookPayload, request: Request):
        # Defined before the try block so the error path can always reference it
        trace = None
        try:
            self.metrics['total_requests'] += 1

            # Enforce the rate limit before doing any expensive work
            if settings.redis.enabled:
                ip = request.client.host
                if not await self.check_rate_limit(ip):
                    raise RateLimitError(
                        f"Too many requests. Limit is {settings.redis.rate_limit.max_requests} "
                        f"requests per {settings.redis.rate_limit.window} seconds"
                    )

            if not payload.issueKey:
                raise BadRequestError("Missing required field: issueKey")

            if not payload.summary:
                raise BadRequestError("Missing required field: summary")

            logger.bind(
                issue_key=payload.issueKey,
                timestamp=datetime.utcnow().isoformat()
            ).info("Received webhook")

            # Create a Langfuse trace span if tracing is enabled
            if settings.langfuse.enabled:
                trace = settings.langfuse_client.start_span(
                    name="Jira Webhook",
                    input=payload.model_dump(),
                    metadata={
                        "trace_id": f"webhook-{payload.issueKey}",
                        "issue_key": payload.issueKey,
                        "timestamp": datetime.utcnow().isoformat()
                    }
                )

            llm_input = {
                "issueKey": payload.issueKey,
                "summary": payload.summary,
                "description": payload.description or "No description provided.",
                "status": payload.status or "Unknown",
                "labels": ", ".join(payload.labels) if payload.labels else "None",
                "assignee": payload.assignee or "Unassigned",
                "updated": payload.updated or "Unknown",
                "comment": payload.comment or "No new comment provided."
            }

            # Create a child span for the LLM call if tracing is enabled
            llm_span = None
            if settings.langfuse.enabled and trace:
                llm_span = trace.start_span(
                    name="LLM Processing",
                    input=llm_input,
                    metadata={
                        "model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model
                    }
                )

            try:
                analysis_result = await self.analysis_chain.ainvoke(llm_input)

                # Record the output on the LLM span and close it
                if settings.langfuse.enabled and llm_span:
                    llm_span.update(output=analysis_result)
                    llm_span.end()

                # Fall back to safe defaults if the LLM response is malformed
                if not validate_response(analysis_result):
                    logger.warning(f"Invalid LLM response format for {payload.issueKey}")
                    analysis_result = {
                        "hasMultipleEscalations": False,
                        "customerSentiment": "neutral"
                    }

                # Close the parent trace span with the final output
                if settings.langfuse.enabled and trace:
                    trace.update(output=analysis_result)
                    trace.end()

                logger.debug(f"LLM Analysis Result for {payload.issueKey}: {json.dumps(analysis_result, indent=2)}")
                return {"status": "success", "analysis_flags": analysis_result}

            except Exception as e:
                logger.error(f"LLM processing failed for {payload.issueKey}: {str(e)}")

                # Mark the spans as failed and close them
                if settings.langfuse.enabled and llm_span:
                    llm_span.update(level="ERROR", status_message=str(e))
                    llm_span.end()
                if settings.langfuse.enabled and trace:
                    trace.update(level="ERROR", status_message=str(e))
                    trace.end()

                return {
                    "status": "error",
                    "analysis_flags": {
                        "hasMultipleEscalations": False,
                        "customerSentiment": "neutral"
                    },
                    "error": str(e)
                }

        except HTTPException:
            # Let deliberate HTTP errors (400/422/429) propagate unchanged
            # rather than being masked as a 500 by the handler below
            raise
        except Exception as e:
            logger.error(f"Error processing webhook: {str(e)}")
            logger.error(f"Stack trace: {traceback.format_exc()}")

            # Mark the trace as failed and close it
            if settings.langfuse.enabled and trace:
                trace.update(level="ERROR", status_message=str(e))
                trace.end()

            raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")
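

# ---------------------------------------------------------------------------
# Usage sketch: wiring the handler into a FastAPI app. The app setup and
# route path below are illustrative assumptions, not part of this module;
# the real application may register the handler differently.
# ---------------------------------------------------------------------------
#
# from fastapi import FastAPI
#
# app = FastAPI()
# handler = JiraWebhookHandler()
#
# @app.post("/webhooks/jira")  # hypothetical route path
# async def jira_webhook(payload: JiraWebhookPayload, request: Request):
#     return await handler.handle_webhook(payload, request)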