diff --git a/api/handlers.py b/api/handlers.py
index 7a8a72b..15763e6 100644
--- a/api/handlers.py
+++ b/api/handlers.py
@@ -111,13 +111,13 @@ async def get_pending_queue_records_endpoint(db: Session = Depends(get_db_sessio
 
 @router.post("/queue/{record_id}/retry", status_code=200)
 async def retry_analysis_record_endpoint(record_id: int, db: Session = Depends(get_db_session)):
-    """Manually trigger a retry for a failed or validation_failed analysis record."""
+    """Manually trigger a retry for a failed, processing or validation_failed analysis record."""
     db_record = get_analysis_by_id(db, record_id)
     if not db_record:
         raise HTTPException(status_code=404, detail="Analysis record not found")
 
-    if db_record.status not in ["failed", "validation_failed"]:
-        raise HTTPException(status_code=400, detail=f"Record status is '{db_record.status}'. Only 'failed' or 'validation_failed' records can be retried.")
+    if db_record.status not in ["processing", "failed", "validation_failed"]:
+        raise HTTPException(status_code=400, detail=f"Record status is '{db_record.status}'. Only 'failed', 'processing' or 'validation_failed' records can be retried.")
 
     # Reset status to pending and clear error message for retry
     updated_record = update_record_status(
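The widened rule can be exercised directly against the endpoint. A minimal sketch using httpx; the host, port and record id are placeholders, and the `/api` prefix is taken from tests/test_core.py further down:

```python
import httpx

# Hypothetical record id and base URL; the router is mounted under /api in this app.
resp = httpx.post("http://localhost:8000/api/queue/42/retry")

if resp.status_code == 200:
    # The handler resets the record to 'pending' and clears its error message.
    print("Record queued for retry:", resp.json())
elif resp.status_code == 400:
    # Returned for any status other than 'failed', 'processing' or 'validation_failed'.
    print(resp.json()["detail"])
```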
diff --git a/config.py b/config.py
index bfd6f28..c68903a 100644
--- a/config.py
+++ b/config.py
@@ -97,12 +97,23 @@ class ApiConfig(BaseSettings):
         extra='ignore'
     )
 
+class ProcessorConfig(BaseSettings):
+    poll_interval_seconds: int = 30
+    max_retries: int = 5
+    initial_retry_delay_seconds: int = 60
+
+    model_config = ConfigDict(
+        env_prefix='PROCESSOR_',
+        env_file='.env',
+        env_file_encoding='utf-8',
+        extra='ignore'
+    )
+
 class Settings:
     logging_ready = Event()  # Event to signal logging is configured
 
     def __init__(self):
         try:
-            # logger.debug(f"Config initialization started from: {''.join(traceback.format_stack())}")
             logger.info("Loading configuration from application.yml and environment variables")
 
             # Load configuration from YAML file
@@ -126,10 +137,9 @@ class Settings:
             self.api = ApiConfig(**yaml_config.get('api', {}))
             logger.info("ApiConfig initialized: {}", self.api.model_dump())
 
-            # Add thread_pool_max_workers
-            self.thread_pool_max_workers = yaml_config.get('application', {}).get('thread_pool_max_workers', 5)
-            logger.info("ThreadPool max workers set to: {}", self.thread_pool_max_workers)
-            logger.debug(f"Thread pool initialized with {self.thread_pool_max_workers} workers")
+            logger.info("Initializing ProcessorConfig")
+            self.processor = ProcessorConfig(**yaml_config.get('processor', {}))
+            logger.info("ProcessorConfig initialized: {}", self.processor.model_dump())
 
             logger.info("Validating configuration")
             self._validate()
@@ -144,6 +154,7 @@ class Settings:
             logger.error("LogConfig: {}", self.log.model_dump() if hasattr(self, 'log') else 'Not initialized')
             logger.error("LLMConfig: {}", self.llm.model_dump() if hasattr(self, 'llm') else 'Not initialized')
             logger.error("LangfuseConfig: {}", self.langfuse.model_dump() if hasattr(self, 'langfuse') else 'Not initialized')
+            logger.error("ProcessorConfig: {}", self.processor.model_dump() if hasattr(self, 'processor') else 'Not initialized')
             raise
 
     def _load_yaml_config(self):
@@ -233,6 +244,8 @@ class Settings:
             self.log = LogConfig(**yaml_config.get('log', {}))
             self.llm = LLMConfig(**yaml_config.get('llm', {}))
             self.langfuse = LangfuseConfig(**yaml_config.get('langfuse', {}))
+            self.api = ApiConfig(**yaml_config.get('api', {}))
+            self.processor = ProcessorConfig(**yaml_config.get('processor', {}))
 
             self._validate()
             self._init_langfuse()  # Re-initialize Langfuse client if needed
 
             logger.info("Configuration reloaded successfully")
diff --git a/config/application.yml b/config/application.yml
index 4ad1ffb..6f1372b 100644
--- a/config/application.yml
+++ b/config/application.yml
@@ -41,4 +41,18 @@ langfuse:
   # instead of saving them in this file
   public_key: "pk-lf-17dfde63-93e2-4983-8aa7-2673d3ecaab8"
   secret_key: "sk-lf-ba41a266-6fe5-4c90-a483-bec8a7aaa321"
-  host: "https://cloud.langfuse.com"
\ No newline at end of file
+  host: "https://cloud.langfuse.com"
+
+# Processor configuration
+processor:
+  # Interval in seconds between polling for new Jira analysis requests
+  # Can be overridden by PROCESSOR_POLL_INTERVAL_SECONDS environment variable
+  poll_interval_seconds: 30
+
+  # Maximum number of retries for failed Jira analysis requests
+  # Can be overridden by PROCESSOR_MAX_RETRIES environment variable
+  max_retries: 5
+
+  # Initial delay in seconds before the first retry attempt (exponential backoff)
+  # Can be overridden by PROCESSOR_INITIAL_RETRY_DELAY_SECONDS environment variable
+  initial_retry_delay_seconds: 60
\ No newline at end of file
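The new `processor` block feeds `ProcessorConfig`, which also reads `PROCESSOR_`-prefixed environment variables and `.env`. A self-contained sketch of the precedence rules with pydantic-settings (spelled here with `SettingsConfigDict`; the classes in this repo use `ConfigDict` and are constructed from the parsed YAML):

```python
import os
from pydantic_settings import BaseSettings, SettingsConfigDict

class ProcessorConfig(BaseSettings):
    poll_interval_seconds: int = 30
    max_retries: int = 5
    initial_retry_delay_seconds: int = 60

    model_config = SettingsConfigDict(env_prefix="PROCESSOR_", extra="ignore")

# An environment variable with the prefix overrides the class default...
os.environ["PROCESSOR_MAX_RETRIES"] = "2"
print(ProcessorConfig().max_retries)               # 2

# ...while explicit keyword arguments (the parsed YAML values in Settings.__init__)
# take precedence over both the defaults and the environment.
print(ProcessorConfig(max_retries=7).max_retries)  # 7
```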
diff --git a/database/models.py b/database/models.py
index 4bc6f34..31b13a6 100644
--- a/database/models.py
+++ b/database/models.py
@@ -1,5 +1,13 @@
 from sqlalchemy import Column, Integer, String, DateTime, Text, JSON
 from datetime import datetime
+from enum import Enum
+
+class AnalysisFlags(str, Enum):
+    BUG = "bug"
+    FEATURE = "feature"
+    IMPROVEMENT = "improvement"
+    SUPPORT = "support"
+    OTHER = "other"
 from sqlalchemy.orm import declarative_base
 
 Base = declarative_base()
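Because the new enum mixes in `str`, its members compare equal to their raw string values and JSON-serialize cleanly; note that it shares a name with the Pydantic `AnalysisFlags` model in llm/models.py, so importers have to pick the right one. A small standalone illustration:

```python
from enum import Enum

class AnalysisFlags(str, Enum):
    BUG = "bug"
    FEATURE = "feature"

# Members behave as plain strings for comparisons and serialization...
assert AnalysisFlags.BUG == "bug"
assert AnalysisFlags.BUG.value == "bug"

# ...and the class doubles as a validator for incoming values.
assert AnalysisFlags("feature") is AnalysisFlags.FEATURE
```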
diff --git a/jira_analyses.db b/jira_analyses.db
index 9b95404..83bbe69 100644
Binary files a/jira_analyses.db and b/jira_analyses.db differ
diff --git a/jira_processor.py b/jira_processor.py
deleted file mode 100644
index 7417852..0000000
--- a/jira_processor.py
+++ /dev/null
@@ -1,170 +0,0 @@
-import time
-from datetime import datetime, timedelta, timezone
-from loguru import logger
-from sqlalchemy.orm import Session
-import json
-
-from database.database import SessionLocal
-from database.crud import get_analysis_record, update_record_status, create_analysis_record
-from database.models import JiraAnalysis
-from llm.models import JiraWebhookPayload, AnalysisFlags
-from llm.chains import analysis_chain, validate_response
-from config import settings
-
-# Configuration for polling and retries
-POLL_INTERVAL_SECONDS = 30
-MAX_RETRIES = 5
-INITIAL_RETRY_DELAY_SECONDS = 60  # 1 minute
-
-def calculate_next_retry_time(retry_count: int) -> datetime:
-    """Calculates the next retry time using exponential backoff."""
-    delay = INITIAL_RETRY_DELAY_SECONDS * (2 ** retry_count)
-    return datetime.now(timezone.utc) + timedelta(seconds=delay)
-
-async def process_single_jira_request(db: Session, record: JiraAnalysis):
-    """Processes a single Jira webhook request using the LLM."""
-    issue_key = record.issue_key
-    record_id = record.id
-    payload = JiraWebhookPayload.model_validate(record.request_payload)
-
-    logger.bind(
-        issue_key=issue_key,
-        record_id=record_id,
-        timestamp=datetime.now(timezone.utc).isoformat()
-    ).info(f"[{issue_key}] Processing webhook request.")
-
-    # Create Langfuse trace if enabled
-    trace = None
-    if settings.langfuse.enabled:
-        trace = settings.langfuse_client.start_span(
-            name="Jira Webhook Processing",
-            input=payload.model_dump(),
-            metadata={
-                "trace_id": f"processor-{issue_key}-{record_id}",
-                "issue_key": issue_key,
-                "record_id": record_id,
-                "timestamp": datetime.now(timezone.utc).isoformat()
-            }
-        )
-
-    llm_input = {
-        "issueKey": payload.issueKey,
-        "summary": payload.summary,
-        "description": payload.description if payload.description else "No description provided.",
-        "status": payload.status if payload.status else "Unknown",
-        "labels": ", ".join(payload.labels) if payload.labels else "None",
-        "assignee": payload.assignee if payload.assignee else "Unassigned",
-        "updated": payload.updated if payload.updated else "Unknown",
-        "comment": payload.comment if payload.comment else "No new comment provided."
-    }
-
-    llm_span = None
-    if settings.langfuse.enabled and trace:
-        llm_span = trace.start_span(
-            name="LLM Processing",
-            input=llm_input,
-            metadata={
-                "model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model
-            }
-        )
-
-    try:
-        raw_llm_response = await analysis_chain.ainvoke(llm_input)
-
-        if settings.langfuse.enabled and llm_span:
-            llm_span.update(output=raw_llm_response)
-            llm_span.end()
-
-        try:
-            AnalysisFlags(
-                hasMultipleEscalations=raw_llm_response.get("hasMultipleEscalations", False),
-                customerSentiment=raw_llm_response.get("customerSentiment", "neutral")
-            )
-        except Exception as e:
-            logger.error(f"[{issue_key}] Invalid LLM response structure: {e}", exc_info=True)
-            update_record_status(
-                db=db,
-                record_id=record_id,
-                analysis_result={"hasMultipleEscalations": False, "customerSentiment": "neutral"},
-                raw_response=json.dumps(raw_llm_response),
-                status="validation_failed",
-                error_message=f"LLM response validation failed: {e}",
-                last_processed_at=datetime.now(timezone.utc),
-                retry_count_increment=1,
-                next_retry_at=calculate_next_retry_time(record.retry_count + 1) if record.retry_count < MAX_RETRIES else None
-            )
-            if settings.langfuse.enabled and trace:
-                trace.end(status_message=f"Validation failed: {e}", status="ERROR")
-            raise ValueError(f"Invalid LLM response format: {e}") from e
-
-        logger.debug(f"[{issue_key}] LLM Analysis Result: {json.dumps(raw_llm_response, indent=2)}")
-        update_record_status(
-            db=db,
-            record_id=record_id,
-            analysis_result=raw_llm_response,
-            raw_response=json.dumps(raw_llm_response),
-            status="completed",
-            last_processed_at=datetime.now(timezone.utc),
-            next_retry_at=None  # No retry needed on success
-        )
-        if settings.langfuse.enabled and trace:
-            trace.end(status="SUCCESS")
-        logger.info(f"[{issue_key}] Successfully processed and updated record {record_id}.")
-
-    except Exception as e:
-        logger.error(f"[{issue_key}] LLM processing failed for record {record_id}: {str(e)}")
-        if settings.langfuse.enabled and llm_span:
-            llm_span.end(status_message=str(e), status="ERROR")
-
-        new_retry_count = record.retry_count + 1
-        new_status = "failed"
-        next_retry = None
-        if new_retry_count <= MAX_RETRIES:
-            next_retry = calculate_next_retry_time(new_retry_count)
-            new_status = "retrying"  # Indicate that it will be retried
-
-        update_record_status(
-            db=db,
-            record_id=record_id,
-            status=new_status,
-            error_message=f"LLM processing failed: {str(e)}",
-            last_processed_at=datetime.now(timezone.utc),
-            retry_count_increment=1,
-            next_retry_at=next_retry
-        )
-        if settings.langfuse.enabled and trace:
-            trace.end(status_message=str(e), status="ERROR")
-        logger.error(f"[{issue_key}] Record {record_id} status updated to '{new_status}'. Retry count: {new_retry_count}")
-
-
-async def main_processor_loop():
-    """Main loop for the Jira webhook processor."""
-    logger.info("Starting Jira webhook processor.")
-    while True:
-        db: Session = SessionLocal()
-        try:
-            # Fetch records that are 'pending' or 'retrying' and past their next_retry_at
-            # Order by created_at to process older requests first
-            pending_records = db.query(JiraAnalysis).filter(
-                (JiraAnalysis.status == "pending") |
-                ((JiraAnalysis.status == "retrying") & (JiraAnalysis.next_retry_at <= datetime.now(timezone.utc)))
-            ).order_by(JiraAnalysis.created_at.asc()).all()
-
-            if not pending_records:
-                logger.debug(f"No pending or retrying records found. Sleeping for {POLL_INTERVAL_SECONDS} seconds.")
-
-            for record in pending_records:
-                # Update status to 'processing' immediately to prevent other workers from picking it up
-                update_record_status(db, record.id, "processing", last_processed_at=datetime.now(timezone.utc))
-                db.refresh(record)  # Refresh to get the latest state
-                await process_single_jira_request(db, record)
-        except Exception as e:
-            logger.error(f"Error in main processor loop: {str(e)}", exc_info=True)
-        finally:
-            db.close()
-
-        time.sleep(POLL_INTERVAL_SECONDS)
-
-if __name__ == "__main__":
-    import asyncio
-    asyncio.run(main_processor_loop())
\ No newline at end of file
diff --git a/jira_processor.service b/jira_processor.service
deleted file mode 100644
index 5f39a5a..0000000
--- a/jira_processor.service
+++ /dev/null
@@ -1,16 +0,0 @@
-[Unit]
-Description=Jira Webhook Processor Service
-After=network.target
-
-[Service]
-User=irek
-Group=irek
-WorkingDirectory=/home/irek/gitea
-ExecStart=/home/irek/gitea/.venv/bin/python jira_processor.py
-Restart=always
-RestartSec=10
-StandardOutput=journal
-StandardError=journal
-
-[Install]
-WantedBy=multi-user.target
\ No newline at end of file
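With the standalone script and its systemd unit gone, the same polling loop now runs as an asyncio task inside the FastAPI process (see the jira_webhook_llm.py changes below). A stripped-down sketch of that lifespan pattern, with placeholder names:

```python
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI

async def poll_forever():
    """Placeholder for main_processor_loop: fetch and process pending records."""
    while True:
        # ... query the queue and hand records to the LLM here ...
        await asyncio.sleep(30)

@asynccontextmanager
async def lifespan(app: FastAPI):
    task = asyncio.create_task(poll_forever())  # started at application startup
    try:
        yield
    finally:
        task.cancel()                           # cancelled during shutdown
        try:
            await task
        except asyncio.CancelledError:
            pass

app = FastAPI(lifespan=lifespan)
```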
diff --git a/jira_webhook_llm.py b/jira_webhook_llm.py
index 1305734..53294e4 100644
--- a/jira_webhook_llm.py
+++ b/jira_webhook_llm.py
@@ -1,5 +1,6 @@
 import os
 import json
+import time
 from dotenv import load_dotenv
 
 load_dotenv()
@@ -9,15 +10,16 @@ from fastapi.responses import JSONResponse
 from http import HTTPStatus
 from loguru import logger
 import uuid
-from database.database import init_db
 import sys
 from typing import Optional
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
 import asyncio
-from functools import wraps, partial
-from concurrent.futures import ThreadPoolExecutor
-from database.database import init_db, get_db
-from database.crud import create_analysis_record, update_record_status
-from llm.models import JiraWebhookPayload
+from functools import partial, wraps
+from sqlalchemy.orm import Session
+
+from database.database import init_db, get_db, SessionLocal
+from database.crud import get_analysis_record, update_record_status, create_analysis_record
+from database.models import JiraAnalysis
+from llm.models import JiraWebhookPayload, AnalysisFlags
 from llm.chains import analysis_chain, validate_response
 from api.handlers import router # Correct variable name
@@ -34,6 +36,191 @@ from contextlib import asynccontextmanager
 
 cleanup_tasks = []  # Initialize cleanup_tasks globally
 
+def calculate_next_retry_time(retry_count: int) -> datetime:
+    """Calculates the next retry time using exponential backoff."""
+    delay = settings.processor.initial_retry_delay_seconds * (2 ** retry_count)
+    return datetime.now(timezone.utc) + timedelta(seconds=delay)
+
+def retry(max_retries: int = 3, initial_delay: float = 1.0):
+    def decorator(func):
+        @wraps(func)
+        async def wrapper(*args, **kwargs):
+            for i in range(max_retries + 1):
+                try:
+                    return await func(*args, **kwargs)
+                except Exception as e:
+                    if i == max_retries:
+                        logger.error(f"Function {func.__name__} failed after {max_retries} retries: {e}")
+                        raise
+                    delay = initial_delay * (2 ** i)
+                    logger.warning(f"Function {func.__name__} failed, retrying in {delay:.2f} seconds (attempt {i+1}/{max_retries})...")
+                    await asyncio.sleep(delay)
+        return wrapper
+    return decorator
+
+async def process_single_jira_request(db: Session, record: JiraAnalysis):
+    """Processes a single Jira webhook request using the LLM."""
+    issue_key = record.issue_key
+    record_id = record.id
+    payload = JiraWebhookPayload.model_validate(record.request_payload)
+
+    logger.bind(
+        issue_key=issue_key,
+        record_id=record_id,
+        timestamp=datetime.now(timezone.utc).isoformat()
+    ).info(f"[{issue_key}] Processing webhook request.")
+
+    # Create Langfuse trace if enabled
+    trace = None
+    if settings.langfuse.enabled:
+        trace = settings.langfuse_client.start_span(
+            name="Jira Webhook Processing",
+            input=payload.model_dump(),
+            metadata={
+                "trace_id": f"processor-{issue_key}-{record_id}",
+                "issue_key": issue_key,
+                "record_id": record_id,
+                "timestamp": datetime.now(timezone.utc).isoformat()
+            }
+        )
+
+    llm_input = {
+        "issueKey": payload.issueKey,
+        "summary": payload.summary,
+        "description": payload.description if payload.description else "No description provided.",
+        "status": payload.status if payload.status else "Unknown",
+        "labels": ", ".join(payload.labels) if payload.labels else "None",
+        "assignee": payload.assignee if payload.assignee else "Unassigned",
+        "updated": payload.updated if payload.updated else "Unknown",
+        "comment": payload.comment if payload.comment else "No new comment provided."
+    }
+
+    llm_span = None
+    if settings.langfuse.enabled and trace:
+        llm_span = trace.start_span(
+            name="LLM Processing",
+            input=llm_input,
+            metadata={
+                "model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model
+            }
+        )
+
+    try:
+        raw_llm_response = await analysis_chain.ainvoke(llm_input)
+
+        if settings.langfuse.enabled and llm_span:
+            llm_span.update(output=raw_llm_response)
+            llm_span.end()
+
+        # Validate response structure before processing
+        if not validate_response(raw_llm_response):
+            error_msg = f"Invalid LLM response structure: {raw_llm_response}"
+            logger.error(f"[{issue_key}] {error_msg}")
+            update_record_status(
+                db=db,
+                record_id=record_id,
+                analysis_result={"hasMultipleEscalations": False, "customerSentiment": "neutral"},
+                raw_response=json.dumps(raw_llm_response),
+                status="validation_failed",
+                error_message=error_msg,
+                last_processed_at=datetime.now(timezone.utc),
+                retry_count_increment=1,
+                next_retry_at=calculate_next_retry_time(record.retry_count + 1) if record.retry_count < settings.processor.max_retries else None
+            )
+            if settings.langfuse.enabled and trace:
+                trace.end()
+            raise ValueError(error_msg)
+
+        try:
+            AnalysisFlags(
+                hasMultipleEscalations=raw_llm_response.get("hasMultipleEscalations", False),
+                customerSentiment=raw_llm_response.get("customerSentiment", "neutral")
+            )
+        except Exception as e:
+            logger.error(f"[{issue_key}] Invalid LLM response structure: {e}", exc_info=True)
+            update_record_status(
+                db=db,
+                record_id=record_id,
+                analysis_result={"hasMultipleEscalations": False, "customerSentiment": "neutral"},
+                raw_response=json.dumps(raw_llm_response),
+                status="validation_failed",
+                error_message=f"LLM response validation failed: {e}",
+                last_processed_at=datetime.now(timezone.utc),
+                retry_count_increment=1,
+                next_retry_at=calculate_next_retry_time(record.retry_count + 1) if record.retry_count < settings.processor.max_retries else None
+            )
+            if settings.langfuse.enabled and trace:
+                trace.end()
+            raise ValueError(f"Invalid LLM response format: {e}") from e
+
+        logger.debug(f"[{issue_key}] LLM Analysis Result: {json.dumps(raw_llm_response, indent=2)}")
+        update_record_status(
+            db=db,
+            record_id=record_id,
+            analysis_result=raw_llm_response,
+            raw_response=json.dumps(raw_llm_response),
+            status="completed",
+            last_processed_at=datetime.now(timezone.utc),
+            next_retry_at=None  # No retry needed on success
+        )
+        if settings.langfuse.enabled and trace:
+            trace.end()
+        logger.info(f"[{issue_key}] Successfully processed and updated record {record_id}.")
+
+    except Exception as e:
+        logger.error(f"[{issue_key}] LLM processing failed for record {record_id}: {str(e)}")
+        if settings.langfuse.enabled and llm_span:
+            llm_span.end()
+
+        new_retry_count = record.retry_count + 1
+        new_status = "failed"
+        next_retry = None
+        if new_retry_count <= settings.processor.max_retries:
+            next_retry = calculate_next_retry_time(new_retry_count)
+            new_status = "retrying"  # Indicate that it will be retried
+
+        update_record_status(
+            db=db,
+            record_id=record_id,
+            status=new_status,
+            error_message=f"LLM processing failed: {str(e)}",
+            last_processed_at=datetime.now(timezone.utc),
+            retry_count_increment=1,
+            next_retry_at=next_retry
+        )
+        if settings.langfuse.enabled and trace:
+            trace.end()
+        logger.error(f"[{issue_key}] Record {record_id} status updated to '{new_status}'. Retry count: {new_retry_count}")
+
+
+async def main_processor_loop():
+    """Main loop for the Jira webhook processor."""
+    logger.info("Starting Jira webhook processor.")
+    while True:  # This loop will run indefinitely until the app shuts down
+        db: Session = SessionLocal()  # Get a new session for each loop iteration
+        try:
+            # Fetch records that are 'pending' or 'retrying' and past their next_retry_at
+            # Order by created_at to process older requests first
+            pending_records = db.query(JiraAnalysis).filter(
+                (JiraAnalysis.status == "pending") |
+                ((JiraAnalysis.status == "retrying") & (JiraAnalysis.next_retry_at <= datetime.now(timezone.utc)))
+            ).order_by(JiraAnalysis.created_at.asc()).all()
+
+            if not pending_records:
+                logger.debug(f"No pending or retrying records found. Sleeping for {settings.processor.poll_interval_seconds} seconds.")
+
+            for record in pending_records:
+                # Update status to 'processing' immediately to prevent other workers from picking it up
+                update_record_status(db, record.id, "processing", last_processed_at=datetime.now(timezone.utc))
+                db.refresh(record)  # Refresh to get the latest state
+                await process_single_jira_request(db, record)
+        except Exception as e:
+            logger.error(f"Error in main processor loop: {str(e)}", exc_info=True)
+        finally:
+            db.close()  # Ensure the session is closed
+
+        await asyncio.sleep(settings.processor.poll_interval_seconds)  # Use asyncio.sleep for non-blocking sleep
+
 # Setup async-compatible signal handling
 def handle_shutdown_signal(signum, loop):
     """Graceful shutdown signal handler"""
@@ -52,10 +239,6 @@ async def lifespan(app: FastAPI):
     Context manager for managing the lifespan of the FastAPI application.
     Initializes the database, sets up signal handlers, and handles cleanup.
     """
-    # Initialize ThreadPoolExecutor for background tasks
-    executor = ThreadPoolExecutor(max_workers=settings.thread_pool_max_workers)
-    app.state.executor = executor
-
     # Flag to track if initialization succeeded
     init_success = False
@@ -73,6 +256,15 @@
         else:
             logger.info("Skipping signal handler configuration in test environment.")
 
+        # Start the background processor task only if not in a test environment
+        processor_task = None
+        if os.getenv("IS_TEST_ENV") != "true":
+            processor_task = asyncio.create_task(main_processor_loop())
+            cleanup_tasks.append(processor_task)
+            logger.info("Background Jira processor started.")
+        else:
+            logger.info("Skipping background Jira processor in test environment.")
+
         # Verify critical components
         if not hasattr(settings, 'langfuse_handler'):
             logger.error("Langfuse handler not found in settings")
@@ -95,6 +287,17 @@ async def lifespan(app: FastAPI):
         if hasattr(loop, '_shutdown'):
             logger.info("Shutdown initiated, starting cleanup...")
 
+            # Cancel the processor task
+            if processor_task:
+                logger.info("Cancelling background Jira processor task...")
+                processor_task.cancel()
+                try:
+                    await processor_task
+                except asyncio.CancelledError:
+                    logger.info("Background Jira processor task cancelled.")
+                except Exception as e:
+                    logger.error(f"Error cancelling processor task: {str(e)}")
+
             # Close langfuse with retry
             if hasattr(settings, 'langfuse_handler') and hasattr(settings.langfuse_handler, 'close'):
                 try:
@@ -108,7 +311,10 @@ async def lifespan(app: FastAPI):
             # Execute any other cleanup tasks
             if cleanup_tasks:
                 try:
-                    await asyncio.gather(*cleanup_tasks)
+                    # Filter out the processor_task if it's already handled
+                    remaining_cleanup_tasks = [task for task in cleanup_tasks if task != processor_task]
+                    if remaining_cleanup_tasks:
+                        await asyncio.gather(*remaining_cleanup_tasks)
                 except Exception as e:
                     logger.error(f"Error during additional cleanup tasks: {str(e)}")
 def create_app():
@@ -161,69 +367,6 @@ from api.handlers import test_llm_endpoint
 
 app = create_app()
-
-async def process_jira_webhook_background(record_id: int, payload: JiraWebhookPayload):
-    """
-    Background task to process Jira webhook and perform LLM analysis.
-    """
-    try:
-        logger.info(f"Starting background processing for record ID: {record_id}, Issue Key: {payload.issueKey}")
-    except Exception as e:
-        logger.error(f"Failed to start background processing for record {record_id}: {str(e)}")
-        raise
-
-    with get_db() as db:
-        try:
-            update_record_status(db, record_id, "processing")
-
-            llm_input = {
-                "issueKey": payload.issueKey,
-                "summary": payload.summary,
-                "description": payload.description if payload.description else "No description provided.",
-                "status": payload.status if payload.status else "Unknown",
-                "labels": ", ".join(payload.labels) if payload.labels else "None",
-                "assignee": payload.assignee if payload.assignee else "Unassigned",
-                "updated": payload.updated if payload.updated else "Unknown",
-                "comment": payload.comment if payload.comment else "No new comment provided."
-            }
-
-            analysis_result = await analysis_chain.ainvoke(llm_input)
-
-            if not validate_response(analysis_result):
-                logger.warning(f"Invalid LLM response format for {payload.issueKey}")
-                analysis_result = {
-                    "hasMultipleEscalations": False,
-                    "customerSentiment": "neutral"
-                }
-                update_record_status(db, record_id, "failed", analysis_result=analysis_result, error_message="Invalid LLM response format")
-                logger.error(f"LLM processing failed for {payload.issueKey}: Invalid response format")
-                return
-
-            update_record_status(db, record_id, "completed", analysis_result=analysis_result)
-            logger.info(f"Background processing completed for record ID: {record_id}, Issue Key: {payload.issueKey}")
-
-        except Exception as e:
-            logger.error(f"Error during background processing for record ID {record_id}, Issue Key {payload.issueKey}: {str(e)}")
-            update_record_status(db, record_id, "failed", error_message=str(e))
-
-def retry(max_retries: int = 3, delay: float = 1.0):
-    """Decorator for retrying failed operations"""
-    def decorator(func):
-        @wraps(func)
-        async def wrapper(*args, **kwargs):
-            last_error = None
-            for attempt in range(max_retries):
-                try:
-                    return await func(*args, **kwargs)
-                except Exception as e:
-                    last_error = e
-                    logger.warning(f"Attempt {attempt + 1} failed: {str(e)}")
-                    if attempt < max_retries - 1:
-                        await asyncio.sleep(delay * (attempt + 1))
-            raise last_error
-        return wrapper
-    return decorator
-
 class ErrorResponse(BaseModel):
     error_id: str
     timestamp: str
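For reference, with the defaults added above (`initial_retry_delay_seconds: 60`, `max_retries: 5`) and the first failure calling `calculate_next_retry_time(1)`, the backoff schedule works out as follows; a quick check of the arithmetic:

```python
INITIAL_DELAY = 60  # settings.processor.initial_retry_delay_seconds
MAX_RETRIES = 5     # settings.processor.max_retries

for retry_count in range(1, MAX_RETRIES + 1):
    delay = INITIAL_DELAY * (2 ** retry_count)
    print(f"retry {retry_count}: wait {delay}s")

# retry 1: wait 120s
# retry 2: wait 240s
# retry 3: wait 480s
# retry 4: wait 960s
# retry 5: wait 1920s
```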
diff --git a/llm/chains.py b/llm/chains.py
index 68592d0..6e17e4f 100644
--- a/llm/chains.py
+++ b/llm/chains.py
@@ -60,10 +60,14 @@ elif settings.llm.mode == 'ollama':
             # top_p=0.2
         )
 
-        # Test connection
-        logger.debug("Testing Ollama connection...")
-        llm.invoke("test")  # Simple test request
-        logger.info("Ollama connection established successfully")
+        # Test connection only if not in a test environment
+        import os
+        if os.getenv("IS_TEST_ENV") != "true":
+            logger.debug("Testing Ollama connection...")
+            llm.invoke("test")  # Simple test request
+            logger.info("Ollama connection established successfully")
+        else:
+            logger.info("Skipping Ollama connection test in test environment.")
 
     except Exception as e:
         error_msg = f"Failed to initialize Ollama: {str(e)}"
@@ -91,7 +95,7 @@ if llm is None:
 parser = JsonOutputParser(pydantic_object=AnalysisFlags)
 
 # Load prompt template from file
-def load_prompt_template(version="v1.1.0"):
+def load_prompt_template(version="v1.2.0"):
     try:
         with open(f"llm/prompts/jira_analysis_{version}.txt", "r") as f:
             template_content = f.read()
diff --git a/llm/models.py b/llm/models.py
index 82acf84..3dc2ffe 100644
--- a/llm/models.py
+++ b/llm/models.py
@@ -1,4 +1,5 @@
 from typing import Optional, List, Union
+from enum import Enum
 from loguru import logger
 from pydantic import BaseModel, ConfigDict, field_validator, Field
 from config import settings
@@ -7,6 +8,12 @@ class LLMResponse(BaseModel):
     status: str
     message: str
 
+class CustomerSentiment(str, Enum):
+    NEUTRAL = "neutral"
+    FRUSTRATED = "frustrated"
+    CALM = "calm"
+    # Add other sentiments as needed
+
 class JiraWebhookPayload(BaseModel):
     model_config = ConfigDict(alias_generator=lambda x: ''.join(word.capitalize() if i > 0 else word for i, word in enumerate(x.split('_'))), populate_by_name=True)
 
@@ -29,7 +36,7 @@ class JiraWebhookPayload(BaseModel):
 
 class AnalysisFlags(BaseModel):
     hasMultipleEscalations: bool = Field(description="Is there evidence of multiple escalation attempts?")
-    customerSentiment: Optional[str] = Field(description="Overall customer sentiment (e.g., 'neutral', 'frustrated', 'calm').")
+    customerSentiment: Optional[CustomerSentiment] = Field(description="Overall customer sentiment (e.g., 'neutral', 'frustrated', 'calm').")
 
     def __init__(self, **data):
         super().__init__(**data)
@@ -48,7 +55,7 @@ class AnalysisFlags(BaseModel):
                     "model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model,
                     "analysis_flags": {
                         "hasMultipleEscalations": self.hasMultipleEscalations,
-                        "customerSentiment": self.customerSentiment
+                        "customerSentiment": self.customerSentiment.value if self.customerSentiment else None
                     }
                 }
             ).end()  # End the trace immediately as it's just for tracking model usage
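Typing `customerSentiment` as `Optional[CustomerSentiment]` means Pydantic now rejects sentiments outside the enum instead of accepting any string. A minimal standalone reproduction (not the real `AnalysisFlags` model):

```python
from enum import Enum
from typing import Optional
from pydantic import BaseModel, ValidationError

class CustomerSentiment(str, Enum):
    NEUTRAL = "neutral"
    FRUSTRATED = "frustrated"
    CALM = "calm"

class Flags(BaseModel):
    customerSentiment: Optional[CustomerSentiment] = None

print(Flags(customerSentiment="frustrated").customerSentiment)  # coerced to the enum member

try:
    Flags(customerSentiment="angry")  # not a member of the enum
except ValidationError as exc:
    print("rejected:", exc.errors()[0]["msg"])
```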
diff --git a/llm/prompts/jira_analysis_v1.2.0.txt b/llm/prompts/jira_analysis_v1.2.0.txt
new file mode 100644
index 0000000..018614a
--- /dev/null
+++ b/llm/prompts/jira_analysis_v1.2.0.txt
@@ -0,0 +1,24 @@
+SYSTEM:
+You are an AI assistant designed to analyze Jira ticket details containing email correspondence and extract key flags and sentiment,
+outputting information in a strict JSON format.
+
+Your output MUST be ONLY a valid JSON object. Do NOT include any conversational text, explanations, or markdown outside the JSON.
+
+The JSON structure MUST follow this exact schema. If a field cannot be determined, use `null` for strings/numbers or empty list `[]` for arrays.
+
+- Determine if there are signs of multiple escalation attempts asking the MDM HUB team to respond and provide information. Questions directed to other teams are not considered.
+-- This usually looks like multiple requests, one after another over a span of days, asking for immediate help from the HUB team. Normal back-and-forth discussion is not considered an escalation.
+- Assess if the issue requires urgent attention based on language or context from the summary, description, or latest comment.
+-- This usually means the customer is asking for help because of upcoming deadlines or other high-priority issues that are blocked by a delay on our side.
+
+USER:
+Issue Key: {issueKey}
+Summary: {summary}
+Description: {description}
+Status: {status}
+Existing Labels: {labels}
+Assignee: {assignee}
+Last Updated: {updated}
+Latest Comment (if applicable): {comment}
+
+{format_instructions}
\ No newline at end of file
diff --git a/tests/test_core.py b/tests/test_core.py
index aec439c..4590060 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -164,7 +164,7 @@ def test_retry_analysis_record_endpoint_invalid_status(setup_db, test_client, mo
     response = test_client.post(f"/api/queue/{successful_record.id}/retry")
 
     assert response.status_code == 400
-    assert response.json()["detail"] == f"Record status is 'success'. Only 'failed' or 'validation_failed' records can be retried."
+    assert response.json()["detail"] == f"Record status is 'success'. Only 'failed', 'processing' or 'validation_failed' records can be retried."
 
 def test_retry_analysis_record_endpoint_db_update_failure(setup_db, test_client, mock_full_jira_payload, monkeypatch):
     # Create a failed record
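The updated assertion above covers the rejection path; a companion test for the newly allowed 'processing' status could look roughly like this — the fixtures are the ones already used in this file, while `create_record_with_status` stands in for however the existing tests build their 'failed' and 'success' records:

```python
def test_retry_analysis_record_endpoint_processing_status(setup_db, test_client, mock_full_jira_payload):
    # Hypothetical helper: create a record stuck in 'processing'.
    stuck_record = create_record_with_status(status="processing", payload=mock_full_jira_payload)

    response = test_client.post(f"/api/queue/{stuck_record.id}/retry")

    # The endpoint now accepts 'processing' records and resets them to 'pending'.
    assert response.status_code == 200
```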