"""FastAPI entry point for the Jira webhook analysis service.

The module wires together:

* an optional, module-level Langfuse client used for LLM tracing,
* ``process_single_jira_request``, which runs one queued webhook payload
  through the LLM analysis chain,
* a ``lifespan`` context manager that runs a background polling loop over
  ``requests_queue`` for as long as the application is up, and
* ``create_app``, the application factory (routers, health check and an
  error-handling middleware).
"""

# Standard library imports
import json
import os
import sys
import time
import asyncio
import signal
import uuid
from datetime import datetime, timezone
from typing import Dict, Optional
from http import HTTPStatus
from functools import partial, wraps
from contextlib import asynccontextmanager, suppress

# Third-party imports
from dotenv import load_dotenv

# Loaded before the remaining imports; presumably so that ``config.settings``
# sees values from the .env file when it is imported below (confirm).
load_dotenv()

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from loguru import logger
from langfuse import Langfuse
from langfuse.langchain import CallbackHandler

# Local application imports
from shared_store import RequestStatus, requests_queue, ProcessingRequest
from llm.models import JiraWebhookPayload
from llm.chains import analysis_chain, validate_response
from app.handlers import jira_router, queue_router
from config import settings

# Module-level Langfuse client; stays ``None`` when the integration is disabled.
langfuse_client = None
if settings.langfuse.enabled:
    langfuse_client = Langfuse(
        public_key=settings.langfuse.public_key,
        secret_key=settings.langfuse.secret_key,
        host=settings.langfuse.host,
    )
    logger.info("Langfuse client initialized.")
else:
    logger.info("Langfuse integration is disabled.")


class ErrorResponse(BaseModel):
    """JSON body returned by the error-handling middleware."""

    error_id: str
    timestamp: str
    status_code: int
    message: str
    details: Optional[str] = None


async def process_single_jira_request(request: ProcessingRequest):
    """Run one queued Jira webhook request through the LLM analysis chain.

    The raw chain output is stored on ``request.response`` before it is
    validated, so an invalid response is still available on the request.

    Args:
        request: Queue entry whose ``payload`` is validated into a
            ``JiraWebhookPayload``.

    Raises:
        ValueError: If ``validate_response`` rejects the LLM output.
        Exception: Any error raised by the chain is logged, recorded on the
            request (``status``/``error``) and re-raised to the caller.
    """
    payload = JiraWebhookPayload.model_validate(request.payload)

    # A callback handler is only created when the global client exists.
    langfuse_handler = CallbackHandler() if langfuse_client else None

    logger.bind(
        issue_key=payload.issueKey,
        request_id=request.id,
        timestamp=datetime.now(timezone.utc).isoformat(),
    ).info(f"[{payload.issueKey}] Processing webhook request.")

    # Missing optional fields are replaced by explicit placeholder strings so
    # the prompt always receives a value for every key.
    llm_input = {
        "issueKey": payload.issueKey,
        "summary": payload.summary,
        "description": payload.description if payload.description else "No description provided.",
        "status": payload.status if payload.status else "Unknown",
        "labels": ", ".join(payload.labels) if payload.labels else "None",
        "assignee": payload.assignee if payload.assignee else "Unassigned",
        "updated": payload.updated if payload.updated else "Unknown",
        "comment": payload.comment if payload.comment else "No new comment provided.",
    }

    # NOTE(review): "callbacks_extra" is passed through unchanged from the
    # original code; confirm the chain/handler actually consumes this key.
    chain_config = (
        {
            "callbacks": [langfuse_handler],
            "callbacks_extra": {
                "session_id": str(request.id),
                "trace_name": f"Jira-Analysis-{payload.issueKey}",
            },
        }
        if langfuse_handler
        else {}
    )

    try:
        raw_llm_response = await analysis_chain.ainvoke(llm_input, config=chain_config)

        # Keep the raw response even if validation fails below.
        request.response = raw_llm_response

        if not validate_response(raw_llm_response, payload.issueKey):
            error_msg = f"Invalid LLM response structure: {raw_llm_response}"
            logger.error(f"[{payload.issueKey}] {error_msg}")
            raise ValueError(error_msg)

        logger.debug(
            f"[{payload.issueKey}] LLM Analysis Result: {json.dumps(raw_llm_response, indent=2)}"
        )
        logger.info(f"[{payload.issueKey}] Successfully processed request {request.id}.")
    except Exception as e:
        logger.error(f"[{payload.issueKey}] LLM processing failed: {str(e)}")
        request.status = RequestStatus.FAILED
        request.error = str(e)
        raise
    finally:
        if langfuse_handler:
            # Flush on success and on failure so traces are sent either way.
            langfuse_client.flush()
            logger.debug(f"[{payload.issueKey}] Langfuse client flushed.")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the background queue-processing loop for the app's lifetime.

    On startup a task is created that polls ``requests_queue``; on shutdown
    the queue is joined, the task is cancelled and its cancellation awaited.
    """

    async def processing_loop():
        """Poll the queue forever, processing at most one request per tick."""
        while True:
            request = None
            try:
                request = requests_queue.get_next_request()
                if request:
                    try:
                        request.status = RequestStatus.PROCESSING
                        request.started_at = datetime.now(timezone.utc)

                        await process_single_jira_request(request)

                        request.status = RequestStatus.COMPLETED
                        request.completed_at = datetime.now(timezone.utc)
                    except Exception as e:
                        request.status = RequestStatus.FAILED
                        request.error = str(e)
                        request.completed_at = datetime.now(timezone.utc)
                        request.retry_count += 1

                        if request.retry_count < settings.processor.max_retries:
                            # Exponential backoff, capped at one hour.
                            # NOTE(review): the delay is only logged here; this
                            # loop does not re-enqueue or sleep for it.
                            # Presumably the queue handles retries - confirm.
                            retry_delay = min(
                                settings.processor.initial_retry_delay_seconds
                                * (2 ** request.retry_count),
                                3600,
                            )
                            logger.warning(
                                f"Request {request.id} failed, will retry in {retry_delay}s"
                            )
                        else:
                            logger.error(
                                f"Request {request.id} failed after {request.retry_count} attempts"
                            )
                    finally:
                        # Always balance the dequeue, including on cancellation.
                        requests_queue.task_done()
            except Exception as e:
                # Errors from the queue itself must not kill the loop.
                logger.error(f"Processing loop error: {str(e)}")

            await asyncio.sleep(settings.processor.poll_interval_seconds)

    task = asyncio.create_task(processing_loop())
    try:
        logger.info("Application initialized with processing loop started")
        yield
    finally:
        logger.info("Waiting for pending queue tasks to complete...")
        # NOTE(review): join() is called synchronously on the event loop. If
        # it blocks while work is outstanding, the processing task cannot run
        # to finish that work - confirm requests_queue.join() semantics.
        requests_queue.join()
        task.cancel()
        # Awaiting a cancelled task raises CancelledError; that is the expected
        # outcome here and must not escape as a shutdown failure.
        with suppress(asyncio.CancelledError):
            await task
        logger.info("Processing loop terminated")


def create_app():
    """Build and return the configured FastAPI application.

    Registers the Jira and queue routers, a ``/health`` endpoint and an HTTP
    middleware that converts exceptions into ``ErrorResponse`` JSON bodies.
    """
    _app = FastAPI(lifespan=lifespan)

    _app.include_router(jira_router)
    _app.include_router(queue_router)

    @_app.get("/health")
    async def health_check():
        """Report that the service is up."""
        return {"status": "healthy"}

    @_app.middleware("http")
    async def error_handling_middleware(request: Request, call_next):
        """Log each request and translate exceptions into JSON errors."""
        # A fresh id per request ties the log lines to the error payload.
        request_id = str(uuid.uuid4())
        logger.bind(request_id=request_id).info(
            f"Request started: {request.method} {request.url}"
        )

        try:
            return await call_next(request)
        except HTTPException as e:
            logger.error(f"HTTP Error: {e.status_code} - {e.detail}")
            error_response = ErrorResponse(
                error_id=request_id,
                timestamp=datetime.now(timezone.utc).isoformat(),
                status_code=e.status_code,
                message=e.detail,
                details=str(e),
            )
            return JSONResponse(
                status_code=e.status_code, content=error_response.model_dump()
            )
        except Exception as e:
            logger.error(f"Unexpected error: {str(e)}")
            # NOTE(review): str(e) is returned to the client in "details";
            # consider whether internal error text should be exposed.
            error_response = ErrorResponse(
                error_id=request_id,
                timestamp=datetime.now(timezone.utc).isoformat(),
                status_code=500,
                message="Internal Server Error",
                details=str(e),
            )
            return JSONResponse(status_code=500, content=error_response.model_dump())

    return _app


# Single module-level application instance.
app = create_app()