# (extraction metadata removed: 211 lines, 8.0 KiB, Python)
# Standard library imports
import asyncio
import json
import os
import signal
import sys
import time
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from functools import partial, wraps
from http import HTTPStatus
from typing import Dict, Optional

# Third-party imports
from dotenv import load_dotenv

# Populate os.environ from .env BEFORE importing modules that read it
# (notably `config.settings` below).
load_dotenv()

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from loguru import logger
from langfuse import Langfuse  # Import Langfuse
from langfuse.langchain import CallbackHandler  # Import CallbackHandler

# Local application imports
from shared_store import RequestStatus, requests_queue, ProcessingRequest
from llm.models import JiraWebhookPayload
from llm.chains import analysis_chain, validate_response
from app.handlers import jira_router, queue_router  # Import new routers
from config import settings
|
|
# Initialize Langfuse client globally.
# Remains None when tracing is disabled, so downstream code can simply
# truth-test `langfuse_client` before flushing or attaching handlers.
langfuse_client = None
if settings.langfuse.enabled:
    langfuse_client = Langfuse(
        public_key=settings.langfuse.public_key,
        secret_key=settings.langfuse.secret_key,
        host=settings.langfuse.host
    )
    logger.info("Langfuse client initialized.")
else:
    logger.info("Langfuse integration is disabled.")
|
|
|
|
async def process_single_jira_request(request: ProcessingRequest):
    """Process a single queued Jira webhook request through the LLM chain.

    Validates the raw payload into a ``JiraWebhookPayload``, invokes
    ``analysis_chain`` (traced via Langfuse when enabled), stores the raw
    response on ``request.response``, and validates its structure.

    Args:
        request: Queue entry whose ``payload`` holds the Jira webhook body.

    Raises:
        ValueError: If the LLM response fails ``validate_response``.
        Exception: Any chain-invocation error is re-raised after marking
            the request FAILED (the caller owns final status bookkeeping).
    """
    payload = JiraWebhookPayload.model_validate(request.payload)

    # Per-request Langfuse callback handler; only created when tracing is on.
    langfuse_handler = None
    if langfuse_client:
        langfuse_handler = CallbackHandler()  # No arguments needed for constructor

    logger.bind(
        issue_key=payload.issueKey,
        request_id=request.id,
        timestamp=datetime.now(timezone.utc).isoformat()
    ).info(f"[{payload.issueKey}] Processing webhook request.")

    # Flatten the payload into the prompt variables the chain expects,
    # substituting human-readable placeholders for missing fields.
    llm_input = {
        "issueKey": payload.issueKey,
        "summary": payload.summary,
        "description": payload.description if payload.description else "No description provided.",
        "status": payload.status if payload.status else "Unknown",
        "labels": ", ".join(payload.labels) if payload.labels else "None",
        "assignee": payload.assignee if payload.assignee else "Unassigned",
        "updated": payload.updated if payload.updated else "Unknown",
        "comment": payload.comment if payload.comment else "No new comment provided."
    }

    # BUG FIX: the previous config used "callbacks_extra", which is not a
    # recognized LangChain RunnableConfig key, so the session id and trace
    # name were silently dropped. Use the standard "run_name" key and pass
    # the session id via "metadata" ("langfuse_session_id" is the key the
    # Langfuse LangChain integration reads to group traces).
    invoke_config = {}
    if langfuse_handler:
        invoke_config = {
            "callbacks": [langfuse_handler],
            "run_name": f"Jira-Analysis-{payload.issueKey}",
            "metadata": {"langfuse_session_id": str(request.id)},
        }

    try:
        raw_llm_response = await analysis_chain.ainvoke(llm_input, config=invoke_config)

        # Store the raw LLM response before validation so a failed
        # validation still leaves the response inspectable on the request.
        request.response = raw_llm_response

        if not validate_response(raw_llm_response, payload.issueKey):  # Pass issueKey for logging
            error_msg = f"Invalid LLM response structure: {raw_llm_response}"
            logger.error(f"[{payload.issueKey}] {error_msg}")
            raise ValueError(error_msg)

        logger.debug(f"[{payload.issueKey}] LLM Analysis Result: {json.dumps(raw_llm_response, indent=2)}")

        logger.info(f"[{payload.issueKey}] Successfully processed request {request.id}.")

    except Exception as e:
        logger.error(f"[{payload.issueKey}] LLM processing failed: {str(e)}")
        request.status = RequestStatus.FAILED
        request.error = str(e)
        raise
    finally:
        # langfuse_client is always non-None when langfuse_handler is set.
        if langfuse_handler:
            langfuse_client.flush()  # Ensure all traces are sent
            logger.debug(f"[{payload.issueKey}] Langfuse client flushed.")
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: run the queue-processing loop for the app's lifetime.

    Starts a background task that polls ``requests_queue`` and processes one
    request per iteration; cancels the task cleanly on shutdown.
    """

    async def processing_loop():
        # Poll the shared queue forever; sleeps poll_interval_seconds
        # between iterations whether or not a request was found.
        while True:
            request = None
            try:
                request = requests_queue.get_next_request()
                if request:
                    try:
                        request.status = RequestStatus.PROCESSING
                        request.started_at = datetime.now(timezone.utc)

                        # Process request
                        await process_single_jira_request(request)

                        request.status = RequestStatus.COMPLETED
                        request.completed_at = datetime.now(timezone.utc)

                    except Exception as e:
                        request.status = RequestStatus.FAILED
                        request.error = str(e)
                        request.completed_at = datetime.now(timezone.utc)
                        request.retry_count += 1

                        if request.retry_count < settings.processor.max_retries:
                            # Exponential backoff, capped at one hour.
                            retry_delay = min(
                                settings.processor.initial_retry_delay_seconds * (2 ** request.retry_count),
                                3600
                            )
                            # NOTE(review): retry_delay is only logged here —
                            # nothing visible re-enqueues the request or waits
                            # this long, so failed requests may never actually
                            # retry. Confirm against requests_queue semantics.
                            logger.warning(f"Request {request.id} failed, will retry in {retry_delay}s")
                        else:
                            logger.error(f"Request {request.id} failed after {request.retry_count} attempts")
                    finally:
                        if request:
                            requests_queue.task_done()
            except Exception as e:
                # Never let a single bad iteration kill the loop.
                logger.error(f"Processing loop error: {str(e)}")
            await asyncio.sleep(settings.processor.poll_interval_seconds)

    task = asyncio.create_task(processing_loop())
    try:
        logger.info("Application initialized with processing loop started")
        yield
    finally:
        # Ensure all tasks are done before cancelling the processing loop
        logger.info("Waiting for pending queue tasks to complete...")
        # requests_queue.join()
        task.cancel()
        # BUG FIX: awaiting a cancelled task re-raises CancelledError, which
        # previously propagated out of lifespan and broke clean shutdown.
        try:
            await task
        except asyncio.CancelledError:
            pass
        logger.info("Processing loop terminated")
|
|
|
|
def create_app():
    """Build and return the FastAPI application instance.

    Registers the Jira and queue routers, a /health liveness endpoint, and
    an HTTP middleware that tags every request with a UUID and converts
    exceptions into a uniform JSON error envelope.
    """
    application = FastAPI(lifespan=lifespan)

    # Wire up the application routers.
    for router in (jira_router, queue_router):
        application.include_router(router)

    @application.get("/health")
    async def health_check():
        # Liveness probe endpoint.
        return {"status": "healthy"}

    @application.middleware("http")
    async def error_handling_middleware(request: Request, call_next):
        # Correlation id attached to this request's log records and errors.
        request_id = str(uuid.uuid4())
        logger.bind(request_id=request_id).info(f"Request started: {request.method} {request.url}")

        def build_error(status_code, message, exc):
            # Uniform error envelope shared by both handlers below.
            body = ErrorResponse(
                error_id=request_id,
                timestamp=datetime.now(timezone.utc).isoformat(),
                status_code=status_code,
                message=message,
                details=str(exc)
            )
            return JSONResponse(status_code=status_code, content=body.model_dump())

        try:
            return await call_next(request)
        except HTTPException as e:
            logger.error(f"HTTP Error: {e.status_code} - {e.detail}")
            return build_error(e.status_code, e.detail, e)
        except Exception as e:
            logger.error(f"Unexpected error: {str(e)}")
            return build_error(500, "Internal Server Error", e)

    return application
|
|
|
|
class ErrorResponse(BaseModel):
    """Standard JSON error envelope returned by the error-handling middleware."""

    # Correlation id (UUID4) also bound to the request's log records.
    error_id: str
    # ISO-8601 UTC timestamp of when the error response was built.
    timestamp: str
    # HTTP status code, mirrored in the response body for convenience.
    status_code: int
    # Short human-readable error summary.
    message: str
    # Optional extra detail (stringified exception).
    details: Optional[str] = None
|
|
|
|
# Module-level ASGI application (referenced by the ASGI server, e.g. uvicorn).
# BUG FIX: create_app() was invoked twice, building and discarding a complete
# duplicate application instance; create it exactly once.
app = create_app()