From 935a8a49aec0e15c78a2b05fc68ef73118de2d7b Mon Sep 17 00:00:00 2001 From: Ireneusz Bachanowicz Date: Thu, 17 Jul 2025 02:21:56 +0200 Subject: [PATCH] Almost stable tests --- =3.2.0 | 32 ------ Dockerfile | 2 +- api/handlers.py | 75 +++++++++++++ config.py | 65 ++++++----- database/crud.py | 76 +++++++++++++ database/database.py | 35 ++++++ database/models.py | 20 ++++ jira-webhook-llm.py | 204 ----------------------------------- jira_analyses.db | Bin 0 -> 36864 bytes jira_webhook_llm.py | 250 +++++++++++++++++++++++++++++++++++++++++++ llm/chains.py | 56 +++++----- llm/models.py | 10 +- logging_config.py | 61 ++++------- requirements.txt | 16 +-- tests/conftest.py | 72 ++++++++++++- tests/test_core.py | 30 ++++-- webhooks/handlers.py | 100 +++++++++++++---- 17 files changed, 734 insertions(+), 370 deletions(-) delete mode 100644 =3.2.0 create mode 100644 api/handlers.py create mode 100644 database/crud.py create mode 100644 database/database.py create mode 100644 database/models.py delete mode 100644 jira-webhook-llm.py create mode 100644 jira_analyses.db create mode 100644 jira_webhook_llm.py diff --git a/=3.2.0 b/=3.2.0 deleted file mode 100644 index d35e897..0000000 --- a/=3.2.0 +++ /dev/null @@ -1,32 +0,0 @@ -Requirement already satisfied: langfuse in ./venv/lib/python3.12/site-packages (3.1.3) -Requirement already satisfied: backoff>=1.10.0 in ./venv/lib/python3.12/site-packages (from langfuse) (2.2.1) -Requirement already satisfied: httpx<1.0,>=0.15.4 in ./venv/lib/python3.12/site-packages (from langfuse) (0.27.0) -Requirement already satisfied: opentelemetry-api<2.0.0,>=1.33.1 in ./venv/lib/python3.12/site-packages (from langfuse) (1.34.1) -Requirement already satisfied: opentelemetry-exporter-otlp<2.0.0,>=1.33.1 in ./venv/lib/python3.12/site-packages (from langfuse) (1.34.1) -Requirement already satisfied: opentelemetry-sdk<2.0.0,>=1.33.1 in ./venv/lib/python3.12/site-packages (from langfuse) (1.34.1) -Requirement already satisfied: packaging<25.0,>=23.2 in ./venv/lib/python3.12/site-packages (from langfuse) (24.2) -Requirement already satisfied: pydantic<3.0,>=1.10.7 in ./venv/lib/python3.12/site-packages (from langfuse) (2.9.0) -Requirement already satisfied: requests<3,>=2 in ./venv/lib/python3.12/site-packages (from langfuse) (2.32.4) -Requirement already satisfied: wrapt<2.0,>=1.14 in ./venv/lib/python3.12/site-packages (from langfuse) (1.17.2) -Requirement already satisfied: anyio in ./venv/lib/python3.12/site-packages (from httpx<1.0,>=0.15.4->langfuse) (4.9.0) -Requirement already satisfied: certifi in ./venv/lib/python3.12/site-packages (from httpx<1.0,>=0.15.4->langfuse) (2025.6.15) -Requirement already satisfied: httpcore==1.* in ./venv/lib/python3.12/site-packages (from httpx<1.0,>=0.15.4->langfuse) (1.0.9) -Requirement already satisfied: idna in ./venv/lib/python3.12/site-packages (from httpx<1.0,>=0.15.4->langfuse) (3.10) -Requirement already satisfied: sniffio in ./venv/lib/python3.12/site-packages (from httpx<1.0,>=0.15.4->langfuse) (1.3.1) -Requirement already satisfied: h11>=0.16 in ./venv/lib/python3.12/site-packages (from httpcore==1.*->httpx<1.0,>=0.15.4->langfuse) (0.16.0) -Requirement already satisfied: importlib-metadata<8.8.0,>=6.0 in ./venv/lib/python3.12/site-packages (from opentelemetry-api<2.0.0,>=1.33.1->langfuse) (8.7.0) -Requirement already satisfied: typing-extensions>=4.5.0 in ./venv/lib/python3.12/site-packages (from opentelemetry-api<2.0.0,>=1.33.1->langfuse) (4.14.1) -Requirement already satisfied: opentelemetry-exporter-otlp-proto-grpc==1.34.1 in ./venv/lib/python3.12/site-packages (from opentelemetry-exporter-otlp<2.0.0,>=1.33.1->langfuse) (1.34.1) -Requirement already satisfied: opentelemetry-exporter-otlp-proto-http==1.34.1 in ./venv/lib/python3.12/site-packages (from opentelemetry-exporter-otlp<2.0.0,>=1.33.1->langfuse) (1.34.1) -Requirement already satisfied: googleapis-common-protos~=1.52 in ./venv/lib/python3.12/site-packages (from opentelemetry-exporter-otlp-proto-grpc==1.34.1->opentelemetry-exporter-otlp<2.0.0,>=1.33.1->langfuse) (1.70.0) -Requirement already satisfied: grpcio<2.0.0,>=1.63.2 in ./venv/lib/python3.12/site-packages (from opentelemetry-exporter-otlp-proto-grpc==1.34.1->opentelemetry-exporter-otlp<2.0.0,>=1.33.1->langfuse) (1.73.1) -Requirement already satisfied: opentelemetry-exporter-otlp-proto-common==1.34.1 in ./venv/lib/python3.12/site-packages (from opentelemetry-exporter-otlp-proto-grpc==1.34.1->opentelemetry-exporter-otlp<2.0.0,>=1.33.1->langfuse) (1.34.1) -Requirement already satisfied: opentelemetry-proto==1.34.1 in ./venv/lib/python3.12/site-packages (from opentelemetry-exporter-otlp-proto-grpc==1.34.1->opentelemetry-exporter-otlp<2.0.0,>=1.33.1->langfuse) (1.34.1) -Requirement already satisfied: protobuf<6.0,>=5.0 in ./venv/lib/python3.12/site-packages (from opentelemetry-proto==1.34.1->opentelemetry-exporter-otlp-proto-grpc==1.34.1->opentelemetry-exporter-otlp<2.0.0,>=1.33.1->langfuse) (5.29.5) -Requirement already satisfied: opentelemetry-semantic-conventions==0.55b1 in ./venv/lib/python3.12/site-packages (from opentelemetry-sdk<2.0.0,>=1.33.1->langfuse) (0.55b1) -Requirement already satisfied: annotated-types>=0.4.0 in ./venv/lib/python3.12/site-packages (from pydantic<3.0,>=1.10.7->langfuse) (0.7.0) -Requirement already satisfied: pydantic-core==2.23.2 in ./venv/lib/python3.12/site-packages (from pydantic<3.0,>=1.10.7->langfuse) (2.23.2) -Requirement already satisfied: tzdata in ./venv/lib/python3.12/site-packages (from pydantic<3.0,>=1.10.7->langfuse) (2025.2) -Requirement already satisfied: charset_normalizer<4,>=2 in ./venv/lib/python3.12/site-packages (from requests<3,>=2->langfuse) (3.4.2) -Requirement already satisfied: urllib3<3,>=1.21.1 in ./venv/lib/python3.12/site-packages (from requests<3,>=2->langfuse) (2.5.0) -Requirement already satisfied: zipp>=3.20 in ./venv/lib/python3.12/site-packages (from importlib-metadata<8.8.0,>=6.0->opentelemetry-api<2.0.0,>=1.33.1->langfuse) (3.23.0) diff --git a/Dockerfile b/Dockerfile index 4a29d22..1725a8a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -42,7 +42,7 @@ ENV PYTHONDONTWRITEBYTECODE=1 \ COPY config ./config # Copy your application source code. -COPY jira-webhook-llm.py . +COPY jira_webhook_llm.py . COPY config.py . # Expose the port your application listens on. diff --git a/api/handlers.py b/api/handlers.py new file mode 100644 index 0000000..5eaf8fd --- /dev/null +++ b/api/handlers.py @@ -0,0 +1,75 @@ +from fastapi import APIRouter, Request, HTTPException, Depends +from fastapi.responses import JSONResponse +from typing import Dict, Any +import config +from llm.models import LLMResponse +from database.database import get_db_session # Removed Session import here +from sqlalchemy.orm import Session # Added correct SQLAlchemy import +from database.crud import get_all_analysis_records, delete_all_analysis_records, get_analysis_by_id + +router = APIRouter( + prefix="/api", + tags=["API"] +) + + +@router.get("/requests") +async def get_analysis_records_endpoint(db: Session = Depends(get_db_session)): + """Get analysis records""" + try: + records = get_all_analysis_records(db) + return JSONResponse( + status_code=200, + content={"data": records} + ) + except Exception as e: + return JSONResponse( + status_code=500, + content={"error": str(e)} + ) + +@router.post("/test-llm") +async def test_llm_endpoint(db: Session = Depends(get_db_session)): + """Test endpoint for LLM integration""" + try: + from llm.chains import llm + test_prompt = "What is 1 + 1? Respond only with the number." + response = llm.invoke(test_prompt) + + return { + "status": "success", + "message": "LLM integration test successful", + "response": str(response) + } + except Exception as e: + return JSONResponse( + status_code=500, + content={ + "status": "error", + "message": f"LLM test failed: {str(e)}" + } + ) + +@router.delete("/requests") +async def delete_analysis_records_endpoint(db: Session = Depends(get_db_session)): + """Delete analysis records""" + try: + deleted_count = delete_all_analysis_records(db) + return JSONResponse( + status_code=200, + content={"message": f"Successfully deleted {deleted_count} records", "deleted_count": deleted_count} + ) + except Exception as e: + return JSONResponse( + status_code=500, + content={"error": str(e)}) +@router.get("/requests/{record_id}") +async def get_analysis_record_endpoint(record_id: int, db: Session = Depends(get_db_session)): + """Get specific analysis record by ID""" + record = get_analysis_by_id(db, record_id) + if not record: + raise HTTPException(status_code=404, detail="Analysis record not found") + return JSONResponse( + status_code=200, + content=record.dict() # Ensure proper data serialization + ) \ No newline at end of file diff --git a/config.py b/config.py index b1d4bcb..86b125a 100644 --- a/config.py +++ b/config.py @@ -1,11 +1,12 @@ import os import sys +import traceback from typing import Optional from pydantic_settings import BaseSettings -from pydantic import validator, ConfigDict +from pydantic import field_validator, ConfigDict from loguru import logger from watchfiles import watch, Change -from threading import Thread +from threading import Thread, Event from langfuse import Langfuse from langfuse.langchain import CallbackHandler import yaml @@ -17,7 +18,7 @@ class LangfuseConfig(BaseSettings): secret_key: Optional[str] = None host: Optional[str] = None - @validator('host') + @field_validator('host') def validate_host(cls, v): if v and not v.startswith(('http://', 'https://')): raise ValueError("Langfuse host must start with http:// or https://") @@ -73,7 +74,7 @@ class LLMConfig(BaseSettings): ollama_base_url: Optional[str] = None ollama_model: Optional[str] = None - @validator('mode') + @field_validator('mode') def validate_mode(cls, v): if v not in ['openai', 'ollama']: raise ValueError("LLM mode must be either 'openai' or 'ollama'") @@ -86,14 +87,27 @@ class LLMConfig(BaseSettings): extra='ignore' ) +class ApiConfig(BaseSettings): + api_key: Optional[str] = None + + model_config = ConfigDict( + env_prefix='API_', + env_file='.env', + env_file_encoding='utf-8', + extra='ignore' + ) + class Settings: + logging_ready = Event() # Event to signal logging is configured + def __init__(self): try: + # logger.debug(f"Config initialization started from: {''.join(traceback.format_stack())}") logger.info("Loading configuration from application.yml and environment variables") # Load configuration from YAML file yaml_config = self._load_yaml_config() - logger.info("Loaded YAML config: {}", yaml_config) # Add this log line + logger.info("Loaded YAML config: {}", yaml_config) # Initialize configurations, allowing environment variables to override YAML logger.info("Initializing LogConfig") @@ -108,6 +122,15 @@ class Settings: self.langfuse = LangfuseConfig(**yaml_config.get('langfuse', {})) logger.info("LangfuseConfig initialized: {}", self.langfuse.model_dump()) + logger.info("Initializing ApiConfig") + self.api = ApiConfig(**yaml_config.get('api', {})) + logger.info("ApiConfig initialized: {}", self.api.model_dump()) + + # Add thread_pool_max_workers + self.thread_pool_max_workers = yaml_config.get('application', {}).get('thread_pool_max_workers', 5) + logger.info("ThreadPool max workers set to: {}", self.thread_pool_max_workers) + logger.debug(f"Thread pool initialized with {self.thread_pool_max_workers} workers") + logger.info("Validating configuration") self._validate() logger.info("Starting config watcher") @@ -179,29 +202,14 @@ class Settings: logger.error("Langfuse connection test failed: {}", e) raise - # Initialize CallbackHandler + # Initialize CallbackHandler with debug logging + logger.debug("Langfuse client attributes: {}", vars(self.langfuse_client)) try: - self.langfuse_handler = CallbackHandler( - public_key=self.langfuse.public_key, - secret_key=self.langfuse.secret_key, - host=self.langfuse.host - ) - except TypeError: - try: - # Fallback for older versions of langfuse.langchain.CallbackHandler - self.langfuse_handler = CallbackHandler( - public_key=self.langfuse.public_key, - host=self.langfuse.host - ) - logger.warning("Using fallback CallbackHandler initialization - secret_key parameter not supported") - except TypeError: - # Fallback for even older versions - self.langfuse_handler = CallbackHandler( - public_key=self.langfuse.public_key - ) - logger.warning("Using minimal CallbackHandler initialization - only public_key parameter supported") - logger.info("Langfuse client and handler initialized successfully") - + self.langfuse_handler = CallbackHandler() + logger.debug("CallbackHandler initialized successfully") + except Exception as e: + logger.error("CallbackHandler initialization failed: {}", e) + raise logger.info("Langfuse client and handler initialized successfully") except ValueError as e: logger.warning("Langfuse configuration error: {}. Disabling Langfuse.", e) @@ -212,6 +220,9 @@ class Settings: def _start_watcher(self): def watch_config(): + # Wait for logging to be fully configured + self.logging_ready.wait() + for changes in watch('config/application.yml'): for change in changes: if change[0] == Change.modified: diff --git a/database/crud.py b/database/crud.py new file mode 100644 index 0000000..1200c46 --- /dev/null +++ b/database/crud.py @@ -0,0 +1,76 @@ +from loguru import logger +from sqlalchemy.orm import Session +from datetime import datetime +import json +from typing import Dict, Any, Optional + +from database.models import JiraAnalysis +from llm.models import JiraWebhookPayload + +def create_analysis_record(db: Session, payload: JiraWebhookPayload) -> JiraAnalysis: + """Creates a new Jira analysis record in the database.""" + db_analysis = JiraAnalysis( + issue_key=payload.issueKey, + project_key=payload.projectKey, + status="pending", + issue_summary=payload.summary, + request_payload=payload.model_dump(), + created_at=datetime.utcnow(), + updated_at=datetime.utcnow() + ) + db.add(db_analysis) + db.commit() + db.refresh(db_analysis) + return db_analysis + +def get_analysis_record(db: Session, issue_key: str) -> Optional[JiraAnalysis]: + """Retrieves the latest analysis record for a given Jira issue key.""" + logger.debug(f"Attempting to retrieve analysis record for issue key: {issue_key}") + record = db.query(JiraAnalysis).filter(JiraAnalysis.issue_key == issue_key).order_by(JiraAnalysis.created_at.desc()).first() + if record: + logger.debug(f"Found analysis record for {issue_key}: {record.id}") + else: + logger.debug(f"No analysis record found for {issue_key}") + return record + +def update_analysis_record( + db: Session, + record_id: int, + status: str, + analysis_result: Optional[Dict[str, Any]] = None, + error_message: Optional[str] = None, + raw_response: Optional[Dict[str, Any]] = None +) -> Optional[JiraAnalysis]: + """Updates an existing Jira analysis record.""" + db_analysis = db.query(JiraAnalysis).filter(JiraAnalysis.id == record_id).first() + if db_analysis: + db_analysis.status = status + db_analysis.updated_at = datetime.utcnow() + if analysis_result: + db_analysis.analysis_result = analysis_result + if error_message: + db_analysis.error_message = error_message + if raw_response: + db_analysis.raw_response = json.dumps(raw_response) + + db.commit() + db.refresh(db_analysis) + return db_analysis + +def get_all_analysis_records(db: Session) -> list[JiraAnalysis]: + """Retrieves all analysis records from the database.""" + return db.query(JiraAnalysis).all() + +def get_analysis_by_id(db: Session, record_id: int) -> Optional[JiraAnalysis]: + """Retrieves an analysis record by its unique database ID.""" + return db.query(JiraAnalysis).filter(JiraAnalysis.id == record_id).first() + +def delete_all_analysis_records(db: Session) -> int: + """Deletes all analysis records from the database and returns count of deleted records.""" + count = db.query(JiraAnalysis).count() + db.query(JiraAnalysis).delete() + db.commit() + return count + db.commit() + db.query(JiraAnalysis).delete() + db.commit() \ No newline at end of file diff --git a/database/database.py b/database/database.py new file mode 100644 index 0000000..932b28a --- /dev/null +++ b/database/database.py @@ -0,0 +1,35 @@ +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker, Session +from sqlalchemy.ext.declarative import declarative_base +from contextlib import contextmanager +from loguru import logger + +from database.models import Base + +DATABASE_URL = "sqlite:///./jira_analyses.db" + +engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False}) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + +def init_db(): + """Initializes the database by creating all tables.""" + logger.info("Initializing database...") + Base.metadata.create_all(bind=engine) + logger.info("Database initialized successfully.") + +@contextmanager +def get_db(): + """Context manager to get a database session.""" + db = SessionLocal() + try: + yield db + finally: + db.close() + +def get_db_session(): + """FastAPI dependency to get a database session.""" + db = SessionLocal() + try: + yield db + finally: + db.close() \ No newline at end of file diff --git a/database/models.py b/database/models.py new file mode 100644 index 0000000..2429c86 --- /dev/null +++ b/database/models.py @@ -0,0 +1,20 @@ +from sqlalchemy import Column, Integer, String, DateTime, Text, JSON +from datetime import datetime + +from sqlalchemy.orm import declarative_base +Base = declarative_base() + +class JiraAnalysis(Base): + __tablename__ = "jira_analyses" + + id = Column(Integer, primary_key=True, index=True) + issue_key = Column(String, index=True, nullable=False) + project_key = Column(String, index=True, nullable=False) + status = Column(String, default="pending", nullable=False) # pending, processing, completed, failed + issue_summary = Column(Text, nullable=False) + request_payload = Column(JSON, nullable=False) # Store the original Jira webhook payload + analysis_result = Column(JSON, nullable=True) # Store the structured LLM output + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) + error_message = Column(Text, nullable=True) # To store any error messages + raw_response = Column(JSON, nullable=True) # Store raw LLM response before validation \ No newline at end of file diff --git a/jira-webhook-llm.py b/jira-webhook-llm.py deleted file mode 100644 index 903aa4f..0000000 --- a/jira-webhook-llm.py +++ /dev/null @@ -1,204 +0,0 @@ -import os -from dotenv import load_dotenv -load_dotenv() - -from fastapi import FastAPI, Request, HTTPException -from pydantic import BaseModel -from fastapi.responses import JSONResponse -from loguru import logger -import uuid -import sys -from typing import Optional -from datetime import datetime -import asyncio -from functools import wraps, partial - -from config import settings -from webhooks.handlers import JiraWebhookHandler -from llm.models import JiraWebhookPayload -from logging_config import configure_logging - -# Initialize logging first -configure_logging(log_level="DEBUG") - -import signal - -from contextlib import asynccontextmanager - - - -# Setup async-compatible signal handling -def handle_shutdown_signal(signum, loop): - """Graceful shutdown signal handler""" - logger.info(f"Received signal {signum}, initiating shutdown...") - # Set shutdown flag and remove signal handlers to prevent reentrancy - if not hasattr(loop, '_shutdown'): - loop._shutdown = True - - # Prevent further signal handling - for sig in (signal.SIGTERM, signal.SIGINT): - loop.remove_signal_handler(sig) -@asynccontextmanager -async def lifespan(app: FastAPI): - """Handle startup and shutdown events""" - # Startup - try: - logger.info("Initializing application...") - - # Initialize event loop - loop = asyncio.get_running_loop() - logger.debug("Event loop initialized") - - # Setup signal handlers - for sig in (signal.SIGTERM, signal.SIGINT): - loop.add_signal_handler(sig, partial(handle_shutdown_signal, sig, loop)) - logger.info("Signal handlers configured successfully") - - # Verify critical components - if not hasattr(settings, 'langfuse_handler'): - logger.error("Langfuse handler not found in settings") - raise RuntimeError("Langfuse handler not initialized") - - logger.info("Application initialized successfully") - yield - - # Check shutdown flag before cleanup - loop = asyncio.get_running_loop() - if hasattr(loop, '_shutdown'): - logger.info("Shutdown initiated, starting cleanup...") - except Exception as e: - logger.critical(f"Application initialization failed: {str(e)}") - raise - finally: - # Shutdown - logger.info("Shutting down application...") - try: - # Cleanup sequence with async safety - cleanup_tasks = [] - shutdown_success = True - - # Close langfuse with retry - if hasattr(settings, 'langfuse_handler') and hasattr(settings.langfuse_handler, 'close'): - async def close_langfuse(): - try: - await asyncio.wait_for(settings.langfuse_handler.close(), timeout=5.0) - logger.info("Langfuse client closed successfully") - except asyncio.TimeoutError: - logger.warning("Timeout while closing Langfuse client") - except Exception as e: - logger.error(f"Error closing Langfuse client: {str(e)}") - cleanup_tasks.append(close_langfuse()) - - # Remove confirm_shutdown entirely - # Execute all cleanup tasks with timeout - try: - await asyncio.wait_for(asyncio.gather(*cleanup_tasks), timeout=10.0) - except asyncio.TimeoutError: - logger.warning("Timeout during cleanup sequence") - loop.stop() # Explicit loop stop after cleanup - # Cancel all pending tasks - async def cancel_pending_tasks(): - try: - pending = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] - for task in pending: - task.cancel() - await asyncio.gather(*pending, return_exceptions=True) - logger.info("Pending tasks cancelled successfully") - except Exception as e: - logger.error(f"Error cancelling pending tasks: {str(e)}") - cleanup_tasks.append(cancel_pending_tasks()) - - # Execute all cleanup tasks with timeout - try: - await asyncio.wait_for(asyncio.gather(*cleanup_tasks), timeout=10.0) - except asyncio.TimeoutError: - logger.warning("Timeout during cleanup sequence") - loop.stop() # Add explicit loop stop after cleanup - - except Exception as e: - logger.error(f"Error during shutdown: {str(e)}") - raise - -# Initialize FastAPI app after lifespan definition -app = FastAPI(lifespan=lifespan) - -def retry(max_retries: int = 3, delay: float = 1.0): - """Decorator for retrying failed operations""" - def decorator(func): - @wraps(func) - async def wrapper(*args, **kwargs): - last_error = None - for attempt in range(max_retries): - try: - return await func(*args, **kwargs) - except Exception as e: - last_error = e - logger.warning(f"Attempt {attempt + 1} failed: {str(e)}") - if attempt < max_retries - 1: - await asyncio.sleep(delay * (attempt + 1)) - raise last_error - return wrapper - return decorator - -class ErrorResponse(BaseModel): - error_id: str - timestamp: str - status_code: int - message: str - details: Optional[str] = None - -@app.middleware("http") -async def error_handling_middleware(request: Request, call_next): - request_id = str(uuid.uuid4()) - logger.bind(request_id=request_id).info(f"Request started: {request.method} {request.url}") - - try: - response = await call_next(request) - return response - except HTTPException as e: - logger.error(f"HTTP Error: {e.status_code} - {e.detail}") - error_response = ErrorResponse( - error_id=request_id, - timestamp=datetime.utcnow().isoformat(), - status_code=e.status_code, - message=e.detail, - details=str(e) - ) - return JSONResponse(status_code=e.status_code, content=error_response.model_dump()) - except Exception as e: - logger.error(f"Unexpected error: {str(e)}") - error_response = ErrorResponse( - error_id=request_id, - timestamp=datetime.utcnow().isoformat(), - status_code=500, - message="Internal Server Error", - details=str(e) - ) - return JSONResponse(status_code=500, content=error_response.model_dump()) -webhook_handler = JiraWebhookHandler() - -@app.post("/jira-webhook") -async def jira_webhook_handler(payload: JiraWebhookPayload): - logger.info(f"Received webhook payload: {payload.model_dump()}") - try: - response = await webhook_handler.handle_webhook(payload) - logger.info(f"Webhook processed successfully") - return response - except Exception as e: - logger.error(f"Error processing webhook: {str(e)}") - raise HTTPException(status_code=500, detail=str(e)) - -@app.post("/test-llm") -async def test_llm(): - """Test endpoint for LLM integration""" - test_payload = JiraWebhookPayload( - issueKey="TEST-123", - summary="Test issue", - description="This is a test issue for LLM integration", - comment="Testing OpenAI integration with Langfuse", - labels=["test"], - status="Open", - assignee="Tester", - updated="2025-07-04T21:40:00Z" - ) - return await webhook_handler.handle_webhook(test_payload) diff --git a/jira_analyses.db b/jira_analyses.db new file mode 100644 index 0000000000000000000000000000000000000000..52d57ae29634a88b3ae77a95ee64e992139ddcca GIT binary patch literal 36864 zcmeI5&u<&Y6~{?YiY%uIb196VFz}!rWGOVW%U>edstzU5m26RRC|a@Mx{KlNP~3RA z%kB?KwBdt`UV5wlg8nPL_0&VIJqIXS^w0uDFYTe}tejbBWF2`~XBzyz286JP>NfC(@GCh#^1bTjW=DK4eQwrj~_``Gx> z_Jtu_;fwXT4(9XTanDlu~+t;wo^K3f5Qsxbo#wrcs`XBLyVYpl zr`yf@@qk@ko`cTKX8)=*Zz21y>_4-ApCsOFKw{YUms**|9gfKz@k0Vco%m;e)C0!)AjFaajO z1egF5U;^(Lf$wFmrnmN6*K6fkaqe_t_H-h1Ix+Kp=ITOHX0rbOI+gu5e*6Dh_AlA5 z-?0vIbC>`VU;<2l2`~XBzyz286JP>NfC(^xZ$w}&lTOdgspIUdI%YEJI5U&UEG($l z{Pet)!XLkw025#WOn?b6fj3Ct`N!$^Q&&I!?d?>f@$4}1y!V9dNbAXB99wu>ju-2+ znB*v8#bqr)4eQIu4Lvf&FpPXbmZYI@L*Y1rOwT36Z`!6XQC0@XbkxNc zEtp6>I43nvJ09*9A&!W7jAiLAilS|%x;Uo5b8ORw;GySJPmFLBLvM_6cK5a#VuJn<2 zrksk?tW!ZNl&@>6*R@hdFV_oty;RFrkM~dU#Be(-JY~QM_r5(Iui;X9mvs+ zNQgz&OQa`|FpK~yNxCLVh3Gs+7mpXw4P}m8Pn8`h zr8&7l6_3$=dgrF+o3eeJrw5;{3t)D=BinqOqn0hr$KwElqn8Teh^}q>^3n3c9CiA3 zK(Qj@h$EAROuv0HIG^l?P|%Pbgh7( zg#k`%_mIY36c;*)d8L6E4%KzkkE|T|_F+G~L3?g|mxua&LZdAu3e5o=I|Qn}7daMn zq3VdW#YL#rbcgTd=?m2aGr6%3LDE&CaBEzb3Ai_CL%4CHf@lau4$%YE7Sj>F-5aY$ z0Bs!i3j+#nsD8PR;sU5fSR<$uMquTH?)CCq~ISeT(Ro@fO|RH};uD5qj(P3fZA_KdbR?sT?0`9iHw!pOsL)W!V+r3J96 zah%+9C1z+CV+6y<)AkY!$Lve}IP#k)x%!m-54y8>j$gyF4UN6A`qg0}{mj z?7)mBBq3JP5vE3EQKmOS(#k_S50$dMYO0`9Dnu(uZ$l{tD|8(%dhqj`Ul_*D-a%lf zaj`>38as_nqh&xq07W+RveB9hrg0EYD8}756h1Ep&O-dqf#TPpS5)aeVqL&>4@N}?4 zfE@z=%MO7ji||^v;10Eij%)^|fU8vb=io@_sb#xzRYyVS4Wz%X9G8jH+LaMpyv_w4 z1-+Qpi?vFraL%KEoMNR|t`@dk_+GKK;m%GAP69X%T=)l+XF-I#{y+cqjIS^OCcp%k z026pC1fE~Y@cN%02$=q{R6F_clOG7+^*^uwC(q{a0|ER%z<29`fH$}PuaKtIb-k`> zd96^aR;w@8|4B}Eh{^#{S2Ly0H z00#tcKtLQ{^nVc$@J84FwR*8oFO~AORjpFgPp|)xQ$+AtUf0Scz53eq|5vH`ufEkc z7`~YaFaajO1egF5c$WzLCO3CEb@d1FW&^1tv_OSaBQlUT8{o|bc(VcZKVtJ{1Jm1E naBKm`7Vu^R-<8b= 400: + error_response = ErrorResponse( + error_id=request_id, + timestamp=datetime.now(timezone.utc).isoformat(), + status_code=response.status_code, + message=HTTPStatus(response.status_code).phrase, + details="Endpoint not found or invalid request" + ) + return JSONResponse(status_code=response.status_code, content=error_response.model_dump()) + return response + except HTTPException as e: + logger.error(f"HTTP Error: {e.status_code} - {e.detail}") + error_response = ErrorResponse( + error_id=request_id, + timestamp=datetime.now(timezone.utc).isoformat(), + status_code=e.status_code, + message=e.detail, + details=str(e) + ) + return JSONResponse(status_code=e.status_code, content=error_response.model_dump()) + except Exception as e: + logger.error(f"Unexpected error: {str(e)}") + error_response = ErrorResponse( + error_id=request_id, + timestamp=datetime.now(timezone.utc).isoformat(), + status_code=500, + message="Internal Server Error", + details=str(e) + ) + return JSONResponse(status_code=500, content=error_response.model_dump()) + + return _app + +app = create_app() + +async def process_jira_webhook_background(record_id: int, payload: JiraWebhookPayload): + """ + Background task to process Jira webhook and perform LLM analysis. + """ + try: + logger.info(f"Starting background processing for record ID: {record_id}, Issue Key: {payload.issueKey}") + except Exception as e: + logger.error(f"Failed to start background processing for record {record_id}: {str(e)}") + raise + + with get_db() as db: + try: + update_analysis_record(db, record_id, "processing") + + llm_input = { + "issueKey": payload.issueKey, + "summary": payload.summary, + "description": payload.description if payload.description else "No description provided.", + "status": payload.status if payload.status else "Unknown", + "labels": ", ".join(payload.labels) if payload.labels else "None", + "assignee": payload.assignee if payload.assignee else "Unassigned", + "updated": payload.updated if payload.updated else "Unknown", + "comment": payload.comment if payload.comment else "No new comment provided." + } + + analysis_result = await analysis_chain.ainvoke(llm_input) + + if not validate_response(analysis_result): + logger.warning(f"Invalid LLM response format for {payload.issueKey}") + analysis_result = { + "hasMultipleEscalations": False, + "customerSentiment": "neutral" + } + update_analysis_record(db, record_id, "failed", analysis_result=analysis_result, error_message="Invalid LLM response format") + logger.error(f"LLM processing failed for {payload.issueKey}: Invalid response format") + return + + update_analysis_record(db, record_id, "completed", analysis_result=analysis_result) + logger.info(f"Background processing completed for record ID: {record_id}, Issue Key: {payload.issueKey}") + + except Exception as e: + logger.error(f"Error during background processing for record ID {record_id}, Issue Key {payload.issueKey}: {str(e)}") + update_analysis_record(db, record_id, "failed", error_message=str(e)) + +def retry(max_retries: int = 3, delay: float = 1.0): + """Decorator for retrying failed operations""" + def decorator(func): + @wraps(func) + async def wrapper(*args, **kwargs): + last_error = None + for attempt in range(max_retries): + try: + return await func(*args, **kwargs) + except Exception as e: + last_error = e + logger.warning(f"Attempt {attempt + 1} failed: {str(e)}") + if attempt < max_retries - 1: + await asyncio.sleep(delay * (attempt + 1)) + raise last_error + return wrapper + return decorator + +class ErrorResponse(BaseModel): + error_id: str + timestamp: str + status_code: int + message: str + details: Optional[str] = None + diff --git a/llm/chains.py b/llm/chains.py index 8a95245..68592d0 100644 --- a/llm/chains.py +++ b/llm/chains.py @@ -1,3 +1,12 @@ +import json +from typing import Union +from loguru import logger +from langchain_core.prompts import ChatPromptTemplate +from langchain_core.output_parsers import JsonOutputParser +from langchain_core.runnables import RunnablePassthrough +from llm.models import AnalysisFlags +from config import settings +import json from typing import Union from langchain_ollama import OllamaLLM from langchain_openai import ChatOpenAI @@ -53,7 +62,7 @@ elif settings.llm.mode == 'ollama': # Test connection logger.debug("Testing Ollama connection...") - # llm.invoke("test") # Simple test request + llm.invoke("test") # Simple test request logger.info("Ollama connection established successfully") except Exception as e: @@ -88,8 +97,13 @@ def load_prompt_template(version="v1.1.0"): template_content = f.read() # Split system and user parts - system_template, user_template = template_content.split("\n\nUSER:\n") - system_template = system_template.replace("SYSTEM:\n", "").strip() + if "\n\nUSER:\n" in template_content: + system_template, user_template = template_content.split("\n\nUSER:\n") + system_template = system_template.replace("SYSTEM:\n", "").strip() + else: + # Handle legacy format + system_template = template_content + user_template = "Analyze this Jira ticket: {issueKey}" return ChatPromptTemplate.from_messages([ SystemMessagePromptTemplate.from_template(system_template), @@ -152,40 +166,32 @@ def create_analysis_chain(): analysis_chain = create_analysis_chain() # Enhanced response validation function -def validate_response(response: Union[dict, str]) -> bool: +def validate_response(response: Union[dict, str], issue_key: str = "N/A") -> bool: """Validate the JSON response structure and content""" try: # If response is a string, attempt to parse it as JSON if isinstance(response, str): + logger.debug(f"[{issue_key}] Raw LLM response (string): {response}") try: response = json.loads(response) - except json.JSONDecodeError: - logger.error(f"Invalid JSON response: {response}") - raise ValueError("Invalid JSON response format") - + except json.JSONDecodeError as e: + logger.error(f"[{issue_key}] JSONDecodeError: {e}. Raw response: {response}") + return False + # Ensure response is a dictionary if not isinstance(response, dict): + logger.error(f"[{issue_key}] Response is not a dictionary: {type(response)}") return False - - # Check required fields - required_fields = ["hasMultipleEscalations", "customerSentiment"] - if not all(field in response for field in required_fields): - return False - - # Validate field types - if not isinstance(response["hasMultipleEscalations"], bool): - return False - - if response["customerSentiment"] is not None: - if not isinstance(response["customerSentiment"], str): - return False - + + logger.debug(f"[{issue_key}] Parsed LLM response (JSON): {json.dumps(response)}") + # Validate against schema using AnalysisFlags model try: AnalysisFlags.model_validate(response) return True - except Exception: + except Exception as e: + logger.error(f"[{issue_key}] Pydantic validation error: {e}. Invalid response: {response}") return False - - except Exception: + except Exception as e: + logger.error(f"[{issue_key}] Unexpected error during response validation: {e}. Response: {response}") return False \ No newline at end of file diff --git a/llm/models.py b/llm/models.py index e094af6..f15d3cc 100644 --- a/llm/models.py +++ b/llm/models.py @@ -1,11 +1,17 @@ from typing import Optional, List, Union +from loguru import logger from pydantic import BaseModel, ConfigDict, field_validator, Field from config import settings +class LLMResponse(BaseModel): + status: str + message: str + class JiraWebhookPayload(BaseModel): model_config = ConfigDict(alias_generator=lambda x: ''.join(word.capitalize() if i > 0 else word for i, word in enumerate(x.split('_'))), populate_by_name=True) - + issueKey: str + projectKey: Optional[str] = None # Added missing field summary: str description: Optional[str] = None comment: Optional[str] = None @@ -17,7 +23,7 @@ class JiraWebhookPayload(BaseModel): if isinstance(v, str): return [v] return v or [] - + status: Optional[str] = None assignee: Optional[str] = None updated: Optional[str] = None diff --git a/logging_config.py b/logging_config.py index ae3b881..75e3190 100644 --- a/logging_config.py +++ b/logging_config.py @@ -4,6 +4,7 @@ from pathlib import Path from datetime import datetime from typing import Optional from loguru import logger +from config import Settings # Basic fallback logging configuration logger.remove() @@ -13,7 +14,6 @@ def configure_logging(log_level: str = "INFO", log_dir: Optional[str] = None): """Configure structured logging for the application with fallback handling""" try: # Log that we're attempting to configure logging - logger.warning("Attempting to configure logging...") # Default log directory if not log_dir: @@ -51,52 +51,27 @@ def configure_logging(log_level: str = "INFO", log_dir: Optional[str] = None): ) # Configure default extras - logger.configure(extra={"request_id": "N/A"}) + # Configure thread-safe defaults + logger.configure( + extra={"request_id": "N/A"}, + patcher=lambda record: record["extra"].update( + thread_id = record["thread"].id if hasattr(record.get("thread"), 'id') else "main" + ) + ) logger.info("Logging configured successfully") + settings = Settings() + # Removed duplicate logging_ready.set() call + logger.debug("Signaled logging_ready event") except Exception as e: # Fallback to basic logging if configuration fails logger.remove() logger.add(sys.stderr, level="WARNING", format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}") logger.error(f"Failed to configure logging: {str(e)}. Using fallback logging configuration.") - """Configure structured logging for the application""" - - # Default log directory - if not log_dir: - log_dir = os.getenv("LOG_DIR", "logs") - - # Create log directory if it doesn't exist - Path(log_dir).mkdir(parents=True, exist_ok=True) - - # Log file path with timestamp - log_file = Path(log_dir) / f"jira-webhook-llm_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" - - # Remove any existing loggers - logger.remove() - - # Add console logger - logger.add( - sys.stdout, - level=log_level, - format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {extra[request_id]} | {message}", - colorize=True, - backtrace=True, - diagnose=True - ) - - # Add file logger - logger.add( - str(log_file), - level=log_level, - format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {extra[request_id]} | {message}", - rotation="100 MB", - retention="30 days", - compression="zip", - backtrace=True, - diagnose=True - ) - - # Configure default extras - logger.configure(extra={"request_id": "N/A"}) - - logger.info("Logging configured successfully") \ No newline at end of file + settings = Settings() + try: + settings.logging_ready.set() + logger.debug("Signaled logging_ready event") + except Exception as inner_e: + logger.error(f"Failed to signal logging_ready: {str(inner_e)}") + raise # Re-raise the original exception \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index d4771f7..f820755 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,11 @@ fastapi==0.111.0 -pydantic>=2.9.0 +pydantic==2.7.1 pydantic-settings>=2.0.0 -langchain==0.3.26 -langchain-ollama==0.3.3 -langchain-openai==0.3.27 -langchain-core==0.3.68 -langfuse==3.1.3 +langchain>=0.1.0 +langchain-ollama>=0.1.0 +langchain-openai>=0.1.0 +langchain-core>=0.1.0 +langfuse>=3.0.0 uvicorn==0.30.1 python-multipart==0.0.9 # Good to include for FastAPI forms loguru==0.7.3 @@ -16,4 +16,6 @@ pytest==8.2.0 pytest-asyncio==0.23.5 pytest-cov==4.1.0 httpx==0.27.0 -PyYAML>=6.0.2 \ No newline at end of file +PyYAML>=6.0.2 +SQLAlchemy==2.0.30 +alembic==1.13.1 \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index 86f535e..b17cd59 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,14 +1,70 @@ import pytest from fastapi.testclient import TestClient -from jira_webhook_llm import app +from jira_webhook_llm import create_app, app # Assuming app is created via factory +from database.database import engine, Base # Add import + +from sqlalchemy.orm import sessionmaker +import os +from sqlalchemy import create_engine + +@pytest.fixture(scope="function") +def setup_db(monkeypatch): + # Use in-memory SQLite for tests + test_db_url = "sqlite:///:memory:" + monkeypatch.setenv("DATABASE_URL", test_db_url) + + from database import database as db + from database.models import Base # Import Base from models + + test_engine = create_engine(test_db_url, connect_args={"check_same_thread": False}) + # Update the global engine and SessionLocal + db.engine = test_engine + db.SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=test_engine) + + # Create all tables + Base.metadata.create_all(bind=test_engine) + + yield test_engine + + # Cleanup + Base.metadata.drop_all(bind=test_engine) + +@pytest.fixture +def mock_full_jira_payload(setup_db): + mock_data = { + "issueKey": "PROJ-123", + "projectKey": "PROJ", + "summary": "Test Issue", + "description": "Test Description", + "comment": "Test Comment", + "labels": ["test"], + "status": "open", + "assignee": "Tester", + "updated": "2025-07-13T12:00:00Z", + "payloadData": {"key1": "value1"} + } + return mock_data @pytest.fixture -def test_client(): +def test_client(setup_db): + # Ensure the database is set up before creating the app + test_engine = setup_db + + # Import and patch the database module to use test database + from database import database as db + db.engine = test_engine + db.SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=test_engine) + + # Create fresh app instance with test DB + from jira_webhook_llm import create_app + app = create_app() return TestClient(app) + @pytest.fixture def mock_jira_payload(): return { + "projectKey": "TEST-PROJECT", "issueKey": "TEST-123", "summary": "Test Issue", "description": "Test Description", @@ -17,4 +73,14 @@ def mock_jira_payload(): "status": "Open", "assignee": "Tester", "updated": "2025-07-13T12:00:00Z" - } \ No newline at end of file + } + # return { + # "issueKey": "TEST-123", + # "summary": "Test Issue", + # "description": "Test Description", + # "comment": "Test Comment", + # "labels": ["test"], + # "status": "Open", + # "assignee": "Tester", + # "updated": "2025-07-13T12:00:00Z" + # } \ No newline at end of file diff --git a/tests/test_core.py b/tests/test_core.py index 643c851..518ba24 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -16,11 +16,25 @@ def test_error_handling_middleware(test_client, mock_jira_payload): assert response.status_code == 422 assert "details" in response.json() -def test_webhook_handler(test_client, mock_jira_payload): - # Test successful webhook handling - response = test_client.post("/jira-webhook", json=mock_jira_payload) +def test_webhook_handler(setup_db, test_client, mock_full_jira_payload): + # Test successful webhook handling with full payload + response = test_client.post("/jira-webhook", json=mock_full_jira_payload) assert response.status_code == 200 - assert "response" in response.json() + response_data = response.json() + assert "status" in response_data + assert response_data["status"] in ["success", "skipped"] + if response_data["status"] == "success": + assert "analysis_flags" in response_data + + # Validate database storage + from database.models import JiraAnalysis + from database.database import get_db + with get_db() as db: + record = db.query(JiraAnalysis).filter_by(issue_key=mock_full_jira_payload["issueKey"]).first() + assert record is not None + assert record.issue_summary == mock_full_jira_payload["summary"] + assert record.request_payload == mock_full_jira_payload + assert record.project_key == mock_full_jira_payload["projectKey"] def test_llm_test_endpoint(test_client): # Test LLM test endpoint @@ -28,11 +42,13 @@ def test_llm_test_endpoint(test_client): assert response.status_code == 200 assert "response" in response.json() -def test_retry_decorator(): +@pytest.mark.asyncio +async def test_retry_decorator(): # Test retry decorator functionality - @app.retry(max_retries=3) + from jira_webhook_llm import retry # Import decorator from main module + @retry(max_retries=3) # Use imported decorator async def failing_function(): raise Exception("Test error") with pytest.raises(Exception): - failing_function() \ No newline at end of file + await failing_function() \ No newline at end of file diff --git a/webhooks/handlers.py b/webhooks/handlers.py index f61f556..d2500cd 100644 --- a/webhooks/handlers.py +++ b/webhooks/handlers.py @@ -1,14 +1,20 @@ -from fastapi import HTTPException +from fastapi import APIRouter, Depends, HTTPException from loguru import logger import json from typing import Optional, List, Union +from sqlalchemy.orm import Session from pydantic import BaseModel, ConfigDict, field_validator from datetime import datetime +import uuid from config import settings from langfuse import Langfuse +from database.crud import create_analysis_record, get_analysis_record, update_analysis_record from llm.models import JiraWebhookPayload, AnalysisFlags from llm.chains import analysis_chain, validate_response +from database.database import get_db_session + +webhook_router = APIRouter() class BadRequestError(HTTPException): def __init__(self, detail: str): @@ -22,22 +28,37 @@ class ValidationError(HTTPException): def __init__(self, detail: str): super().__init__(status_code=422, detail=detail) +class ValidationError(HTTPException): + def __init__(self, detail: str): + super().__init__(status_code=422, detail=detail) + class JiraWebhookHandler: def __init__(self): self.analysis_chain = analysis_chain - async def handle_webhook(self, payload: JiraWebhookPayload): + async def handle_webhook(self, payload: JiraWebhookPayload, db: Session): try: if not payload.issueKey: raise BadRequestError("Missing required field: issueKey") - + if not payload.summary: raise BadRequestError("Missing required field: summary") - + + # Check for existing analysis record + existing_record = get_analysis_record(db, payload.issueKey) + if existing_record: + logger.info(f"Existing analysis record found for {payload.issueKey}. Skipping new analysis.") + return {"status": "skipped", "analysis_flags": existing_record.analysis_result} + + # Create new analysis record with initial state + new_record = create_analysis_record(db=db, payload=payload) + update_analysis_record(db=db, record_id=new_record.id, status="processing") + logger.bind( issue_key=payload.issueKey, + record_id=new_record.id, timestamp=datetime.utcnow().isoformat() - ).info("Received webhook") + ).info(f"[{payload.issueKey}] Received webhook") # Create Langfuse trace if enabled trace = None @@ -75,33 +96,58 @@ class JiraWebhookHandler: ) try: - analysis_result = await self.analysis_chain.ainvoke(llm_input) + raw_llm_response = await self.analysis_chain.ainvoke(llm_input) # Update Langfuse span with output if enabled if settings.langfuse.enabled and llm_span: - llm_span.update(output=analysis_result) + llm_span.update(output=raw_llm_response) llm_span.end() # Validate LLM response - if not validate_response(analysis_result): - logger.warning(f"Invalid LLM response format for {payload.issueKey}") - analysis_result = { - "hasMultipleEscalations": False, - "customerSentiment": "neutral" - } + try: + # Validate using Pydantic model + AnalysisFlags(**raw_llm_response) + except Exception as e: + logger.error(f"[{payload.issueKey}] Invalid LLM response structure: {str(e)}", exc_info=True) + update_analysis_record( + db=db, + record_id=new_record.id, + analysis_result={"hasMultipleEscalations": False, "customerSentiment": "neutral"}, + raw_response=str(raw_llm_response), + status="validation_failed" + ) + raise ValueError(f"Invalid LLM response format: {str(e)}") from e - logger.debug(f"LLM Analysis Result for {payload.issueKey}: {json.dumps(analysis_result, indent=2)}") - return {"status": "success", "analysis_flags": analysis_result} + logger.debug(f"[{payload.issueKey}] LLM Analysis Result: {json.dumps(raw_llm_response, indent=2)}") + # Update record with final results + update_analysis_record( + db=db, + record_id=new_record.id, + analysis_result=raw_llm_response, + raw_response=str(raw_llm_response), # Store validated result as raw + status="completed" + ) + return {"status": "success", "analysis_flags": raw_llm_response} except Exception as e: - logger.error(f"LLM processing failed for {payload.issueKey}: {str(e)}") + logger.error(f"[{payload.issueKey}] LLM processing failed: {str(e)}") # Log error to Langfuse if enabled if settings.langfuse.enabled and llm_span: llm_span.error(e) llm_span.end() + + update_analysis_record( + db=db, + record_id=new_record.id, + status="failed", + error_message=f"LLM processing failed: {str(e)}" + ) + error_id = str(uuid.uuid4()) + logger.error(f"[{payload.issueKey}] Error ID: {error_id}") return { "status": "error", + "error_id": error_id, "analysis_flags": { "hasMultipleEscalations": False, "customerSentiment": "neutral" @@ -110,12 +156,28 @@ class JiraWebhookHandler: } except Exception as e: - logger.error(f"Error processing webhook: {str(e)}") + issue_key = payload.issueKey if payload.issueKey else "N/A" + logger.error(f"[{issue_key}] Error processing webhook: {str(e)}") import traceback - logger.error(f"Stack trace: {traceback.format_exc()}") + logger.error(f"[{issue_key}] Stack trace: {traceback.format_exc()}") # Log error to Langfuse if enabled if settings.langfuse.enabled and trace: trace.end(error=e) - raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}") \ No newline at end of file + raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}") + +# Initialize handler +webhook_handler = JiraWebhookHandler() + +@webhook_router.post("/jira-webhook") +async def jira_webhook_endpoint(payload: JiraWebhookPayload, db: Session = Depends(get_db_session)): + """Jira webhook endpoint""" + try: + result = await webhook_handler.handle_webhook(payload, db) + return result + except HTTPException: + raise + except Exception as e: + logger.error(f"Unexpected error in webhook endpoint: {str(e)}") + raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}") \ No newline at end of file