Ireneusz Bachanowicz d1fa9385e7
Some checks failed
CI/CD Pipeline / test (push) Has been cancelled
Release for qwen3:4b model
2025-08-01 18:15:57 +02:00

211 lines
8.0 KiB
Python

# Standard library imports
import json
import os
import sys
import time
import asyncio
import signal
import uuid
from datetime import datetime, timezone
from typing import Dict, Optional
from http import HTTPStatus
from functools import partial, wraps
from contextlib import asynccontextmanager
# Third-party imports
from dotenv import load_dotenv
load_dotenv()
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from loguru import logger
from langfuse import Langfuse # Import Langfuse
from langfuse.langchain import CallbackHandler # Import CallbackHandler
# Local application imports
from shared_store import RequestStatus, requests_queue, ProcessingRequest
from llm.models import JiraWebhookPayload
from llm.chains import analysis_chain, validate_response
from app.handlers import jira_router, queue_router # Import new routers
from config import settings
# Initialize Langfuse client globally
langfuse_client = None
if settings.langfuse.enabled:
langfuse_client = Langfuse(
public_key=settings.langfuse.public_key,
secret_key=settings.langfuse.secret_key,
host=settings.langfuse.host
)
logger.info("Langfuse client initialized.")
else:
logger.info("Langfuse integration is disabled.")
async def process_single_jira_request(request: ProcessingRequest):
"""Processes a single Jira webhook request using the LLM."""
payload = JiraWebhookPayload.model_validate(request.payload)
# Initialize Langfuse callback handler for this trace
langfuse_handler = None
if langfuse_client:
langfuse_handler = CallbackHandler() # No arguments needed for constructor
logger.bind(
issue_key=payload.issueKey,
request_id=request.id,
timestamp=datetime.now(timezone.utc).isoformat()
).info(f"[{payload.issueKey}] Processing webhook request.")
llm_input = {
"issueKey": payload.issueKey,
"summary": payload.summary,
"description": payload.description if payload.description else "No description provided.",
"status": payload.status if payload.status else "Unknown",
"labels": ", ".join(payload.labels) if payload.labels else "None",
"assignee": payload.assignee if payload.assignee else "Unassigned",
"updated": payload.updated if payload.updated else "Unknown",
"comment": payload.comment if payload.comment else "No new comment provided."
}
try:
# Pass the Langfuse callback handler to the ainvoke method
raw_llm_response = await analysis_chain.ainvoke(
llm_input,
config={
"callbacks": [langfuse_handler],
"callbacks_extra": {
"session_id": str(request.id),
"trace_name": f"Jira-Analysis-{payload.issueKey}"
}
} if langfuse_handler else {}
)
# Store the raw LLM response
request.response = raw_llm_response
if not validate_response(raw_llm_response, payload.issueKey): # Pass issueKey for logging
error_msg = f"Invalid LLM response structure: {raw_llm_response}"
logger.error(f"[{payload.issueKey}] {error_msg}")
raise ValueError(error_msg)
logger.debug(f"[{payload.issueKey}] LLM Analysis Result: {json.dumps(raw_llm_response, indent=2)}")
logger.info(f"[{payload.issueKey}] Successfully processed request {request.id}.")
except Exception as e:
logger.error(f"[{payload.issueKey}] LLM processing failed: {str(e)}")
request.status = RequestStatus.FAILED
request.error = str(e)
raise
finally:
if langfuse_handler:
langfuse_client.flush() # Ensure all traces are sent
logger.debug(f"[{payload.issueKey}] Langfuse client flushed.")
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Starts background processing loop with database integration"""
async def processing_loop():
while True:
request = None
try:
request = requests_queue.get_next_request()
if request:
try:
request.status = RequestStatus.PROCESSING
request.started_at = datetime.now(timezone.utc)
# Process request
await process_single_jira_request(request)
request.status = RequestStatus.COMPLETED
request.completed_at = datetime.now(timezone.utc)
except Exception as e:
request.status = RequestStatus.FAILED
request.error = str(e)
request.completed_at = datetime.now(timezone.utc)
request.retry_count += 1
if request.retry_count < settings.processor.max_retries:
retry_delay = min(
settings.processor.initial_retry_delay_seconds * (2 ** request.retry_count),
3600
)
logger.warning(f"Request {request.id} failed, will retry in {retry_delay}s")
else:
logger.error(f"Request {request.id} failed after {request.retry_count} attempts")
finally:
if request:
requests_queue.task_done()
except Exception as e:
logger.error(f"Processing loop error: {str(e)}")
await asyncio.sleep(settings.processor.poll_interval_seconds)
task = asyncio.create_task(processing_loop())
try:
logger.info("Application initialized with processing loop started")
yield
finally:
# Ensure all tasks are done before cancelling the processing loop
logger.info("Waiting for pending queue tasks to complete...")
# requests_queue.join()
task.cancel()
await task # Await the task to ensure it's fully cancelled and cleaned up
logger.info("Processing loop terminated")
def create_app():
"""Factory function to create FastAPI app instance"""
_app = FastAPI(lifespan=lifespan)
# Include routers
_app.include_router(jira_router)
_app.include_router(queue_router)
# Add health check endpoint
@_app.get("/health")
async def health_check():
return {"status": "healthy"}
# Add error handling middleware
@_app.middleware("http")
async def error_handling_middleware(request: Request, call_next):
request_id = str(uuid.uuid4())
logger.bind(request_id=request_id).info(f"Request started: {request.method} {request.url}")
try:
response = await call_next(request)
return response
except HTTPException as e:
logger.error(f"HTTP Error: {e.status_code} - {e.detail}")
error_response = ErrorResponse(
error_id=request_id,
timestamp=datetime.now(timezone.utc).isoformat(),
status_code=e.status_code,
message=e.detail,
details=str(e)
)
return JSONResponse(status_code=e.status_code, content=error_response.model_dump())
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
error_response = ErrorResponse(
error_id=request_id,
timestamp=datetime.now(timezone.utc).isoformat(),
status_code=500,
message="Internal Server Error",
details=str(e)
)
return JSONResponse(status_code=500, content=error_response.model_dump())
return _app
class ErrorResponse(BaseModel):
error_id: str
timestamp: str
status_code: int
message: str
details: Optional[str] = None
app = create_app()
app = create_app()