diff --git a/.roo/mcp.json b/.roo/mcp.json
index 6b0a486..6d0c0bb 100644
--- a/.roo/mcp.json
+++ b/.roo/mcp.json
@@ -1 +1,10 @@
-{"mcpServers":{}}
\ No newline at end of file
+{"mcpServers":{ "context7": {
+      "command": "npx",
+      "args": [
+        "-y",
+        "@upstash/context7-mcp"
+      ],
+      "env": {
+        "DEFAULT_MINIMUM_TOKENS": "256"
+      }
+    }}}
\ No newline at end of file
diff --git a/api/handlers.py b/api/handlers.py
index 9b22458..7a8a72b 100644
--- a/api/handlers.py
+++ b/api/handlers.py
@@ -5,7 +5,7 @@ import config
 from llm.models import LLMResponse, JiraWebhookPayload, JiraAnalysisResponse
 from database.database import get_db_session # Removed Session import here
 from sqlalchemy.orm import Session # Added correct SQLAlchemy import
-from database.crud import get_all_analysis_records, delete_all_analysis_records, get_analysis_by_id, create_analysis_record
+from database.crud import get_all_analysis_records, delete_all_analysis_records, get_analysis_by_id, create_analysis_record, get_pending_analysis_records, update_record_status
 
 router = APIRouter(
     prefix="/api",
@@ -86,4 +86,54 @@ async def get_analysis_record_endpoint(record_id: int, db: Session = Depends(get
     record = get_analysis_by_id(db, record_id)
     if not record:
         raise HTTPException(status_code=404, detail="Analysis record not found")
-    return JiraAnalysisResponse.model_validate(record)
\ No newline at end of file
+    return JiraAnalysisResponse.model_validate(record)
+
+@router.get("/queue/pending")
+async def get_pending_queue_records_endpoint(db: Session = Depends(get_db_session)):
+    """Get all pending or retrying analysis records."""
+    try:
+        records = get_pending_analysis_records(db)
+        # Convert records to serializable format
+        serialized_records = []
+        for record in records:
+            record_dict = JiraAnalysisResponse.model_validate(record).model_dump()
+            # Convert datetime fields to ISO format
+            record_dict["created_at"] = record_dict["created_at"].isoformat() if record_dict["created_at"] else None
+            record_dict["updated_at"] = record_dict["updated_at"].isoformat() if record_dict["updated_at"] else None
+            serialized_records.append(record_dict)
+
+        return JSONResponse(
+            status_code=200,
+            content={"data": serialized_records}
+        )
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
+
+@router.post("/queue/{record_id}/retry", status_code=200)
+async def retry_analysis_record_endpoint(record_id: int, db: Session = Depends(get_db_session)):
+    """Manually trigger a retry for a failed or validation_failed analysis record."""
+    db_record = get_analysis_by_id(db, record_id)
+    if not db_record:
+        raise HTTPException(status_code=404, detail="Analysis record not found")
+
+    if db_record.status not in ["failed", "validation_failed"]:
+        raise HTTPException(status_code=400, detail=f"Record status is '{db_record.status}'. Only 'failed' or 'validation_failed' records can be retried.")
+
+    # Reset status to pending and clear error message for retry
+    updated_record = update_record_status(
+        db=db,
+        record_id=record_id,
+        status="pending",
+        error_message=None,
+        analysis_result=None,
+        raw_response=None,
+        next_retry_at=None  # Reset retry time
+    )
+
+    if not updated_record:
+        raise HTTPException(status_code=500, detail="Failed to update record for retry.")
+
+    return JSONResponse(
+        status_code=200,
+        content={"message": f"Record {record_id} marked for retry.", "record_id": updated_record.id}
+    )
\ No newline at end of file
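Reviewer note (not part of the patch): the two endpoints added above can be exercised with a small client sketch like the one below. This is a minimal example that assumes the API is served locally on port 8000 and that the requests package is installed; the base URL and the record id are illustrative only.

    import requests

    BASE_URL = "http://localhost:8000/api"  # assumed local dev server

    # List records that are pending or due for a retry
    pending = requests.get(f"{BASE_URL}/queue/pending").json()["data"]
    for rec in pending:
        print(rec["id"], rec["issue_key"], rec["status"])

    # Manually re-queue a failed or validation_failed record
    record_id = 42  # hypothetical record id
    resp = requests.post(f"{BASE_URL}/queue/{record_id}/retry")
    print(resp.status_code, resp.json())
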
diff --git a/database/crud.py b/database/crud.py
index d317385..9cd6a07 100644
--- a/database/crud.py
+++ b/database/crud.py
@@ -15,7 +15,10 @@ def create_analysis_record(db: Session, payload: JiraWebhookPayload) -> JiraAnal
         issue_summary=payload.summary,
         request_payload=payload.model_dump(),
         created_at=datetime.now(timezone.utc),
-        updated_at=datetime.now(timezone.utc)
+        updated_at=datetime.now(timezone.utc),
+        retry_count=0,
+        last_processed_at=None,
+        next_retry_at=None
     )
     db.add(db_analysis)
     db.commit()
@@ -32,30 +35,53 @@ def get_analysis_record(db: Session, issue_key: str) -> Optional[JiraAnalysis]:
         logger.debug(f"No analysis record found for {issue_key}")
     return record
 
-def update_analysis_record(
+def update_record_status(
     db: Session,
     record_id: int,
     status: str,
     analysis_result: Optional[Dict[str, Any]] = None,
     error_message: Optional[str] = None,
-    raw_response: Optional[Dict[str, Any]] = None
+    raw_response: Optional[Dict[str, Any]] = None,
+    retry_count_increment: int = 0,
+    last_processed_at: Optional[datetime] = None,
+    next_retry_at: Optional[datetime] = None
 ) -> Optional[JiraAnalysis]:
     """Updates an existing Jira analysis record."""
     db_analysis = db.query(JiraAnalysis).filter(JiraAnalysis.id == record_id).first()
     if db_analysis:
         db_analysis.status = status
         db_analysis.updated_at = datetime.now(timezone.utc)
-        if analysis_result:
-            db_analysis.analysis_result = analysis_result
-        if error_message:
-            db_analysis.error_message = error_message
-        if raw_response:
-            db_analysis.raw_response = json.dumps(raw_response)
+        # These fields are always overwritten on every call; passing None (or
+        # omitting the keyword) clears the stored value explicitly.
+        db_analysis.analysis_result = analysis_result
+        db_analysis.error_message = error_message
+        db_analysis.raw_response = json.dumps(raw_response) if raw_response is not None else None
+
+        if retry_count_increment > 0:
+            db_analysis.retry_count += retry_count_increment
+
+        db_analysis.last_processed_at = last_processed_at
+        db_analysis.next_retry_at = next_retry_at
+
+        # When status is set to "pending", clear relevant fields for retry
+        if status == "pending":
+            db_analysis.analysis_result = None
+            db_analysis.error_message = None
+            db_analysis.raw_response = None
+            db_analysis.next_retry_at = None
         db.commit()
         db.refresh(db_analysis)
     return db_analysis
 
+def get_pending_analysis_records(db: Session) -> list[JiraAnalysis]:
+    """Retrieves all pending or retrying analysis records that are ready for processing."""
+    now = datetime.now(timezone.utc)
+    return db.query(JiraAnalysis).filter(
+        (JiraAnalysis.status == "pending") |
+        ((JiraAnalysis.status == "retrying") & (JiraAnalysis.next_retry_at <= now))
+    ).order_by(JiraAnalysis.created_at.asc()).all()
+
 def get_all_analysis_records(db: Session) -> list[JiraAnalysis]:
     """Retrieves all analysis records from the database."""
     return db.query(JiraAnalysis).all()
@@ -70,6 +96,6 @@ def delete_all_analysis_records(db: Session) -> int:
     db.query(JiraAnalysis).delete()
     db.commit()
     return count
-    db.commit()
     db.query(JiraAnalysis).delete()
-    db.commit()
\ No newline at end of file
+    db.commit()
+    return count
\ No newline at end of file
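Reviewer note (not part of the patch): unlike the old update_analysis_record, update_record_status overwrites analysis_result, error_message, raw_response, last_processed_at and next_retry_at on every call, so a keyword that is omitted clears the stored value instead of preserving it. A minimal illustration of that behaviour, assuming a record that already has an analysis_result:

    # status-only update: analysis_result, error_message, raw_response,
    # last_processed_at and next_retry_at are all reset to None
    update_record_status(db, record_id=1, status="processing")

    # to keep a previously stored result it must be passed back in explicitly
    update_record_status(db, record_id=1, status="completed", analysis_result=existing_result)

Callers that only want to flip the status should account for this, or the function could later use a sentinel value to mean "leave unchanged".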
diff --git a/database/models.py b/database/models.py
index 95f2be2..4bc6f34 100644
--- a/database/models.py
+++ b/database/models.py
@@ -16,4 +16,7 @@ class JiraAnalysis(Base):
     created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
     updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
     error_message = Column(Text, nullable=True) # To store any error messages
-    raw_response = Column(JSON, nullable=True) # Store raw LLM response before validation
\ No newline at end of file
+    raw_response = Column(JSON, nullable=True) # Store raw LLM response before validation
+    retry_count = Column(Integer, default=0, nullable=False)
+    last_processed_at = Column(DateTime, nullable=True)
+    next_retry_at = Column(DateTime, nullable=True)
\ No newline at end of file
diff --git a/jira_analyses.db b/jira_analyses.db
index f913f03..9b95404 100644
Binary files a/jira_analyses.db and b/jira_analyses.db differ
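Reviewer note (not part of the patch): the three new columns are added to the model only; there is no migration in this diff, so databases created from the old schema will break on the new queries unless they are rebuilt or altered. A minimal one-off migration sketch, assuming SQLite and assuming the table is named jira_analyses (the actual __tablename__ is not visible in this hunk):

    import sqlite3

    conn = sqlite3.connect("jira_analyses.db")
    # hypothetical column additions mirroring database/models.py
    conn.execute("ALTER TABLE jira_analyses ADD COLUMN retry_count INTEGER NOT NULL DEFAULT 0")
    conn.execute("ALTER TABLE jira_analyses ADD COLUMN last_processed_at TIMESTAMP")
    conn.execute("ALTER TABLE jira_analyses ADD COLUMN next_retry_at TIMESTAMP")
    conn.commit()
    conn.close()

A migration tool such as Alembic would also work; committing the modified jira_analyses.db binary below covers a single dev machine but not other environments.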
diff --git a/jira_processor.py b/jira_processor.py
new file mode 100644
index 0000000..7417852
--- /dev/null
+++ b/jira_processor.py
@@ -0,0 +1,170 @@
+import asyncio
+from datetime import datetime, timedelta, timezone
+from loguru import logger
+from sqlalchemy.orm import Session
+import json
+
+from database.database import SessionLocal
+from database.crud import get_analysis_record, update_record_status, create_analysis_record
+from database.models import JiraAnalysis
+from llm.models import JiraWebhookPayload, AnalysisFlags
+from llm.chains import analysis_chain, validate_response
+from config import settings
+
+# Configuration for polling and retries
+POLL_INTERVAL_SECONDS = 30
+MAX_RETRIES = 5
+INITIAL_RETRY_DELAY_SECONDS = 60  # 1 minute
+
+def calculate_next_retry_time(retry_count: int) -> datetime:
+    """Calculates the next retry time using exponential backoff."""
+    delay = INITIAL_RETRY_DELAY_SECONDS * (2 ** retry_count)
+    return datetime.now(timezone.utc) + timedelta(seconds=delay)
+
+async def process_single_jira_request(db: Session, record: JiraAnalysis):
+    """Processes a single Jira webhook request using the LLM."""
+    issue_key = record.issue_key
+    record_id = record.id
+    payload = JiraWebhookPayload.model_validate(record.request_payload)
+
+    logger.bind(
+        issue_key=issue_key,
+        record_id=record_id,
+        timestamp=datetime.now(timezone.utc).isoformat()
+    ).info(f"[{issue_key}] Processing webhook request.")
+
+    # Create Langfuse trace if enabled
+    trace = None
+    if settings.langfuse.enabled:
+        trace = settings.langfuse_client.start_span(
+            name="Jira Webhook Processing",
+            input=payload.model_dump(),
+            metadata={
+                "trace_id": f"processor-{issue_key}-{record_id}",
+                "issue_key": issue_key,
+                "record_id": record_id,
+                "timestamp": datetime.now(timezone.utc).isoformat()
+            }
+        )
+
+    llm_input = {
+        "issueKey": payload.issueKey,
+        "summary": payload.summary,
+        "description": payload.description if payload.description else "No description provided.",
+        "status": payload.status if payload.status else "Unknown",
+        "labels": ", ".join(payload.labels) if payload.labels else "None",
+        "assignee": payload.assignee if payload.assignee else "Unassigned",
+        "updated": payload.updated if payload.updated else "Unknown",
+        "comment": payload.comment if payload.comment else "No new comment provided."
+    }
+
+    llm_span = None
+    if settings.langfuse.enabled and trace:
+        llm_span = trace.start_span(
+            name="LLM Processing",
+            input=llm_input,
+            metadata={
+                "model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model
+            }
+        )
+
+    try:
+        raw_llm_response = await analysis_chain.ainvoke(llm_input)
+
+        if settings.langfuse.enabled and llm_span:
+            llm_span.update(output=raw_llm_response)
+            llm_span.end()
+
+        try:
+            AnalysisFlags(
+                hasMultipleEscalations=raw_llm_response.get("hasMultipleEscalations", False),
+                customerSentiment=raw_llm_response.get("customerSentiment", "neutral")
+            )
+        except Exception as e:
+            logger.error(f"[{issue_key}] Invalid LLM response structure: {e}", exc_info=True)
+            update_record_status(
+                db=db,
+                record_id=record_id,
+                analysis_result={"hasMultipleEscalations": False, "customerSentiment": "neutral"},
+                raw_response=raw_llm_response,  # pass the dict; update_record_status handles JSON serialization
+                status="validation_failed",
+                error_message=f"LLM response validation failed: {e}",
+                last_processed_at=datetime.now(timezone.utc),
+                retry_count_increment=1,
+                next_retry_at=calculate_next_retry_time(record.retry_count + 1) if record.retry_count < MAX_RETRIES else None
+            )
+            if settings.langfuse.enabled and trace:
+                trace.end(status_message=f"Validation failed: {e}", status="ERROR")
+            raise ValueError(f"Invalid LLM response format: {e}") from e
+
+        logger.debug(f"[{issue_key}] LLM Analysis Result: {json.dumps(raw_llm_response, indent=2)}")
+        update_record_status(
+            db=db,
+            record_id=record_id,
+            analysis_result=raw_llm_response,
+            raw_response=raw_llm_response,  # pass the dict; update_record_status handles JSON serialization
+            status="completed",
+            last_processed_at=datetime.now(timezone.utc),
+            next_retry_at=None  # No retry needed on success
+        )
+        if settings.langfuse.enabled and trace:
+            trace.end(status="SUCCESS")
+        logger.info(f"[{issue_key}] Successfully processed and updated record {record_id}.")
+
+    except Exception as e:
+        logger.error(f"[{issue_key}] LLM processing failed for record {record_id}: {str(e)}")
+        if settings.langfuse.enabled and llm_span:
+            llm_span.end(status_message=str(e), status="ERROR")
+
+        new_retry_count = record.retry_count + 1
+        new_status = "failed"
+        next_retry = None
+        if new_retry_count <= MAX_RETRIES:
+            next_retry = calculate_next_retry_time(new_retry_count)
+            new_status = "retrying"  # Indicate that it will be retried
+
+        update_record_status(
+            db=db,
+            record_id=record_id,
+            status=new_status,
+            error_message=f"LLM processing failed: {str(e)}",
+            last_processed_at=datetime.now(timezone.utc),
+            retry_count_increment=1,
+            next_retry_at=next_retry
+        )
+        if settings.langfuse.enabled and trace:
+            trace.end(status_message=str(e), status="ERROR")
+        logger.error(f"[{issue_key}] Record {record_id} status updated to '{new_status}'. Retry count: {new_retry_count}")
+
+
+async def main_processor_loop():
+    """Main loop for the Jira webhook processor."""
+    logger.info("Starting Jira webhook processor.")
+    while True:
+        db: Session = SessionLocal()
+        try:
+            # Fetch records that are 'pending' or 'retrying' and past their next_retry_at
+            # Order by created_at to process older requests first
+            pending_records = db.query(JiraAnalysis).filter(
+                (JiraAnalysis.status == "pending") |
+                ((JiraAnalysis.status == "retrying") & (JiraAnalysis.next_retry_at <= datetime.now(timezone.utc)))
+            ).order_by(JiraAnalysis.created_at.asc()).all()
+
+            if not pending_records:
+                logger.debug(f"No pending or retrying records found. Sleeping for {POLL_INTERVAL_SECONDS} seconds.")
+
+            for record in pending_records:
+                # Update status to 'processing' immediately to prevent other workers from picking it up
+                update_record_status(db, record.id, "processing", last_processed_at=datetime.now(timezone.utc))
+                db.refresh(record)  # Refresh to get the latest state
+                await process_single_jira_request(db, record)
+        except Exception as e:
+            logger.error(f"Error in main processor loop: {str(e)}", exc_info=True)
+        finally:
+            db.close()
+
+        # use a non-blocking sleep so the event loop is not blocked between polls
+        await asyncio.sleep(POLL_INTERVAL_SECONDS)
+
+if __name__ == "__main__":
+    asyncio.run(main_processor_loop())
\ No newline at end of file
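Reviewer note (not part of the patch): with INITIAL_RETRY_DELAY_SECONDS = 60 and MAX_RETRIES = 5, both failure paths pass record.retry_count + 1 into calculate_next_retry_time, so the delay is 60 * 2**retry_count seconds, roughly 2, 4, 8, 16 and 32 minutes for retry counts 1 through 5. A small check of that schedule, using only the constants defined above:

    # reproduces the backoff arithmetic from calculate_next_retry_time
    INITIAL_RETRY_DELAY_SECONDS = 60
    for retry_count in range(1, 6):
        delay = INITIAL_RETRY_DELAY_SECONDS * (2 ** retry_count)
        print(f"retry_count={retry_count}: next attempt in {delay // 60} min")
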
diff --git a/jira_processor.service b/jira_processor.service
new file mode 100644
index 0000000..5f39a5a
--- /dev/null
+++ b/jira_processor.service
@@ -0,0 +1,16 @@
+[Unit]
+Description=Jira Webhook Processor Service
+After=network.target
+
+[Service]
+User=irek
+Group=irek
+WorkingDirectory=/home/irek/gitea
+ExecStart=/home/irek/gitea/.venv/bin/python jira_processor.py
+Restart=always
+RestartSec=10
+StandardOutput=journal
+StandardError=journal
+
+[Install]
+WantedBy=multi-user.target
\ No newline at end of file
diff --git a/jira_webhook_llm.py b/jira_webhook_llm.py
index 1edb34b..1305734 100644
--- a/jira_webhook_llm.py
+++ b/jira_webhook_llm.py
@@ -17,7 +17,7 @@ import asyncio
 from functools import wraps, partial
 from concurrent.futures import ThreadPoolExecutor
 from database.database import init_db, get_db
-from database.crud import create_analysis_record, update_analysis_record
+from database.crud import create_analysis_record, update_record_status
 from llm.models import JiraWebhookPayload
 from llm.chains import analysis_chain, validate_response
 from api.handlers import router # Correct variable name
@@ -64,10 +64,14 @@ async def lifespan(app: FastAPI):
     init_db() # Initialize the database
 
     # Setup signal handlers
-    loop = asyncio.get_running_loop()
-    for sig in (signal.SIGTERM, signal.SIGINT):
-        loop.add_signal_handler(sig, partial(handle_shutdown_signal, sig, loop))
-    logger.info("Signal handlers configured successfully")
+    # Only set up signal handlers if not in a test environment
+    if os.getenv("IS_TEST_ENV") != "true":
+        loop = asyncio.get_running_loop()
+        for sig in (signal.SIGTERM, signal.SIGINT):
+            loop.add_signal_handler(sig, partial(handle_shutdown_signal, sig, loop))
+        logger.info("Signal handlers configured successfully")
+    else:
+        logger.info("Skipping signal handler configuration in test environment.")
 
     # Verify critical components
     if not hasattr(settings, 'langfuse_handler'):
@@ -129,15 +133,6 @@ def create_app():
         try:
             response = await call_next(request)
-            if response.status_code >= 400:
-                error_response = ErrorResponse(
-                    error_id=request_id,
-                    timestamp=datetime.now(timezone.utc).isoformat(),
-                    status_code=response.status_code,
-                    message=HTTPStatus(response.status_code).phrase,
-                    details="Endpoint not found or invalid request"
-                )
-                return JSONResponse(status_code=response.status_code, content=error_response.model_dump())
             return response
         except HTTPException as e:
             logger.error(f"HTTP Error: {e.status_code} - {e.detail}")
@@ -179,7 +174,7 @@ async def process_jira_webhook_background(record_id: int, payload: JiraWebhookPa
 
     with get_db() as db:
         try:
-            update_analysis_record(db, record_id, "processing")
+            update_record_status(db, record_id, "processing")
 
             llm_input = {
                 "issueKey": payload.issueKey,
@@ -200,16 +195,16 @@ async def process_jira_webhook_background(record_id: int, payload: JiraWebhookPa
                     "hasMultipleEscalations": False,
                     "customerSentiment": "neutral"
                 }
-                update_analysis_record(db, record_id, "failed", analysis_result=analysis_result, error_message="Invalid LLM response format")
+                update_record_status(db, record_id, "failed", analysis_result=analysis_result, error_message="Invalid LLM response format")
                 logger.error(f"LLM processing failed for {payload.issueKey}: Invalid response format")
                 return
 
-            update_analysis_record(db, record_id, "completed", analysis_result=analysis_result)
+            update_record_status(db, record_id, "completed", analysis_result=analysis_result)
             logger.info(f"Background processing completed for record ID: {record_id}, Issue Key: {payload.issueKey}")
 
         except Exception as e:
             logger.error(f"Error during background processing for record ID {record_id}, Issue Key {payload.issueKey}: {str(e)}")
-            update_analysis_record(db, record_id, "failed", error_message=str(e))
+            update_record_status(db, record_id, "failed", error_message=str(e))
 
 def retry(max_retries: int = 3, delay: float = 1.0):
     """Decorator for retrying failed operations"""
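Reviewer note (not part of the patch): dropping the >= 400 rewrite in the middleware means error bodies fall back to FastAPI's defaults, which is why the tests further down now assert on "detail" instead of "error_id". For reference, the default response bodies look approximately like this (the exact 422 entry fields depend on the Pydantic version):

    # 404 from an unknown route
    {"detail": "Not Found"}

    # 422 from request validation, one entry per failed field
    {"detail": [{"loc": ["body", "issueKey"], "msg": "Field required", "type": "missing"}]}
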
"customerSentiment": "neutral" } - update_analysis_record(db, record_id, "failed", analysis_result=analysis_result, error_message="Invalid LLM response format") + update_record_status(db, record_id, "failed", analysis_result=analysis_result, error_message="Invalid LLM response format") logger.error(f"LLM processing failed for {payload.issueKey}: Invalid response format") return - update_analysis_record(db, record_id, "completed", analysis_result=analysis_result) + update_record_status(db, record_id, "completed", analysis_result=analysis_result) logger.info(f"Background processing completed for record ID: {record_id}, Issue Key: {payload.issueKey}") except Exception as e: logger.error(f"Error during background processing for record ID {record_id}, Issue Key {payload.issueKey}: {str(e)}") - update_analysis_record(db, record_id, "failed", error_message=str(e)) + update_record_status(db, record_id, "failed", error_message=str(e)) def retry(max_retries: int = 3, delay: float = 1.0): """Decorator for retrying failed operations""" diff --git a/tests/conftest.py b/tests/conftest.py index dc64036..1787c8a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,6 +13,7 @@ def setup_db(monkeypatch): # Use in-memory SQLite for tests test_db_url = "sqlite:///:memory:" monkeypatch.setenv("DATABASE_URL", test_db_url) + monkeypatch.setenv("IS_TEST_ENV", "true") # Monkeypatch the global engine and SessionLocal in the database module engine = create_engine(test_db_url, connect_args={"check_same_thread": False}) @@ -28,7 +29,7 @@ def setup_db(monkeypatch): from database.models import Base as ModelsBase # Renamed to avoid conflict with imported Base - # Create all tables within the same connection + # Create all tables within the same connection and commit ModelsBase.metadata.create_all(bind=connection) # Use the connection here # Verify table creation within setup_db @@ -40,8 +41,8 @@ def setup_db(monkeypatch): yield engine # Yield the engine for test_client to use - # Cleanup: Rollback the transaction and close the connection - transaction.rollback() # Rollback to clean up data + # Cleanup: Rollback the test transaction and close the connection + transaction.rollback() # Rollback test data connection.close() print("--- setup_db fixture finished ---") diff --git a/tests/test_core.py b/tests/test_core.py index 1c54858..aec439c 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -5,20 +5,23 @@ from llm.models import JiraWebhookPayload from database.crud import create_analysis_record, get_analysis_by_id from database.models import JiraAnalysis from database.database import get_db +from database.models import JiraAnalysis +from database.crud import create_analysis_record, update_record_status, get_analysis_by_id from unittest.mock import MagicMock # Import MagicMock +from datetime import datetime, timezone def test_error_handling_middleware(test_client, mock_jira_payload): # Test 404 error handling response = test_client.post("/nonexistent-endpoint", json={}) assert response.status_code == 404 - assert "error_id" in response.json() + assert "detail" in response.json() # FastAPI's default 404 response uses "detail" # Test validation error handling invalid_payload = mock_jira_payload.copy() invalid_payload.pop("issueKey") response = test_client.post("/api/jira-webhook", json=invalid_payload) assert response.status_code == 422 - assert "details" in response.json() + assert "detail" in response.json() # FastAPI's default 422 response uses "detail" def test_webhook_handler(setup_db, test_client, 
@@ -83,4 +86,165 @@ async def test_retry_decorator():
         raise Exception("Test error")
 
     with pytest.raises(Exception):
-        await failing_function()
\ No newline at end of file
+        await failing_function()
+
+def test_get_pending_queue_records_endpoint(setup_db, test_client, mock_full_jira_payload):
+    # Create a pending record
+    with get_db() as db:
+        payload_model = JiraWebhookPayload(**mock_full_jira_payload)
+        pending_record = create_analysis_record(db, payload_model)
+        db.commit()
+        db.refresh(pending_record)
+
+    response = test_client.get("/api/queue/pending")
+    assert response.status_code == 200, f"Expected 200 but got {response.status_code}. Response: {response.text}"
+    data = response.json()["data"]
+    assert len(data) == 1
+    assert data[0]["issue_key"] == mock_full_jira_payload["issueKey"]
+    assert data[0]["status"] == "pending"
+
+def test_get_pending_queue_records_endpoint_empty(setup_db, test_client):
+    # Ensure no records exist
+    with get_db() as db:
+        db.query(JiraAnalysis).delete()
+        db.commit()
+
+    response = test_client.get("/api/queue/pending")
+    assert response.status_code == 200
+    data = response.json()["data"]
+    assert len(data) == 0
+
+def test_get_pending_queue_records_endpoint_error(test_client, monkeypatch):
+    def mock_get_pending_analysis_records(db):
+        raise Exception("Database error")
+
+    monkeypatch.setattr("api.handlers.get_pending_analysis_records", mock_get_pending_analysis_records)
+
+    response = test_client.get("/api/queue/pending")
+    assert response.status_code == 500, f"Expected 500 but got {response.status_code}. Response: {response.text}"
+    assert "detail" in response.json() # FastAPI's HTTPException uses "detail"
+    assert response.json()["detail"] == "Database error: Database error"
+
+def test_retry_analysis_record_endpoint_success(setup_db, test_client, mock_full_jira_payload):
+    # Create a failed record
+    with get_db() as db:
+        payload_model = JiraWebhookPayload(**mock_full_jira_payload)
+        failed_record = create_analysis_record(db, payload_model)
+        update_record_status(db, failed_record.id, "failed", error_message="LLM failed")
+        db.commit()
+        db.refresh(failed_record)
+
+    response = test_client.post(f"/api/queue/{failed_record.id}/retry")
+    assert response.status_code == 200
+    assert response.json()["message"] == f"Record {failed_record.id} marked for retry."
+
+    with get_db() as db:
+        updated_record = get_analysis_by_id(db, failed_record.id)
+        assert updated_record.status == "pending"
+        assert updated_record.error_message is None
+        assert updated_record.analysis_result is None
+        assert updated_record.raw_response is None
+        assert updated_record.next_retry_at is None
+
+def test_retry_analysis_record_endpoint_not_found(test_client):
+    response = test_client.post("/api/queue/99999/retry")
+    assert response.status_code == 404
+    # Handle both possible error response formats
+    assert "detail" in response.json() # FastAPI's HTTPException uses "detail"
+    assert response.json()["detail"] == "Analysis record not found"
+
+def test_retry_analysis_record_endpoint_invalid_status(setup_db, test_client, mock_full_jira_payload):
+    # Create a successful record
+    with get_db() as db:
+        payload_model = JiraWebhookPayload(**mock_full_jira_payload)
+        successful_record = create_analysis_record(db, payload_model)
+        update_record_status(db, successful_record.id, "success")
+        db.commit()
+        db.refresh(successful_record)
+
+    response = test_client.post(f"/api/queue/{successful_record.id}/retry")
+    assert response.status_code == 400
+    assert response.json()["detail"] == f"Record status is 'success'. Only 'failed' or 'validation_failed' records can be retried."
+
+def test_retry_analysis_record_endpoint_db_update_failure(setup_db, test_client, mock_full_jira_payload, monkeypatch):
+    # Create a failed record
+    with get_db() as db:
+        payload_model = JiraWebhookPayload(**mock_full_jira_payload)
+        failed_record = create_analysis_record(db, payload_model)
+        update_record_status(db, failed_record.id, "failed", error_message="LLM failed")
+        db.commit()
+        db.refresh(failed_record)
+
+    def mock_update_record_status(*args, **kwargs):
+        return None # Simulate update failure
+
+    monkeypatch.setattr("api.handlers.update_record_status", mock_update_record_status)
+
+    response = test_client.post(f"/api/queue/{failed_record.id}/retry")
+    assert response.status_code == 500, f"Expected 500 but got {response.status_code}. Response: {response.text}"
+    assert response.json()["detail"] == "Failed to update record for retry."
+
+def test_retry_analysis_record_endpoint_retry_count_and_next_retry_at(setup_db, test_client, mock_full_jira_payload):
+    # Create a failed record with an initial retry count and next_retry_at
+    with get_db() as db:
+        payload_model = JiraWebhookPayload(**mock_full_jira_payload)
+        failed_record = create_analysis_record(db, payload_model)
+        update_record_status(
+            db,
+            failed_record.id,
+            "failed",
+            error_message="LLM failed",
+            retry_count_increment=1,
+            next_retry_at=datetime.now(timezone.utc)
+        )
+        db.commit()
+        db.refresh(failed_record)
+        initial_retry_count = failed_record.retry_count
+
+    response = test_client.post(f"/api/queue/{failed_record.id}/retry")
+    assert response.status_code == 200
+
+    with get_db() as db:
+        updated_record = get_analysis_by_id(db, failed_record.id)
+        assert updated_record.status == "pending"
+        assert updated_record.error_message is None
+        assert updated_record.next_retry_at is None # Should be reset to None
+        # The retry endpoint itself doesn't increment retry_count,
+        # it just resets the status. The increment happens during processing.
+        # So, we assert it remains the same as before the retry request.
+        assert updated_record.retry_count == initial_retry_count
+
+@pytest.mark.asyncio
+async def test_concurrent_retry_operations(setup_db, test_client, mock_full_jira_payload):
+    # Create multiple failed records
+    record_ids = []
+    with get_db() as db:
+        for i in range(5):
+            payload = mock_full_jira_payload.copy()
+            payload["issueKey"] = f"TEST-{i}"
+            payload_model = JiraWebhookPayload(**payload)
+            failed_record = create_analysis_record(db, payload_model)
+            update_record_status(db, failed_record.id, "failed", error_message=f"LLM failed {i}")
+            db.commit()
+            db.refresh(failed_record)
+            record_ids.append(failed_record.id)
+
+    # Simulate concurrent retry requests
+    import asyncio
+    async def send_retry_request(record_id):
+        return test_client.post(f"/api/queue/{record_id}/retry")
+
+    tasks = [send_retry_request(rid) for rid in record_ids]
+    responses = await asyncio.gather(*tasks)
+
+    for response in responses:
+        assert response.status_code == 200
+        assert "message" in response.json()
+
+    # Verify all records are marked as pending
+    with get_db() as db:
+        for record_id in record_ids:
+            updated_record = get_analysis_by_id(db, record_id)
+            assert updated_record.status == "pending"
+            assert updated_record.error_message is None
+            assert updated_record.next_retry_at is None
\ No newline at end of file
diff --git a/webhooks/handlers.py b/webhooks/handlers.py
index b17b7bc..0ccb311 100644
--- a/webhooks/handlers.py
+++ b/webhooks/handlers.py
@@ -9,9 +9,8 @@ import uuid
 from config import settings
 from langfuse import Langfuse
 
-from database.crud import create_analysis_record, get_analysis_record, update_analysis_record
-from llm.models import JiraWebhookPayload, AnalysisFlags
-from llm.chains import analysis_chain, validate_response
+from database.crud import create_analysis_record
+from llm.models import JiraWebhookPayload
 from database.database import get_db_session
 
 webhook_router = APIRouter()
@@ -33,10 +32,7 @@ class ValidationError(HTTPException):
         super().__init__(status_code=422, detail=detail)
 
 class JiraWebhookHandler:
-    def __init__(self):
-        self.analysis_chain = analysis_chain
-
-    async def handle_webhook(self, payload: JiraWebhookPayload, db: Session):
+    async def process_jira_request(self, payload: JiraWebhookPayload, db: Session):
         try:
             if not payload.issueKey:
                 raise BadRequestError("Missing required field: issueKey")
@@ -44,139 +40,32 @@ class JiraWebhookHandler:
 
             if not payload.summary:
                 raise BadRequestError("Missing required field: summary")
 
-            # Check for existing analysis record
-            existing_record = get_analysis_record(db, payload.issueKey)
-            if existing_record:
-                logger.info(f"Existing analysis record found for {payload.issueKey}. Skipping new analysis.")
-                return {"status": "skipped", "analysis_flags": existing_record.analysis_result}
-
             # Create new analysis record with initial state
             new_record = create_analysis_record(db=db, payload=payload)
-            update_analysis_record(db=db, record_id=new_record.id, status="processing")
 
             logger.bind(
                 issue_key=payload.issueKey,
                 record_id=new_record.id,
                 timestamp=datetime.now(timezone.utc).isoformat()
-            ).info(f"[{payload.issueKey}] Received webhook")
+            ).info(f"[{payload.issueKey}] Received webhook and queued for processing.")
 
-            # Create Langfuse trace if enabled
-            trace = None
-            if settings.langfuse.enabled:
-                trace = settings.langfuse_client.start_span( # Use start_span
-                    name="Jira Webhook",
-                    input=payload.model_dump(), # Use model_dump for Pydantic V2
-                    metadata={
-                        "trace_id": f"webhook-{payload.issueKey}",
-                        "issue_key": payload.issueKey,
-                        "timestamp": datetime.now(timezone.utc).isoformat()
-                    }
-                )
-
-            llm_input = {
-                "issueKey": payload.issueKey,
-                "summary": payload.summary,
-                "description": payload.description if payload.description else "No description provided.",
-                "status": payload.status if payload.status else "Unknown",
-                "labels": ", ".join(payload.labels) if payload.labels else "None",
-                "assignee": payload.assignee if payload.assignee else "Unassigned",
-                "updated": payload.updated if payload.updated else "Unknown",
-                "comment": payload.comment if payload.comment else "No new comment provided."
-            }
-
-            # Create Langfuse span for LLM processing if enabled
-            llm_span = None
-            if settings.langfuse.enabled and trace:
-                llm_span = trace.start_span(
-                    name="LLM Processing",
-                    input=llm_input,
-                    metadata={
-                        "model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model
-                    }
-                )
-
-            try:
-                raw_llm_response = await self.analysis_chain.ainvoke(llm_input)
-
-                # Update Langfuse span with output if enabled
-                if settings.langfuse.enabled and llm_span:
-                    llm_span.update(output=raw_llm_response)
-                    llm_span.end()
-
-                # Validate LLM response
-                try:
-                    # Validate using Pydantic model, extracting only relevant fields
-                    AnalysisFlags(
-                        hasMultipleEscalations=raw_llm_response.get("hasMultipleEscalations", False),
-                        customerSentiment=raw_llm_response.get("customerSentiment", "neutral")
-                    )
-                except Exception as e:
-                    logger.error(f"[{payload.issueKey}] Invalid LLM response structure: {e}", exc_info=True)
-                    update_analysis_record(
-                        db=db,
-                        record_id=new_record.id,
-                        analysis_result={"hasMultipleEscalations": False, "customerSentiment": "neutral"},
-                        raw_response=json.dumps(raw_llm_response), # Store as JSON string
-                        status="validation_failed"
-                    )
-                    raise ValueError(f"Invalid LLM response format: {e}") from e
-
-                logger.debug(f"[{payload.issueKey}] LLM Analysis Result: {json.dumps(raw_llm_response, indent=2)}")
-                # Update record with final results
-                update_analysis_record(
-                    db=db,
-                    record_id=new_record.id,
-                    analysis_result=raw_llm_response,
-                    raw_response=str(raw_llm_response), # Store validated result as raw
-                    status="completed"
-                )
-                return {"status": "success", "analysis_flags": raw_llm_response}
-
-            except Exception as e:
-                logger.error(f"[{payload.issueKey}] LLM processing failed: {str(e)}")
-
-                # Log error to Langfuse if enabled
-                if settings.langfuse.enabled and llm_span:
-                    llm_span.end(status_message=str(e), status="ERROR")
-
-                update_analysis_record(
-                    db=db,
-                    record_id=new_record.id,
-                    status="failed",
-                    error_message=f"LLM processing failed: {str(e)}"
-                )
-                error_id = str(uuid.uuid4())
-                logger.error(f"[{payload.issueKey}] Error ID: {error_id}")
-                return {
-                    "status": "error",
-                    "error_id": error_id,
-                    "analysis_flags": {
-                        "hasMultipleEscalations": False,
-                        "customerSentiment": "neutral"
-                    },
-                    "error": str(e)
-                }
+            return {"status": "queued", "record_id": new_record.id}
 
         except Exception as e:
             issue_key = payload.issueKey if payload.issueKey else "N/A"
-            logger.error(f"[{issue_key}] Error processing webhook: {str(e)}")
+            logger.error(f"[{issue_key}] Error receiving webhook: {str(e)}")
             import traceback
             logger.error(f"[{issue_key}] Stack trace: {traceback.format_exc()}")
-
-            # Log error to Langfuse if enabled
-            if settings.langfuse.enabled and trace:
-                trace.end(status_message=str(e), status="ERROR")
-
             raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")
 
 # Initialize handler
 webhook_handler = JiraWebhookHandler()
 
-@webhook_router.post("/api/jira-webhook")
-async def jira_webhook_endpoint(payload: JiraWebhookPayload, db: Session = Depends(get_db_session)):
-    """Jira webhook endpoint"""
+@webhook_router.post("/api/jira-webhook", status_code=202)
+async def receive_jira_request(payload: JiraWebhookPayload, db: Session = Depends(get_db_session)):
+    """Jira webhook endpoint - receives and queues requests for processing"""
     try:
-        result = await webhook_handler.handle_webhook(payload, db)
+        result = await webhook_handler.process_jira_request(payload, db)
         return result
     except ValidationError as e:
         raise