Refactor LLM analysis chain and models; remove deprecated prompt files
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run

- Updated `chains.py` to streamline imports and improve error handling for LLM initialization.
- Modified `models.py` to enhance the `AnalysisFlags` model with field aliases and added datetime import.
- Deleted outdated prompt files (`jira_analysis_v1.0.0.txt`, `jira_analysis_v1.1.0.txt`, `jira_analysis_v1.2.0.txt`) to clean up the repository.
- Introduced a new prompt file `jira_analysis_v1.2.0.txt` with updated instructions for analysis.
- Removed `logging_config.py` and test files to simplify the codebase.
- Updated webhook handler to improve error handling and logging.
- Added a new shared store for managing processing requests in a thread-safe manner.
This commit is contained in:
Ireneusz Bachanowicz 2025-07-21 01:06:45 +02:00
parent a1bec4f674
commit 8c1ab79eeb
19 changed files with 245 additions and 1376 deletions

View File

@ -1,139 +1,45 @@
from datetime import datetime, timezone
from fastapi import APIRouter, Request, HTTPException, Depends from fastapi import APIRouter, Request, HTTPException, Depends
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from typing import Dict, Any from pydantic import BaseModel
import config from llm.models import JiraWebhookPayload
from llm.models import LLMResponse, JiraWebhookPayload, JiraAnalysisResponse from shared_store import requests_queue, ProcessingRequest
from database.database import get_db_session # Removed Session import here
from sqlalchemy.orm import Session # Added correct SQLAlchemy import
from database.crud import get_all_analysis_records, delete_all_analysis_records, get_analysis_by_id, create_analysis_record, get_pending_analysis_records, update_record_status
router = APIRouter( router = APIRouter(
prefix="/api", prefix="/api",
tags=["API"] tags=["API"]
) )
@router.post("/jira_webhook", status_code=201)
async def receive_jira_webhook(payload: JiraWebhookPayload):
"""Handle incoming Jira webhook and store request"""
request_id = requests_queue.add_request(payload.model_dump())
return {"request_id": request_id}
@router.get("/request") @router.get("/pending_requests")
async def get_analysis_records_endpoint(db: Session = Depends(get_db_session)): async def get_pending_requests():
"""Get analysis records""" """Return all pending requests"""
try: all_requests = requests_queue.get_all_requests()
records = get_all_analysis_records(db) pending = [req for req in all_requests if req.status == "pending"]
return JSONResponse( return {"requests": pending}
status_code=200,
content={"data": records}
)
except Exception as e:
return JSONResponse(
status_code=500,
content={"error": str(e)}
)
@router.post("/request", status_code=201)
async def create_analysis_record_endpoint(
payload: JiraWebhookPayload,
db: Session = Depends(get_db_session)
):
"""Create a new Jira analysis record"""
try:
db_record = create_analysis_record(db, payload)
return JSONResponse(
status_code=201,
content={"message": "Record created successfully", "record_id": db_record.id}
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to create record: {str(e)}")
@router.post("/test-llm") @router.delete("/requests/{request_id}")
async def test_llm_endpoint(db: Session = Depends(get_db_session)): async def delete_specific_request(request_id: int):
"""Test endpoint for LLM integration""" """Delete specific request by ID"""
try: if requests_queue.delete_request_by_id(request_id):
from llm.chains import llm return {"deleted": True}
test_prompt = "What is 1 + 1? Respond only with the number." raise HTTPException(status_code=404, detail="Request not found")
response = llm.invoke(test_prompt)
return {
"status": "success",
"message": "LLM integration test successful",
"response": str(response)
}
except Exception as e:
return JSONResponse(
status_code=500,
content={
"status": "error",
"message": f"LLM test failed: {str(e)}"
}
)
@router.delete("/request") @router.delete("/requests")
async def delete_analysis_records_endpoint(db: Session = Depends(get_db_session)): async def delete_all_requests():
"""Delete analysis records""" """Clear all requests"""
try: requests_queue.clear_all_requests()
deleted_count = delete_all_analysis_records(db) return {"status": "cleared"}
return JSONResponse(
status_code=200,
content={"message": f"Successfully deleted {deleted_count} records", "deleted_count": deleted_count}
)
except Exception as e:
return JSONResponse(
status_code=500,
content={"error": str(e)})
@router.get("/request/{record_id}") @router.get("/requests/{request_id}/response")
async def get_analysis_record_endpoint(record_id: int, db: Session = Depends(get_db_session)): async def get_request_response(request_id: int):
"""Get specific analysis record by ID""" """Get response for specific request"""
record = get_analysis_by_id(db, record_id) matched_request = requests_queue.get_request_by_id(request_id)
if not record: if not matched_request:
raise HTTPException(status_code=404, detail="Analysis record not found") raise HTTPException(status_code=404, detail="Request not found")
return JiraAnalysisResponse.model_validate(record) return matched_request.response if matched_request.response else "No response yet"
@router.get("/queue/pending")
async def get_pending_queue_records_endpoint(db: Session = Depends(get_db_session)):
"""Get all pending or retrying analysis records."""
try:
records = get_pending_analysis_records(db)
# Convert records to serializable format
serialized_records = []
for record in records:
record_dict = JiraAnalysisResponse.model_validate(record).model_dump()
# Convert datetime fields to ISO format
record_dict["created_at"] = record_dict["created_at"].isoformat() if record_dict["created_at"] else None
record_dict["updated_at"] = record_dict["updated_at"].isoformat() if record_dict["updated_at"] else None
serialized_records.append(record_dict)
return JSONResponse(
status_code=200,
content={"data": serialized_records}
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
@router.post("/queue/{record_id}/retry", status_code=200)
async def retry_analysis_record_endpoint(record_id: int, db: Session = Depends(get_db_session)):
"""Manually trigger a retry for a failed, processing or validation_failed analysis record."""
db_record = get_analysis_by_id(db, record_id)
if not db_record:
raise HTTPException(status_code=404, detail="Analysis record not found")
if db_record.status not in ["processing", "failed", "validation_failed"]:
raise HTTPException(status_code=400, detail=f"Record status is '{db_record.status}'. Only 'failed', 'processing' or 'validation_failed' records can be retried.")
# Reset status to pending and clear error message for retry
updated_record = update_record_status(
db=db,
record_id=record_id,
status="pending",
error_message=None,
analysis_result=None,
raw_response=None,
next_retry_at=None # Reset retry time
)
if not updated_record:
raise HTTPException(status_code=500, detail="Failed to update record for retry.")
return JSONResponse(
status_code=200,
content={"message": f"Record {record_id} marked for retry.", "record_id": updated_record.id}
)

183
config.py
View File

@ -1,64 +1,21 @@
import os import os
import sys import sys
import traceback
from typing import Optional from typing import Optional
from pydantic_settings import BaseSettings from pydantic_settings import BaseSettings
from pydantic import field_validator, ConfigDict from pydantic import field_validator, ConfigDict
from loguru import logger
from watchfiles import watch, Change
from threading import Thread, Event
from langfuse import Langfuse
from langfuse.langchain import CallbackHandler
import yaml import yaml
from pathlib import Path from pathlib import Path
class LangfuseConfig(BaseSettings): class LangfuseConfig(BaseSettings):
enabled: bool = True enabled: bool = False
public_key: Optional[str] = None
secret_key: Optional[str] = None secret_key: Optional[str] = None
public_key: Optional[str] = None
host: Optional[str] = None host: Optional[str] = None
@field_validator('host')
def validate_host(cls, v):
if v and not v.startswith(('http://', 'https://')):
raise ValueError("Langfuse host must start with http:// or https://")
return v
def __init__(self, **data):
try:
logger.info("Initializing LangfuseConfig with data: {}", data)
logger.info("Environment variables:")
logger.info("LANGFUSE_PUBLIC_KEY: {}", os.getenv('LANGFUSE_PUBLIC_KEY'))
logger.info("LANGFUSE_SECRET_KEY: {}", os.getenv('LANGFUSE_SECRET_KEY'))
logger.info("LANGFUSE_HOST: {}", os.getenv('LANGFUSE_HOST'))
super().__init__(**data)
logger.info("LangfuseConfig initialized successfully")
logger.info("Public Key: {}", self.public_key)
logger.info("Secret Key: {}", self.secret_key)
logger.info("Host: {}", self.host)
except Exception as e:
logger.error("Failed to initialize LangfuseConfig: {}", e)
logger.error("Current environment variables:")
logger.error("LANGFUSE_PUBLIC_KEY: {}", os.getenv('LANGFUSE_PUBLIC_KEY'))
logger.error("LANGFUSE_SECRET_KEY: {}", os.getenv('LANGFUSE_SECRET_KEY'))
logger.error("LANGFUSE_HOST: {}", os.getenv('LANGFUSE_HOST'))
raise
model_config = ConfigDict( model_config = ConfigDict(
env_prefix='LANGFUSE_', env_prefix='LANGFUSE_',
env_file='.env', env_file='.env',
env_file_encoding='utf-8', env_file_encoding='utf-8',
extra='ignore',
env_nested_delimiter='__',
case_sensitive=True
)
class LogConfig(BaseSettings):
level: str = 'INFO'
model_config = ConfigDict(
env_prefix='LOG_',
extra='ignore' extra='ignore'
) )
@ -98,7 +55,7 @@ class ApiConfig(BaseSettings):
) )
class ProcessorConfig(BaseSettings): class ProcessorConfig(BaseSettings):
poll_interval_seconds: int = 30 poll_interval_seconds: int = 10
max_retries: int = 5 max_retries: int = 5
initial_retry_delay_seconds: int = 60 initial_retry_delay_seconds: int = 60
@ -110,154 +67,46 @@ class ProcessorConfig(BaseSettings):
) )
class Settings: class Settings:
logging_ready = Event() # Event to signal logging is configured
def __init__(self): def __init__(self):
try: try:
logger.info("Loading configuration from application.yml and environment variables")
# Load configuration from YAML file # Load configuration from YAML file
yaml_config = self._load_yaml_config() yaml_config = self._load_yaml_config()
logger.info("Loaded YAML config: {}", yaml_config)
# Initialize configurations, allowing environment variables to override YAML # Initialize configurations
logger.info("Initializing LogConfig")
self.log = LogConfig(**yaml_config.get('log', {}))
logger.info("LogConfig initialized: {}", self.log.model_dump())
logger.info("Initializing LLMConfig")
self.llm = LLMConfig(**yaml_config.get('llm', {})) self.llm = LLMConfig(**yaml_config.get('llm', {}))
logger.info("LLMConfig initialized: {}", self.llm.model_dump())
logger.info("Initializing LangfuseConfig")
self.langfuse = LangfuseConfig(**yaml_config.get('langfuse', {}))
logger.info("LangfuseConfig initialized: {}", self.langfuse.model_dump())
logger.info("Initializing ApiConfig")
self.api = ApiConfig(**yaml_config.get('api', {})) self.api = ApiConfig(**yaml_config.get('api', {}))
logger.info("ApiConfig initialized: {}", self.api.model_dump())
logger.info("Initializing ProcessorConfig")
self.processor = ProcessorConfig(**yaml_config.get('processor', {})) self.processor = ProcessorConfig(**yaml_config.get('processor', {}))
logger.info("ProcessorConfig initialized: {}", self.processor.model_dump()) self.langfuse = LangfuseConfig(**yaml_config.get('langfuse', {}))
logger.info("Validating configuration")
self._validate() self._validate()
logger.info("Starting config watcher")
self._start_watcher()
logger.info("Initializing Langfuse")
self._init_langfuse()
logger.info("Configuration initialized successfully")
except Exception as e: except Exception as e:
logger.error("Configuration initialization failed: {}", e) print(f"Configuration initialization failed: {e}")
logger.error("Current configuration state:") sys.exit(1)
logger.error("LogConfig: {}", self.log.model_dump() if hasattr(self, 'log') else 'Not initialized')
logger.error("LLMConfig: {}", self.llm.model_dump() if hasattr(self, 'llm') else 'Not initialized')
logger.error("LangfuseConfig: {}", self.langfuse.model_dump() if hasattr(self, 'langfuse') else 'Not initialized')
logger.error("ProcessorConfig: {}", self.processor.model_dump() if hasattr(self, 'processor') else 'Not initialized')
raise
def _load_yaml_config(self): def _load_yaml_config(self):
config_path = Path('config/application.yml') config_path = Path('config/application.yml')
if not config_path.exists(): if not config_path.exists():
logger.warning("Configuration file not found at {}", config_path)
return {} return {}
try: try:
with open(config_path, 'r') as f: with open(config_path, 'r') as f:
return yaml.safe_load(f) or {} return yaml.safe_load(f) or {}
except Exception as e: except Exception as e:
logger.error("Error loading configuration from {}: {}", config_path, e)
return {} return {}
def _validate(self): def _validate(self):
logger.info("LLM mode set to: '{}'", self.llm.mode)
if self.llm.mode == 'openai': if self.llm.mode == 'openai':
if not self.llm.openai_api_key: if not self.llm.openai_api_key:
raise ValueError("LLM mode is 'openai', but OPENAI_API_KEY is not set.") raise ValueError("OPENAI_API_KEY is not set.")
if not self.llm.openai_api_base_url: if not self.llm.openai_api_base_url:
raise ValueError("LLM mode is 'openai', but OPENAI_API_BASE_URL is not set.") raise ValueError("OPENAI_API_BASE_URL is not set.")
if not self.llm.openai_model: if not self.llm.openai_model:
raise ValueError("LLM mode is 'openai', but OPENAI_MODEL is not set.") raise ValueError("OPENAI_MODEL is not set.")
elif self.llm.mode == 'ollama': elif self.llm.mode == 'ollama':
if not self.llm.ollama_base_url: if not self.llm.ollama_base_url:
raise ValueError("LLM mode is 'ollama', but OLLAMA_BASE_URL is not set.") raise ValueError("OLLAMA_BASE_URL is not set.")
if not self.llm.ollama_model: if not self.llm.ollama_model:
raise ValueError("LLM mode is 'ollama', but OLLAMA_MODEL is not set.") raise ValueError("OLLAMA_MODEL is not set.")
logger.info("Configuration validated successfully.")
def _init_langfuse(self):
if self.langfuse.enabled:
try:
# Verify all required credentials are present
if not all([self.langfuse.public_key, self.langfuse.secret_key, self.langfuse.host]):
raise ValueError("Missing required Langfuse credentials")
logger.debug("Initializing Langfuse client with:")
logger.debug("Public Key: {}", self.langfuse.public_key)
logger.debug("Secret Key: {}", self.langfuse.secret_key)
logger.debug("Host: {}", self.langfuse.host)
# Initialize Langfuse client
self.langfuse_client = Langfuse(
public_key=self.langfuse.public_key,
secret_key=self.langfuse.secret_key,
host=self.langfuse.host
)
# Test Langfuse connection
try:
self.langfuse_client.auth_check()
logger.debug("Langfuse connection test successful")
except Exception as e:
logger.error("Langfuse connection test failed: {}", e)
raise
# Initialize CallbackHandler with debug logging
logger.debug("Langfuse client attributes: {}", vars(self.langfuse_client))
try:
self.langfuse_handler = CallbackHandler()
logger.debug("CallbackHandler initialized successfully")
except Exception as e:
logger.error("CallbackHandler initialization failed: {}", e)
raise
logger.info("Langfuse client and handler initialized successfully")
except ValueError as e:
logger.warning("Langfuse configuration error: {}. Disabling Langfuse.", e)
self.langfuse.enabled = False
except Exception as e:
logger.error("Failed to initialize Langfuse: {}", e)
self.langfuse.enabled = False
def _start_watcher(self): # Create settings instance
def watch_config(): settings = Settings()
# Wait for logging to be fully configured
self.logging_ready.wait()
for changes in watch('config/application.yml'):
for change in changes:
if change[0] == Change.modified:
logger.info("Configuration file modified, reloading settings...")
try:
# Reload YAML config and re-initialize all settings
yaml_config = self._load_yaml_config()
self.log = LogConfig(**yaml_config.get('log', {}))
self.llm = LLMConfig(**yaml_config.get('llm', {}))
self.langfuse = LangfuseConfig(**yaml_config.get('langfuse', {}))
self.api = ApiConfig(**yaml_config.get('api', {}))
self.processor = ProcessorConfig(**yaml_config.get('processor', {}))
self._validate()
self._init_langfuse() # Re-initialize Langfuse client if needed
logger.info("Configuration reloaded successfully")
except Exception as e:
logger.error("Error reloading configuration: {}", e)
Thread(target=watch_config, daemon=True).start()
# Create a single, validated instance of the settings to be imported by other modules.
try:
settings = Settings()
except ValueError as e:
logger.error("FATAL: {}", e)
logger.error("Application shutting down due to configuration error.")
sys.exit(1)

View File

@ -1,101 +0,0 @@
from loguru import logger
from sqlalchemy.orm import Session
from datetime import datetime, timezone
import json
from typing import Dict, Any, Optional
from database.models import JiraAnalysis
from llm.models import JiraWebhookPayload
def create_analysis_record(db: Session, payload: JiraWebhookPayload) -> JiraAnalysis:
"""Creates a new Jira analysis record in the database."""
db_analysis = JiraAnalysis(
issue_key=payload.issueKey,
status="pending",
issue_summary=payload.summary,
request_payload=payload.model_dump(),
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
retry_count=0,
last_processed_at=None,
next_retry_at=None
)
db.add(db_analysis)
db.commit()
db.refresh(db_analysis)
return db_analysis
def get_analysis_record(db: Session, issue_key: str) -> Optional[JiraAnalysis]:
"""Retrieves the latest analysis record for a given Jira issue key."""
logger.debug(f"Attempting to retrieve analysis record for issue key: {issue_key}")
record = db.query(JiraAnalysis).filter(JiraAnalysis.issue_key == issue_key).order_by(JiraAnalysis.created_at.desc()).first()
if record:
logger.debug(f"Found analysis record for {issue_key}: {record.id}")
else:
logger.debug(f"No analysis record found for {issue_key}")
return record
def update_record_status(
db: Session,
record_id: int,
status: str,
analysis_result: Optional[Dict[str, Any]] = None,
error_message: Optional[str] = None,
raw_response: Optional[Dict[str, Any]] = None,
retry_count_increment: int = 0,
last_processed_at: Optional[datetime] = None,
next_retry_at: Optional[datetime] = None
) -> Optional[JiraAnalysis]:
"""Updates an existing Jira analysis record."""
db_analysis = db.query(JiraAnalysis).filter(JiraAnalysis.id == record_id).first()
if db_analysis:
db_analysis.status = status
db_analysis.updated_at = datetime.now(timezone.utc)
# Only update if not None, allowing explicit None to clear values
# Always update these fields if provided, allowing explicit None to clear them
db_analysis.analysis_result = analysis_result
db_analysis.error_message = error_message
db_analysis.raw_response = json.dumps(raw_response) if raw_response is not None else None
if retry_count_increment > 0:
db_analysis.retry_count += retry_count_increment
db_analysis.last_processed_at = last_processed_at
db_analysis.next_retry_at = next_retry_at
# When status is set to "pending", clear relevant fields for retry
if status == "pending":
db_analysis.analysis_result = None
db_analysis.error_message = None
db_analysis.raw_response = None
db_analysis.next_retry_at = None
db.commit()
db.refresh(db_analysis)
return db_analysis
def get_pending_analysis_records(db: Session) -> list[JiraAnalysis]:
"""Retrieves all pending or retrying analysis records that are ready for processing."""
now = datetime.now(timezone.utc)
return db.query(JiraAnalysis).filter(
(JiraAnalysis.status == "pending") |
((JiraAnalysis.status == "retrying") & (JiraAnalysis.next_retry_at <= now))
).order_by(JiraAnalysis.created_at.asc()).all()
def get_all_analysis_records(db: Session) -> list[JiraAnalysis]:
"""Retrieves all analysis records from the database."""
return db.query(JiraAnalysis).all()
def get_analysis_by_id(db: Session, record_id: int) -> Optional[JiraAnalysis]:
"""Retrieves an analysis record by its unique database ID."""
return db.query(JiraAnalysis).filter(JiraAnalysis.id == record_id).first()
def delete_all_analysis_records(db: Session) -> int:
"""Deletes all analysis records from the database and returns count of deleted records."""
count = db.query(JiraAnalysis).count()
db.query(JiraAnalysis).delete()
db.commit()
return count
db.query(JiraAnalysis).delete()
db.commit()
return count

View File

@ -1,35 +0,0 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.ext.declarative import declarative_base
from contextlib import contextmanager
from loguru import logger
from database.models import Base
DATABASE_URL = "sqlite:///./jira_analyses.db"
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def init_db():
"""Initializes the database by creating all tables."""
logger.info("Initializing database...")
Base.metadata.create_all(bind=engine)
logger.info("Database initialized successfully.")
@contextmanager
def get_db():
"""Context manager to get a database session."""
db = SessionLocal()
try:
yield db
finally:
db.close()
def get_db_session():
"""FastAPI dependency to get a database session."""
db = SessionLocal()
try:
yield db
finally:
db.close()

View File

@ -1,30 +0,0 @@
from sqlalchemy import Column, Integer, String, DateTime, Text, JSON
from datetime import datetime
from enum import Enum
class AnalysisFlags(str, Enum):
BUG = "bug"
FEATURE = "feature"
IMPROVEMENT = "improvement"
SUPPORT = "support"
OTHER = "other"
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class JiraAnalysis(Base):
__tablename__ = "jira_analyses"
id = Column(Integer, primary_key=True, index=True)
issue_key = Column(String, index=True, nullable=False)
status = Column(String, default="pending", nullable=False) # pending, processing, completed, failed
issue_summary = Column(Text, nullable=False)
request_payload = Column(JSON, nullable=False) # Store the original Jira webhook payload
analysis_result = Column(JSON, nullable=True) # Store the structured LLM output
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
error_message = Column(Text, nullable=True) # To store any error messages
raw_response = Column(JSON, nullable=True) # Store raw LLM response before validation
retry_count = Column(Integer, default=0, nullable=False)
last_processed_at = Column(DateTime, nullable=True)
next_retry_at = Column(DateTime, nullable=True)

Binary file not shown.

View File

@ -1,88 +1,42 @@
import os # Standard library imports
import json import json
import time import os
from dotenv import load_dotenv
load_dotenv()
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
from fastapi.responses import JSONResponse
from http import HTTPStatus
from loguru import logger
import uuid
import sys import sys
from typing import Optional import time
from datetime import datetime, timedelta, timezone
import asyncio import asyncio
from functools import partial, wraps
from sqlalchemy.orm import Session
from database.database import init_db, get_db, SessionLocal
from database.crud import get_analysis_record, update_record_status, create_analysis_record
from database.models import JiraAnalysis, AnalysisFlags
from llm.models import JiraWebhookPayload
from llm.chains import analysis_chain, validate_response
from api.handlers import router # Correct variable name
from webhooks.handlers import webhook_router
from database.crud import get_all_analysis_records, delete_all_analysis_records, get_analysis_by_id, get_analysis_record
from logging_config import configure_logging
# Initialize logging as early as possible
from config import settings
import signal import signal
import uuid
from datetime import datetime, timezone
from typing import Dict, Optional
from http import HTTPStatus
from functools import partial, wraps
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
cleanup_tasks = [] # Initialize cleanup_tasks globally # Third-party imports
from dotenv import load_dotenv
load_dotenv()
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from loguru import logger
def calculate_next_retry_time(retry_count: int) -> datetime: # Local application imports
"""Calculates the next retry time using exponential backoff.""" from shared_store import RequestStatus, requests_queue, ProcessingRequest
delay = settings.processor.initial_retry_delay_seconds * (2 ** retry_count) from llm.models import JiraWebhookPayload
return datetime.now(timezone.utc) + timedelta(seconds=delay) from llm.chains import analysis_chain, validate_response
from api.handlers import router
from webhooks.handlers import webhook_router
from config import settings
def retry(max_retries: int = 3, initial_delay: float = 1.0): async def process_single_jira_request(request: ProcessingRequest):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
for i in range(max_retries + 1):
try:
return await func(*args, **kwargs)
except Exception as e:
if i == max_retries:
logger.error(f"Function {func.__name__} failed after {max_retries} retries: {e}")
raise
delay = initial_delay * (2 ** i)
logger.warning(f"Function {func.__name__} failed, retrying in {delay:.2f} seconds (attempt {i+1}/{max_retries})...")
await asyncio.sleep(delay)
return wrapper
return decorator
async def process_single_jira_request(db: Session, record: JiraAnalysis):
"""Processes a single Jira webhook request using the LLM.""" """Processes a single Jira webhook request using the LLM."""
issue_key = record.issue_key payload = JiraWebhookPayload.model_validate(request.payload)
record_id = record.id
payload = JiraWebhookPayload.model_validate(record.request_payload)
logger.bind( logger.bind(
issue_key=issue_key, issue_key=payload.issueKey,
record_id=record_id, request_id=request.id,
timestamp=datetime.now(timezone.utc).isoformat() timestamp=datetime.now(timezone.utc).isoformat()
).info(f"[{issue_key}] Processing webhook request.") ).info(f"[{payload.issueKey}] Processing webhook request.")
# Create Langfuse trace if enabled
trace = None
if settings.langfuse.enabled:
trace = settings.langfuse_client.start_span(
name="Jira Webhook Processing",
input=payload.model_dump(),
metadata={
"trace_id": f"processor-{issue_key}-{record_id}",
"issue_key": issue_key,
"record_id": record_id,
"timestamp": datetime.now(timezone.utc).isoformat()
}
)
llm_input = { llm_input = {
"issueKey": payload.issueKey, "issueKey": payload.issueKey,
@ -95,234 +49,78 @@ async def process_single_jira_request(db: Session, record: JiraAnalysis):
"comment": payload.comment if payload.comment else "No new comment provided." "comment": payload.comment if payload.comment else "No new comment provided."
} }
llm_span = None
if settings.langfuse.enabled and trace:
llm_span = trace.start_span(
name="LLM Processing",
input=llm_input,
metadata={
"model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model
}
)
try: try:
raw_llm_response = await analysis_chain.ainvoke(llm_input) raw_llm_response = await analysis_chain.ainvoke(llm_input)
if settings.langfuse.enabled and llm_span:
llm_span.update(output=raw_llm_response)
llm_span.end()
# Validate response structure before processing
if not validate_response(raw_llm_response): if not validate_response(raw_llm_response):
error_msg = f"Invalid LLM response structure: {raw_llm_response}" error_msg = f"Invalid LLM response structure: {raw_llm_response}"
logger.error(f"[{issue_key}] {error_msg}") logger.error(f"[{payload.issueKey}] {error_msg}")
update_record_status(
db=db,
record_id=record_id,
analysis_result={"hasMultipleEscalations": False, "customerSentiment": "neutral"},
raw_response=json.dumps(raw_llm_response),
status="validation_failed",
error_message=error_msg,
last_processed_at=datetime.now(timezone.utc),
retry_count_increment=1,
next_retry_at=calculate_next_retry_time(record.retry_count + 1) if record.retry_count < settings.processor.max_retries else None
)
if settings.langfuse.enabled and trace:
trace.end()
raise ValueError(error_msg) raise ValueError(error_msg)
try: logger.debug(f"[{payload.issueKey}] LLM Analysis Result: {json.dumps(raw_llm_response, indent=2)}")
AnalysisFlags(
hasMultipleEscalations=raw_llm_response.get("hasMultipleEscalations", False), logger.info(f"[{payload.issueKey}] Successfully processed request {request.id}.")
customerSentiment=raw_llm_response.get("customerSentiment", "neutral")
)
except Exception as e:
logger.error(f"[{issue_key}] Invalid LLM response structure: {e}", exc_info=True)
update_record_status(
db=db,
record_id=record_id,
analysis_result={"hasMultipleEscalations": False, "customerSentiment": "neutral"},
raw_response=json.dumps(raw_llm_response),
status="validation_failed",
error_message=f"LLM response validation failed: {e}",
last_processed_at=datetime.now(timezone.utc),
retry_count_increment=1,
next_retry_at=calculate_next_retry_time(record.retry_count + 1) if record.retry_count < settings.processor.max_retries else None
)
if settings.langfuse.enabled and trace:
trace.end()
raise ValueError(f"Invalid LLM response format: {e}") from e
logger.debug(f"[{issue_key}] LLM Analysis Result: {json.dumps(raw_llm_response, indent=2)}")
update_record_status(
db=db,
record_id=record_id,
analysis_result=raw_llm_response,
raw_response=json.dumps(raw_llm_response),
status="completed",
last_processed_at=datetime.now(timezone.utc),
next_retry_at=None # No retry needed on success
)
if settings.langfuse.enabled and trace:
trace.end()
logger.info(f"[{issue_key}] Successfully processed and updated record {record_id}.")
except Exception as e: except Exception as e:
logger.error(f"[{issue_key}] LLM processing failed for record {record_id}: {str(e)}") logger.error(f"[{payload.issueKey}] LLM processing failed: {str(e)}")
if settings.langfuse.enabled and llm_span: request.status = RequestStatus.FAILED
llm_span.end() request.error = str(e)
raise
new_retry_count = record.retry_count + 1
new_status = "failed"
next_retry = None
if new_retry_count <= settings.processor.max_retries:
next_retry = calculate_next_retry_time(new_retry_count)
new_status = "retrying" # Indicate that it will be retried
update_record_status(
db=db,
record_id=record_id,
status=new_status,
error_message=f"LLM processing failed: {str(e)}",
last_processed_at=datetime.now(timezone.utc),
retry_count_increment=1,
next_retry_at=next_retry
)
if settings.langfuse.enabled and trace:
trace.end()
logger.error(f"[{issue_key}] Record {record_id} status updated to '{new_status}'. Retry count: {new_retry_count}")
async def main_processor_loop():
"""Main loop for the Jira webhook processor."""
logger.info("Starting Jira webhook processor.")
while True: # This loop will run indefinitely until the app shuts down
db: Session = SessionLocal() # Get a new session for each loop iteration
try:
# Fetch records that are 'pending' or 'retrying' and past their next_retry_at
# Order by created_at to process older requests first
pending_records = db.query(JiraAnalysis).filter(
(JiraAnalysis.status == "pending") |
((JiraAnalysis.status == "retrying") & (JiraAnalysis.next_retry_at <= datetime.now(timezone.utc)))
).order_by(JiraAnalysis.created_at.asc()).all()
if not pending_records:
logger.debug(f"No pending or retrying records found. Sleeping for {settings.processor.poll_interval_seconds} seconds.")
for record in pending_records:
# Update status to 'processing' immediately to prevent other workers from picking it up
update_record_status(db, record.id, "processing", last_processed_at=datetime.now(timezone.utc))
db.refresh(record) # Refresh to get the latest state
await process_single_jira_request(db, record)
except Exception as e:
logger.error(f"Error in main processor loop: {str(e)}", exc_info=True)
finally:
db.close() # Ensure the session is closed
await asyncio.sleep(settings.processor.poll_interval_seconds) # Use asyncio.sleep for non-blocking sleep
# Setup async-compatible signal handling
def handle_shutdown_signal(signum, loop):
"""Graceful shutdown signal handler"""
logger.info(f"Received signal {signum}, initiating shutdown...")
# Set shutdown flag and remove signal handlers to prevent reentrancy
if not hasattr(loop, '_shutdown'):
loop._shutdown = True
# Prevent further signal handling
for sig in (signal.SIGTERM, signal.SIGINT):
loop.remove_signal_handler(sig)
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
""" """Starts background processing loop with database integration"""
Context manager for managing the lifespan of the FastAPI application.
Initializes the database, sets up signal handlers, and handles cleanup.
"""
# Flag to track if initialization succeeded
init_success = False
async def processing_loop():
while True:
request = None
try:
request = requests_queue.get_next_request()
if request:
try:
request.status = RequestStatus.PROCESSING
request.started_at = datetime.now(timezone.utc)
# Process request
await process_single_jira_request(request)
request.status = RequestStatus.COMPLETED
request.completed_at = datetime.now(timezone.utc)
except Exception as e:
request.status = RequestStatus.FAILED
request.error = str(e)
request.completed_at = datetime.now(timezone.utc)
request.retry_count += 1
if request.retry_count < settings.processor.max_retries:
retry_delay = min(
settings.processor.initial_retry_delay_seconds * (2 ** request.retry_count),
3600
)
logger.warning(f"Request {request.id} failed, will retry in {retry_delay}s")
else:
logger.error(f"Request {request.id} failed after {request.retry_count} attempts")
finally:
if request:
requests_queue.task_done()
except Exception as e:
logger.error(f"Processing loop error: {str(e)}")
await asyncio.sleep(settings.processor.poll_interval_seconds)
task = asyncio.create_task(processing_loop())
try: try:
logger.info("Initializing application...") logger.info("Application initialized with processing loop started")
init_db() # Initialize the database
# Setup signal handlers
# Only set up signal handlers if not in a test environment
if os.getenv("IS_TEST_ENV") != "true":
loop = asyncio.get_running_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, partial(handle_shutdown_signal, sig, loop))
logger.info("Signal handlers configured successfully")
else:
logger.info("Skipping signal handler configuration in test environment.")
# Start the background processor task only if not in a test environment
processor_task = None
if os.getenv("IS_TEST_ENV") != "true":
processor_task = asyncio.create_task(main_processor_loop())
cleanup_tasks.append(processor_task)
logger.info("Background Jira processor started.")
else:
logger.info("Skipping background Jira processor in test environment.")
# Verify critical components
if not hasattr(settings, 'langfuse_handler'):
logger.error("Langfuse handler not found in settings")
raise RuntimeError("Langfuse handler not initialized")
logger.info("Application initialized successfully")
init_success = True
except Exception as e:
logger.critical(f"Application initialization failed: {str(e)}. Exiting.")
# Don't re-raise to allow finally block to execute cleanup
try:
# Yield control to the application
yield yield
finally: finally:
# Cleanup logic runs after application finishes task.cancel()
if init_success: logger.info("Processing loop terminated")
# Check shutdown flag before cleanup
loop = asyncio.get_running_loop()
if hasattr(loop, '_shutdown'):
logger.info("Shutdown initiated, starting cleanup...")
# Cancel the processor task
if processor_task:
logger.info("Cancelling background Jira processor task...")
processor_task.cancel()
try:
await processor_task
except asyncio.CancelledError:
logger.info("Background Jira processor task cancelled.")
except Exception as e:
logger.error(f"Error cancelling processor task: {str(e)}")
# Close langfuse with retry
if hasattr(settings, 'langfuse_handler') and hasattr(settings.langfuse_handler, 'close'):
try:
await asyncio.wait_for(settings.langfuse_handler.close(), timeout=5.0)
logger.info("Langfuse client closed successfully")
except asyncio.TimeoutError:
logger.warning("Timeout while closing Langfuse client")
except Exception as e:
logger.error(f"Error closing Langfuse client: {str(e)}")
# Execute any other cleanup tasks
if cleanup_tasks:
try:
# Filter out the processor_task if it's already handled
remaining_cleanup_tasks = [task for task in cleanup_tasks if task != processor_task]
if remaining_cleanup_tasks:
await asyncio.gather(*remaining_cleanup_tasks)
except Exception as e:
logger.error(f"Error during additional cleanup tasks: {str(e)}")
def create_app(): def create_app():
"""Factory function to create FastAPI app instance""" """Factory function to create FastAPI app instance"""
configure_logging(log_level="DEBUG")
_app = FastAPI(lifespan=lifespan) _app = FastAPI(lifespan=lifespan)
# Include routers without prefixes to match test expectations # Include routers
_app.include_router(webhook_router) _app.include_router(webhook_router)
_app.include_router(router) _app.include_router(router)
@ -363,10 +161,6 @@ def create_app():
return _app return _app
from api.handlers import test_llm_endpoint
app = create_app()
class ErrorResponse(BaseModel): class ErrorResponse(BaseModel):
error_id: str error_id: str
timestamp: str timestamp: str
@ -374,3 +168,4 @@ class ErrorResponse(BaseModel):
message: str message: str
details: Optional[str] = None details: Optional[str] = None
app = create_app()

View File

@ -1,22 +1,21 @@
import json import json
import sys
from typing import Union from typing import Union
from loguru import logger
from langchain_core.prompts import ChatPromptTemplate from langchain_core.prompts import (
ChatPromptTemplate,
PromptTemplate,
SystemMessagePromptTemplate,
HumanMessagePromptTemplate,
)
from langchain_core.output_parsers import JsonOutputParser from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnablePassthrough from langchain_core.runnables import RunnablePassthrough
from llm.models import AnalysisFlags
from config import settings
import json
from typing import Union
from langchain_ollama import OllamaLLM from langchain_ollama import OllamaLLM
from langchain_openai import ChatOpenAI from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from loguru import logger from loguru import logger
import sys
from llm.models import AnalysisFlags
from config import settings from config import settings
from .models import AnalysisFlags
class LLMInitializationError(Exception): class LLMInitializationError(Exception):
"""Custom exception for LLM initialization errors""" """Custom exception for LLM initialization errors"""
@ -53,11 +52,8 @@ elif settings.llm.mode == 'ollama':
model=settings.llm.ollama_model, model=settings.llm.ollama_model,
base_url=base_url, base_url=base_url,
streaming=False, streaming=False,
timeout=30, # 30 second timeout timeout=30,
max_retries=3 max_retries=3
# , # Retry up to 3 times
# temperature=0.1,
# top_p=0.2
) )
# Test connection only if not in a test environment # Test connection only if not in a test environment
@ -97,7 +93,7 @@ parser = JsonOutputParser(pydantic_object=AnalysisFlags)
# Load prompt template from file # Load prompt template from file
def load_prompt_template(version="v1.2.0"): def load_prompt_template(version="v1.2.0"):
try: try:
with open(f"llm/prompts/jira_analysis_{version}.txt", "r") as f: with open(f"llm/jira_analysis_{version}.txt", "r") as f:
template_content = f.read() template_content = f.read()
# Split system and user parts # Split system and user parts
@ -148,8 +144,8 @@ def create_analysis_chain():
| parser | parser
) )
# Add langfuse handler if enabled # Add langfuse handler if enabled and available
if settings.langfuse.enabled: if settings.langfuse.enabled and hasattr(settings, 'langfuse_handler'):
chain = chain.with_config( chain = chain.with_config(
callbacks=[settings.langfuse_handler] callbacks=[settings.langfuse_handler]
) )
@ -159,7 +155,8 @@ def create_analysis_chain():
logger.warning(f"Using fallback prompt due to error: {str(e)}") logger.warning(f"Using fallback prompt due to error: {str(e)}")
chain = FALLBACK_PROMPT | llm | parser chain = FALLBACK_PROMPT | llm | parser
if settings.langfuse.enabled: # Add langfuse handler if enabled and available
if settings.langfuse.enabled and hasattr(settings, 'langfuse_handler'):
chain = chain.with_config( chain = chain.with_config(
callbacks=[settings.langfuse_handler] callbacks=[settings.langfuse_handler]
) )

View File

@ -2,6 +2,7 @@ from typing import Optional, List, Union
from enum import Enum from enum import Enum
from loguru import logger from loguru import logger
from pydantic import BaseModel, ConfigDict, field_validator, Field from pydantic import BaseModel, ConfigDict, field_validator, Field
from datetime import datetime
from config import settings from config import settings
class LLMResponse(BaseModel): class LLMResponse(BaseModel):
@ -35,8 +36,8 @@ class JiraWebhookPayload(BaseModel):
updated: Optional[str] = None updated: Optional[str] = None
class AnalysisFlags(BaseModel): class AnalysisFlags(BaseModel):
hasMultipleEscalations: bool = Field(description="Is there evidence of multiple escalation attempts?") hasMultipleEscalations: bool = Field(alias="Hasmultipleescalations", description="Is there evidence of multiple escalation attempts?")
customerSentiment: Optional[CustomerSentiment] = Field(description="Overall customer sentiment (e.g., 'neutral', 'frustrated', 'calm').") customerSentiment: Optional[CustomerSentiment] = Field(alias="CustomerSentiment", description="Overall customer sentiment (e.g., 'neutral', 'frustrated', 'calm').")
def __init__(self, **data): def __init__(self, **data):
super().__init__(**data) super().__init__(**data)
@ -61,8 +62,6 @@ class AnalysisFlags(BaseModel):
).end() # End the trace immediately as it's just for tracking model usage ).end() # End the trace immediately as it's just for tracking model usage
except Exception as e: except Exception as e:
logger.error(f"Failed to track model usage: {e}") logger.error(f"Failed to track model usage: {e}")
from pydantic import BaseModel, Field
from datetime import datetime
class JiraAnalysisResponse(BaseModel): class JiraAnalysisResponse(BaseModel):
model_config = ConfigDict(from_attributes=True) model_config = ConfigDict(from_attributes=True)

View File

@ -1,23 +0,0 @@
You are an AI assistant designed to analyze Jira ticket details containe email correspondence and extract key flags and sentiment and extracting information into a strict JSON format.
Analyze the following Jira ticket information and provide your analysis in a JSON format.
Ensure the JSON strictly adheres to the specified schema.
Consider the overall context of the ticket and specifically the latest comment if provided.
Issue Key: {issueKey}
Summary: {summary}
Description: {description}
Status: {status}
Existing Labels: {labels}
Assignee: {assignee}
Last Updated: {updated}
Latest Comment (if applicable): {comment}
**Analysis Request:**
- Determine if there are signs of multiple escalation attempts in the descriptions or comments with regards to HUB team. Escalation to other teams are not considered.
-- Usually multiple requests one after another are being called by the same user in span of hours or days asking for immediate help of HUB team. Normall discussion, responses back and forth, are not considered as a escalation.
- Assess if the issue requires urgent attention based on language or context from the summary, description, or latest comment.
-- Usually means that Customer is asking for help due to upcoming deadlines, other high priority issues which are blocked due to our stall.
- Summarize the overall customer sentiment evident in the issue. Analyse tone of responses, happiness, gratefullnes, iritation, etc.
{format_instructions}

View File

@ -1,27 +0,0 @@
SYSTEM:
You are an AI assistant designed to analyze Jira ticket details containing email correspondence and extract key flags and sentiment, outputting information in a strict JSON format.
Your output MUST be ONLY a valid JSON object. Do NOT include any conversational text, explanations, or markdown outside the JSON.
The JSON structure MUST follow this exact schema. If a field cannot be determined, use `null` for strings/numbers or empty list `[]` for arrays.
Consider the overall context of the ticket and specifically the latest comment if provided.
**Analysis Request:**
- Determine if there are signs of multiple escalation attempts in the descriptions or comments with regards to HUB team. Escalation to other teams are not considered.
-- Usually multiple requests one after another are being called by the same user in span of hours or days asking for immediate help of HUB team. Normal discussion, responses back and forth, are not considered as an escalation.
- Assess if the issue requires urgent attention based on language or context from the summary, description, or latest comment.
-- Usually means that Customer is asking for help due to upcoming deadlines, other high priority issues which are blocked due to our stall.
- Summarize the overall customer sentiment evident in the issue. Analyze tone of responses, happiness, gratefulness, irritation, etc.
{format_instructions}
USER:
Issue Key: {issueKey}
Summary: {summary}
Description: {description}
Status: {status}
Existing Labels: {labels}
Assignee: {assignee}
Last Updated: {updated}
Latest Comment (if applicable): {comment}

View File

@ -1,77 +0,0 @@
import sys
import os
from pathlib import Path
from datetime import datetime
from typing import Optional
from loguru import logger
from config import Settings
# Basic fallback logging configuration
logger.remove()
logger.add(sys.stderr, level="WARNING", format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")
def configure_logging(log_level: str = "INFO", log_dir: Optional[str] = None):
"""Configure structured logging for the application with fallback handling"""
try:
# Log that we're attempting to configure logging
# Default log directory
if not log_dir:
log_dir = os.getenv("LOG_DIR", "logs")
# Create log directory if it doesn't exist
Path(log_dir).mkdir(parents=True, exist_ok=True)
# Log file path with timestamp
log_file = Path(log_dir) / f"jira-webhook-llm_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
# Remove any existing loggers
logger.remove()
# Add console logger
logger.add(
sys.stdout,
level=log_level,
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {extra[request_id]} | {message}",
colorize=True,
backtrace=True,
diagnose=True
)
# Add file logger
logger.add(
str(log_file),
level=log_level,
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {extra[request_id]} | {message}",
rotation="100 MB",
retention="30 days",
compression="zip",
backtrace=True,
diagnose=True
)
# Configure default extras
# Configure thread-safe defaults
logger.configure(
extra={"request_id": "N/A"},
patcher=lambda record: record["extra"].update(
thread_id = record["thread"].id if hasattr(record.get("thread"), 'id') else "main"
)
)
logger.info("Logging configured successfully")
settings = Settings()
# Removed duplicate logging_ready.set() call
logger.debug("Signaled logging_ready event")
except Exception as e:
# Fallback to basic logging if configuration fails
logger.remove()
logger.add(sys.stderr, level="WARNING", format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")
logger.error(f"Failed to configure logging: {str(e)}. Using fallback logging configuration.")
settings = Settings()
try:
settings.logging_ready.set()
logger.debug("Signaled logging_ready event")
except Exception as inner_e:
logger.error(f"Failed to signal logging_ready: {str(inner_e)}")
raise # Re-raise the original exception

91
shared_store.py Normal file
View File

@ -0,0 +1,91 @@
from typing import List, Dict, Optional
import threading
from datetime import datetime, timezone
from enum import Enum
class RequestStatus(str, Enum):
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
TIMEOUT = "timeout"
# Thread-safe storage for requests and responses
from queue import Queue
from dataclasses import dataclass, field
@dataclass
class ProcessingRequest:
id: int
payload: Dict
status: RequestStatus = RequestStatus.PENDING
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
error: Optional[str] = None
retry_count: int = 0
class RequestQueue:
def __init__(self):
self._queue: Queue[ProcessingRequest] = Queue()
self._requests: List[ProcessingRequest] = [] # To store all requests for retrieval
self._processing_lock = threading.Lock()
self._id_lock = threading.Lock()
self._current_id = 0
def _get_next_id(self) -> int:
"""Generate and return the next available request ID"""
with self._id_lock:
self._current_id += 1
return self._current_id
def add_request(self, payload: Dict) -> int:
"""Adds a new request to the queue and returns its ID"""
request_id = self._get_next_id()
request = ProcessingRequest(id=request_id, payload=payload)
self._queue.put(request)
with self._processing_lock: # Protect access to _requests list
self._requests.append(request)
return request_id
def get_next_request(self) -> Optional[ProcessingRequest]:
"""Fetches the next available request from the queue"""
with self._processing_lock:
if not self._queue.empty():
return self._queue.get()
return None
def get_all_requests(self) -> List[ProcessingRequest]:
"""Returns a list of all requests currently in the store."""
with self._processing_lock:
return list(self._requests) # Return a copy to prevent external modification
def get_request_by_id(self, request_id: int) -> Optional[ProcessingRequest]:
"""Retrieves a specific request by its ID."""
with self._processing_lock:
return next((req for req in self._requests if req.id == request_id), None)
def delete_request_by_id(self, request_id: int) -> bool:
"""Deletes a specific request by its ID."""
with self._processing_lock:
initial_length = len(self._requests)
self._requests = [req for req in self._requests if req.id != request_id]
return len(self._requests) < initial_length
def clear_all_requests(self):
"""Clears all requests from the store."""
with self._processing_lock:
self._requests.clear()
# Clear the queue as well, though it's generally processed
while not self._queue.empty():
try:
self._queue.get_nowait()
except Exception:
continue
self._queue.task_done() # Mark all tasks as done if clearing
def task_done(self):
"""Indicates that a formerly enqueued task is complete."""
self._queue.task_done()
requests_queue = RequestQueue()

View File

@ -1 +0,0 @@
# Initialize tests package

View File

@ -1,120 +0,0 @@
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine, inspect
from sqlalchemy.orm import sessionmaker
from database.database import Base, get_db_session # Keep get_db_session for dependency override
from fastapi import FastAPI
from database import database as db # Import the database module directly
from jira_webhook_llm import create_app # Import create_app
@pytest.fixture(scope="function")
def setup_db(monkeypatch):
print("\n--- setup_db fixture started ---")
# Use in-memory SQLite for tests
test_db_url = "sqlite:///:memory:"
monkeypatch.setenv("DATABASE_URL", test_db_url)
monkeypatch.setenv("IS_TEST_ENV", "true")
# Monkeypatch the global engine and SessionLocal in the database module
engine = create_engine(test_db_url, connect_args={"check_same_thread": False})
connection = engine.connect()
# Begin a transaction and bind the session to it
transaction = connection.begin()
# Monkeypatch the global engine and SessionLocal in the database module
monkeypatch.setattr(db, 'engine', engine)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=connection) # Bind to the connection
monkeypatch.setattr(db, 'SessionLocal', SessionLocal)
from database.models import Base as ModelsBase # Renamed to avoid conflict with imported Base
# Create all tables within the same connection and commit
ModelsBase.metadata.create_all(bind=connection) # Use the connection here
# Verify table creation within setup_db
inspector = inspect(connection) # Use the connection here
if inspector.has_table("jira_analyses"):
print("--- jira_analyses table created successfully in setup_db ---")
else:
print("--- ERROR: jira_analyses table NOT created in setup_db ---")
yield engine # Yield the engine for test_client to use
# Cleanup: Rollback the test transaction and close the connection
transaction.rollback() # Rollback test data
connection.close()
print("--- setup_db fixture finished ---")
@pytest.fixture
def mock_full_jira_payload(setup_db):
mock_data = {
"issueKey": "PROJ-123",
"summary": "Test Issue",
"description": "Test Description",
"comment": "Test Comment",
"labels": ["test"],
"status": "open",
"assignee": "Tester",
"updated": "2025-07-13T12:00:00Z"
}
return mock_data
@pytest.fixture(scope="function")
def test_client(setup_db, monkeypatch):
print("\n--- test_client fixture started ---")
# Prevent signal handling in tests, but allow lifespan to run
monkeypatch.setattr("jira_webhook_llm.handle_shutdown_signal", lambda *args: None)
# Use the application factory to create the app instance with all middleware and routers
app = create_app()
# Override the get_db_session dependency to use the test database
# This will now correctly use the monkeypatched SessionLocal from database.database
def override_get_db_session():
db_session = db.SessionLocal() # Use the monkeypatched SessionLocal
try:
yield db_session
finally:
db_session.close()
app.dependency_overrides[get_db_session] = override_get_db_session
# Verify tables exist before running tests
# Verify tables exist before running tests using the monkeypatched engine
inspector = inspect(db.engine) # This will now inspect the engine bound to the single connection
if inspector.has_table("jira_analyses"):
print("--- jira_analyses table exists in test_client setup ---")
else:
print("--- ERROR: jira_analyses table NOT found in test_client setup ---")
assert inspector.has_table("jira_analyses"), "Test tables not created"
with TestClient(app) as client:
yield client
# Clean up dependency override
app.dependency_overrides.clear()
print("--- test_client fixture finished ---")
@pytest.fixture
def mock_jira_payload():
return {
"issueKey": "TEST-123",
"summary": "Test Issue",
"description": "Test Description",
"comment": "Test Comment",
"labels": ["test"],
"status": "Open",
"assignee": "Tester",
"updated": "2025-07-13T12:00:00Z"
}
# return {
# "issueKey": "TEST-123",
# "summary": "Test Issue",
# "description": "Test Description",
# "comment": "Test Comment",
# "labels": ["test"],
# "status": "Open",
# "assignee": "Tester",
# "updated": "2025-07-13T12:00:00Z"
# }

View File

@ -1,250 +0,0 @@
import pytest
from fastapi import HTTPException
from jira_webhook_llm import app
from llm.models import JiraWebhookPayload
from database.crud import create_analysis_record, get_analysis_by_id
from database.models import JiraAnalysis
from database.database import get_db
from database.models import JiraAnalysis
from database.crud import create_analysis_record, update_record_status, get_analysis_by_id
from unittest.mock import MagicMock # Import MagicMock
from datetime import datetime, timezone
def test_error_handling_middleware(test_client, mock_jira_payload):
# Test 404 error handling
response = test_client.post("/nonexistent-endpoint", json={})
assert response.status_code == 404
assert "detail" in response.json() # FastAPI's default 404 response uses "detail"
# Test validation error handling
invalid_payload = mock_jira_payload.copy()
invalid_payload.pop("issueKey")
response = test_client.post("/api/jira-webhook", json=invalid_payload)
assert response.status_code == 422
assert "detail" in response.json() # FastAPI's default 422 response uses "detail"
def test_webhook_handler(setup_db, test_client, mock_full_jira_payload, monkeypatch):
# Mock the LLM analysis chain to avoid external calls
mock_chain = MagicMock()
mock_chain.ainvoke.return_value = { # Use ainvoke as per webhooks/handlers.py
"hasMultipleEscalations": False,
"customerSentiment": "neutral",
"analysisSummary": "Mock analysis summary.",
"actionableItems": ["Mock action item 1", "Mock action item 2"],
"analysisFlags": ["mock_flag"]
}
monkeypatch.setattr("llm.chains.analysis_chain", mock_chain)
# Test successful webhook handling with full payload
response = test_client.post("/api/jira-webhook", json=mock_full_jira_payload)
assert response.status_code == 202
response_data = response.json()
assert "status" in response_data
assert response_data["status"] in ["success", "skipped", "queued"]
if response_data["status"] == "success":
assert "analysis_flags" in response_data
# Validate database storage
from database.models import JiraAnalysis
from database.database import get_db
with get_db() as db:
record = db.query(JiraAnalysis).filter_by(issue_key=mock_full_jira_payload["issueKey"]).first()
assert record is not None
assert record.issue_summary == mock_full_jira_payload["summary"]
assert record.request_payload == mock_full_jira_payload
def test_llm_test_endpoint(test_client):
# Test LLM test endpoint
response = test_client.post("/api/test-llm")
assert response.status_code == 200
assert "response" in response.json()
def test_create_analysis_record_endpoint(setup_db, test_client, mock_full_jira_payload):
# Test successful creation of a new analysis record via API
response = test_client.post("/api/request", json=mock_full_jira_payload)
assert response.status_code == 201
response_data = response.json()
assert "message" in response_data
assert response_data["message"] == "Record created successfully"
assert "record_id" in response_data
# Verify the record exists in the database
with get_db() as db:
record = get_analysis_by_id(db, response_data["record_id"])
assert record is not None
assert record.issue_key == mock_full_jira_payload["issueKey"]
assert record.issue_summary == mock_full_jira_payload["summary"]
assert record.request_payload == mock_full_jira_payload
@pytest.mark.asyncio
async def test_retry_decorator():
# Test retry decorator functionality
from jira_webhook_llm import retry # Import decorator from main module
@retry(max_retries=3) # Use imported decorator
async def failing_function():
raise Exception("Test error")
with pytest.raises(Exception):
await failing_function()
def test_get_pending_queue_records_endpoint(setup_db, test_client, mock_full_jira_payload):
# Create a pending record
with get_db() as db:
payload_model = JiraWebhookPayload(**mock_full_jira_payload)
pending_record = create_analysis_record(db, payload_model)
db.commit()
db.refresh(pending_record)
response = test_client.get("/api/queue/pending")
assert response.status_code == 200, f"Expected 200 but got {response.status_code}. Response: {response.text}"
data = response.json()["data"]
assert len(data) == 1
assert data[0]["issue_key"] == mock_full_jira_payload["issueKey"]
assert data[0]["status"] == "pending"
def test_get_pending_queue_records_endpoint_empty(setup_db, test_client):
# Ensure no records exist
with get_db() as db:
db.query(JiraAnalysis).delete()
db.commit()
response = test_client.get("/api/queue/pending")
assert response.status_code == 200
data = response.json()["data"]
assert len(data) == 0
def test_get_pending_queue_records_endpoint_error(test_client, monkeypatch):
def mock_get_pending_analysis_records(db):
raise Exception("Database error")
monkeypatch.setattr("api.handlers.get_pending_analysis_records", mock_get_pending_analysis_records)
response = test_client.get("/api/queue/pending")
assert response.status_code == 500, f"Expected 500 but got {response.status_code}. Response: {response.text}"
assert "detail" in response.json() # FastAPI's HTTPException uses "detail"
assert response.json()["detail"] == "Database error: Database error"
def test_retry_analysis_record_endpoint_success(setup_db, test_client, mock_full_jira_payload):
# Create a failed record
with get_db() as db:
payload_model = JiraWebhookPayload(**mock_full_jira_payload)
failed_record = create_analysis_record(db, payload_model)
update_record_status(db, failed_record.id, "failed", error_message="LLM failed")
db.commit()
db.refresh(failed_record)
response = test_client.post(f"/api/queue/{failed_record.id}/retry")
assert response.status_code == 200
assert response.json()["message"] == f"Record {failed_record.id} marked for retry."
with get_db() as db:
updated_record = get_analysis_by_id(db, failed_record.id)
assert updated_record.status == "pending"
assert updated_record.error_message is None
assert updated_record.analysis_result is None
assert updated_record.raw_response is None
assert updated_record.next_retry_at is None
def test_retry_analysis_record_endpoint_not_found(test_client):
response = test_client.post("/api/queue/99999/retry")
assert response.status_code == 404
# Handle both possible error response formats
assert "detail" in response.json() # FastAPI's HTTPException uses "detail"
assert response.json()["detail"] == "Analysis record not found"
def test_retry_analysis_record_endpoint_invalid_status(setup_db, test_client, mock_full_jira_payload):
# Create a successful record
with get_db() as db:
payload_model = JiraWebhookPayload(**mock_full_jira_payload)
successful_record = create_analysis_record(db, payload_model)
update_record_status(db, successful_record.id, "success")
db.commit()
db.refresh(successful_record)
response = test_client.post(f"/api/queue/{successful_record.id}/retry")
assert response.status_code == 400
assert response.json()["detail"] == f"Record status is 'success'. Only 'failed', 'processing' or 'validation_failed' records can be retried."
def test_retry_analysis_record_endpoint_db_update_failure(setup_db, test_client, mock_full_jira_payload, monkeypatch):
# Create a failed record
with get_db() as db:
payload_model = JiraWebhookPayload(**mock_full_jira_payload)
failed_record = create_analysis_record(db, payload_model)
update_record_status(db, failed_record.id, "failed", error_message="LLM failed")
db.commit()
db.refresh(failed_record)
def mock_update_record_status(*args, **kwargs):
return None # Simulate update failure
monkeypatch.setattr("api.handlers.update_record_status", mock_update_record_status)
response = test_client.post(f"/api/queue/{failed_record.id}/retry")
assert response.status_code == 500, f"Expected 500 but got {response.status_code}. Response: {response.text}"
assert response.json()["detail"] == "Failed to update record for retry."
def test_retry_analysis_record_endpoint_retry_count_and_next_retry_at(setup_db, test_client, mock_full_jira_payload):
# Create a failed record with an initial retry count and next_retry_at
with get_db() as db:
payload_model = JiraWebhookPayload(**mock_full_jira_payload)
failed_record = create_analysis_record(db, payload_model)
update_record_status(
db,
failed_record.id,
"failed",
error_message="LLM failed",
retry_count_increment=1,
next_retry_at=datetime.now(timezone.utc)
)
db.commit()
db.refresh(failed_record)
initial_retry_count = failed_record.retry_count
response = test_client.post(f"/api/queue/{failed_record.id}/retry")
assert response.status_code == 200
with get_db() as db:
updated_record = get_analysis_by_id(db, failed_record.id)
assert updated_record.status == "pending"
assert updated_record.error_message is None
assert updated_record.next_retry_at is None # Should be reset to None
# The retry endpoint itself doesn't increment retry_count,
# it just resets the status. The increment happens during processing.
# So, we assert it remains the same as before the retry request.
assert updated_record.retry_count == initial_retry_count
@pytest.mark.asyncio
async def test_concurrent_retry_operations(setup_db, test_client, mock_full_jira_payload):
# Create multiple failed records
record_ids = []
with get_db() as db:
for i in range(5):
payload = mock_full_jira_payload.copy()
payload["issueKey"] = f"TEST-{i}"
payload_model = JiraWebhookPayload(**payload)
failed_record = create_analysis_record(db, payload_model)
update_record_status(db, failed_record.id, "failed", error_message=f"LLM failed {i}")
db.commit()
db.refresh(failed_record)
record_ids.append(failed_record.id)
# Simulate concurrent retry requests
import asyncio
async def send_retry_request(record_id):
return test_client.post(f"/api/queue/{record_id}/retry")
tasks = [send_retry_request(rid) for rid in record_ids]
responses = await asyncio.gather(*tasks)
for response in responses:
assert response.status_code == 200
assert "message" in response.json()
# Verify all records are marked as pending
with get_db() as db:
for record_id in record_ids:
updated_record = get_analysis_by_id(db, record_id)
assert updated_record.status == "pending"
assert updated_record.error_message is None
assert updated_record.next_retry_at is None

View File

@ -1,38 +0,0 @@
import pytest
from llm.chains import validate_response
def test_validate_response_valid():
"""Test validation with valid response"""
response = {
"hasMultipleEscalations": False,
"customerSentiment": "neutral"
}
assert validate_response(response) is True
def test_validate_response_missing_field():
"""Test validation with missing required field"""
response = {
"hasMultipleEscalations": False
}
assert validate_response(response) is False
def test_validate_response_invalid_type():
"""Test validation with invalid field type"""
response = {
"hasMultipleEscalations": "not a boolean",
"customerSentiment": "neutral"
}
assert validate_response(response) is False
def test_validate_response_null_sentiment():
"""Test validation with null sentiment"""
response = {
"hasMultipleEscalations": True,
"customerSentiment": None
}
assert validate_response(response) is True
def test_validate_response_invalid_structure():
"""Test validation with invalid JSON structure"""
response = "not a dictionary"
assert validate_response(response) is False

View File

@ -1,76 +1,10 @@
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter
from loguru import logger
import json
from typing import Optional, List, Union
from sqlalchemy.orm import Session
from pydantic import BaseModel, ConfigDict, field_validator
from datetime import datetime, timezone # Import timezone
import uuid
from config import settings webhook_router = APIRouter(
from langfuse import Langfuse prefix="/webhooks",
from database.crud import create_analysis_record tags=["Webhooks"]
from llm.models import JiraWebhookPayload )
from database.database import get_db_session
webhook_router = APIRouter() @webhook_router.post("/jira")
async def handle_jira_webhook():
class BadRequestError(HTTPException): return {"status": "webhook received"}
def __init__(self, detail: str):
super().__init__(status_code=400, detail=detail)
class RateLimitError(HTTPException):
def __init__(self, detail: str):
super().__init__(status_code=429, detail=detail)
class ValidationError(HTTPException):
def __init__(self, detail: str):
super().__init__(status_code=422, detail=detail)
class ValidationError(HTTPException):
def __init__(self, detail: str):
super().__init__(status_code=422, detail=detail)
class JiraWebhookHandler:
async def process_jira_request(self, payload: JiraWebhookPayload, db: Session):
try:
if not payload.issueKey:
raise BadRequestError("Missing required field: issueKey")
if not payload.summary:
raise BadRequestError("Missing required field: summary")
# Create new analysis record with initial state
new_record = create_analysis_record(db=db, payload=payload)
logger.bind(
issue_key=payload.issueKey,
record_id=new_record.id,
timestamp=datetime.now(timezone.utc).isoformat()
).info(f"[{payload.issueKey}] Received webhook and queued for processing.")
return {"status": "queued", "record_id": new_record.id}
except Exception as e:
issue_key = payload.issueKey if payload.issueKey else "N/A"
logger.error(f"[{issue_key}] Error receiving webhook: {str(e)}")
import traceback
logger.error(f"[{issue_key}] Stack trace: {traceback.format_exc()}")
raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")
# Initialize handler
webhook_handler = JiraWebhookHandler()
@webhook_router.post("/api/jira-webhook", status_code=202)
async def receive_jira_request(payload: JiraWebhookPayload, db: Session = Depends(get_db_session)):
"""Jira webhook endpoint - receives and queues requests for processing"""
try:
result = await webhook_handler.process_jira_request(payload, db)
return result
except ValidationError as e:
raise
except BadRequestError as e:
raise ValidationError(detail=e.detail)
except Exception as e:
logger.error(f"Unexpected error in webhook endpoint: {str(e)}")
raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")