jira-webhook-llm/jira_webhook_llm.py

174 lines
6.5 KiB
Python

# Standard library imports
import json
import os
import sys
import time
import asyncio
import signal
import uuid
from datetime import datetime, timezone
from typing import Dict, Optional
from http import HTTPStatus
from functools import partial, wraps
from contextlib import asynccontextmanager
# Third-party imports
from dotenv import load_dotenv
load_dotenv()
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from loguru import logger
# Local application imports
from shared_store import RequestStatus, requests_queue, ProcessingRequest
from llm.models import JiraWebhookPayload
from llm.chains import analysis_chain, validate_response
from app.handlers import router, webhook_router # Import from unified handlers
from config import settings
async def process_single_jira_request(request: ProcessingRequest):
    """Run the LLM analysis chain for one queued Jira webhook request.

    The request payload is validated into a ``JiraWebhookPayload``, converted
    into the chain's input mapping (placeholder text is substituted for empty
    optional fields), and passed to ``analysis_chain``. The raw chain output
    is stored on ``request.response`` before it is checked with
    ``validate_response``.

    Raises:
        ValueError: if ``validate_response`` rejects the chain output.
        Exception: any error raised by the chain is re-raised after the
            request has been marked ``FAILED`` and ``request.error`` set.
            Payload validation happens before that handling, so its errors
            propagate without touching the request.
    """
    payload = JiraWebhookPayload.model_validate(request.payload)

    received_at = datetime.now(timezone.utc).isoformat()
    request_logger = logger.bind(
        issue_key=payload.issueKey,
        request_id=request.id,
        timestamp=received_at,
    )
    request_logger.info(f"[{payload.issueKey}] Processing webhook request.")

    # Labels arrive as a collection; the chain input carries them as one string.
    if payload.labels:
        labels_text = ", ".join(payload.labels)
    else:
        labels_text = "None"

    # Empty/missing optional fields are replaced by human-readable placeholders.
    llm_input = {
        "issueKey": payload.issueKey,
        "summary": payload.summary,
        "description": payload.description or "No description provided.",
        "status": payload.status or "Unknown",
        "labels": labels_text,
        "assignee": payload.assignee or "Unassigned",
        "updated": payload.updated or "Unknown",
        "comment": payload.comment or "No new comment provided.",
    }

    try:
        llm_output = await analysis_chain.ainvoke(llm_input)

        # Stored before validation, so a rejected response is still kept
        # on the request.
        request.response = llm_output

        # The issue key is passed along so the validator can use it for logging.
        response_ok = validate_response(llm_output, payload.issueKey)
        if not response_ok:
            error_msg = f"Invalid LLM response structure: {llm_output}"
            logger.error(f"[{payload.issueKey}] {error_msg}")
            raise ValueError(error_msg)

        logger.debug(f"[{payload.issueKey}] LLM Analysis Result: {json.dumps(llm_output, indent=2)}")
        logger.info(f"[{payload.issueKey}] Successfully processed request {request.id}.")
    except Exception as e:
        failure_text = str(e)
        logger.error(f"[{payload.issueKey}] LLM processing failed: {failure_text}")
        request.status = RequestStatus.FAILED
        request.error = failure_text
        raise
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the background request-processing loop for the app's lifetime.

    A task running ``processing_loop`` is created on startup. On shutdown the
    task is cancelled and awaited, so the loop has really stopped before the
    termination message is logged.
    """

    async def processing_loop():
        # Poll ``requests_queue`` forever, handling at most one request per
        # iteration and sleeping ``poll_interval_seconds`` between iterations.
        while True:
            request = None
            try:
                request = requests_queue.get_next_request()
                if request:
                    try:
                        request.status = RequestStatus.PROCESSING
                        request.started_at = datetime.now(timezone.utc)

                        await process_single_jira_request(request)

                        request.status = RequestStatus.COMPLETED
                        request.completed_at = datetime.now(timezone.utc)
                    except Exception as e:
                        request.status = RequestStatus.FAILED
                        request.error = str(e)
                        request.completed_at = datetime.now(timezone.utc)
                        request.retry_count += 1

                        if request.retry_count < settings.processor.max_retries:
                            # Exponential backoff, capped at one hour.
                            # NOTE(review): the delay is only logged here; this
                            # block neither sleeps nor re-enqueues the request.
                            # Confirm the retry is scheduled elsewhere.
                            retry_delay = min(
                                settings.processor.initial_retry_delay_seconds * (2 ** request.retry_count),
                                3600,
                            )
                            logger.warning(f"Request {request.id} failed, will retry in {retry_delay}s")
                        else:
                            logger.error(f"Request {request.id} failed after {request.retry_count} attempts")
                    finally:
                        # Runs on success, failure and cancellation alike so the
                        # queue's bookkeeping always sees the request as done.
                        requests_queue.task_done()
            except Exception as e:
                # Keep the loop alive on queue/bookkeeping errors. Cancellation
                # is not an ``Exception`` subclass and still propagates.
                logger.error(f"Processing loop error: {str(e)}")

            await asyncio.sleep(settings.processor.poll_interval_seconds)

    task = asyncio.create_task(processing_loop())
    try:
        logger.info("Application initialized with processing loop started")
        yield
    finally:
        task.cancel()
        # ``cancel()`` only *requests* cancellation; await the task so the loop
        # has unwound (including its ``finally``) before shutdown continues.
        try:
            await task
        except asyncio.CancelledError:
            # Expected: this is the cancellation requested just above.
            pass
        logger.info("Processing loop terminated")
def create_app():
    """Factory function to create the FastAPI app instance.

    The app is built with the ``lifespan`` context manager, includes
    ``webhook_router`` and ``router``, exposes a ``/health`` endpoint, and
    installs an HTTP middleware that turns exceptions escaping the downstream
    handlers into JSON ``ErrorResponse`` bodies.

    Returns:
        The configured ``FastAPI`` instance.
    """
    _app = FastAPI(lifespan=lifespan)

    # Include routers
    _app.include_router(webhook_router)
    _app.include_router(router)

    # Health check endpoint: always reports a static "healthy" status.
    @_app.get("/health")
    async def health_check():
        return {"status": "healthy"}

    def _error_json(error_id, status_code, message, exc):
        # Build the JSON error response shared by both failure paths below.
        # ``ErrorResponse`` is defined later in this module; the name is only
        # looked up when a request actually fails.
        # NOTE(review): ``details`` sends the exception text to the client;
        # confirm that exposing it is intended for 500 responses.
        body = ErrorResponse(
            error_id=error_id,
            timestamp=datetime.now(timezone.utc).isoformat(),
            status_code=status_code,
            message=message,
            details=str(exc),
        )
        return JSONResponse(status_code=status_code, content=body.model_dump())

    # Error handling middleware
    @_app.middleware("http")
    async def error_handling_middleware(request: Request, call_next):
        # A fresh id per request; it is logged at start and reused as the
        # ``error_id`` of any error response.
        request_id = str(uuid.uuid4())
        logger.bind(request_id=request_id).info(f"Request started: {request.method} {request.url}")

        try:
            return await call_next(request)
        except HTTPException as e:
            logger.error(f"HTTP Error: {e.status_code} - {e.detail}")
            # ``detail`` may be a non-string value (e.g. a dict); stringify it
            # so building the error body cannot itself fail validation on the
            # ``message: str`` field.
            return _error_json(request_id, e.status_code, str(e.detail), e)
        except Exception as e:
            logger.error(f"Unexpected error: {str(e)}")
            return _error_json(request_id, 500, "Internal Server Error", e)

    return _app
class ErrorResponse(BaseModel):
    """JSON body returned by the error-handling middleware in ``create_app``."""

    # Identifier of the failed request; the middleware passes a UUID4 string.
    error_id: str
    # Time the error response was built; the middleware passes an ISO-8601
    # UTC timestamp string.
    timestamp: str
    # HTTP status code sent with the response.
    status_code: int
    # Short error summary.
    message: str
    # Optional extra information; the middleware passes ``str(exception)``.
    details: Optional[str] = None
# Module-level application instance, built once when this module is imported.
# Presumably the object the ASGI server is pointed at; confirm against the
# deployment configuration.
app = create_app()