jira-webhook-llm/jira_processor.py

170 lines
7.1 KiB
Python

import time
from datetime import datetime, timedelta, timezone
from loguru import logger
from sqlalchemy.orm import Session
import json
from database.database import SessionLocal
from database.crud import get_analysis_record, update_record_status, create_analysis_record
from database.models import JiraAnalysis
from llm.models import JiraWebhookPayload, AnalysisFlags
from llm.chains import analysis_chain, validate_response
from config import settings
# Configuration for polling and retries
POLL_INTERVAL_SECONDS = 30
MAX_RETRIES = 5
INITIAL_RETRY_DELAY_SECONDS = 60 # 1 minute
def calculate_next_retry_time(retry_count: int) -> datetime:
"""Calculates the next retry time using exponential backoff."""
delay = INITIAL_RETRY_DELAY_SECONDS * (2 ** retry_count)
return datetime.now(timezone.utc) + timedelta(seconds=delay)
async def process_single_jira_request(db: Session, record: JiraAnalysis):
"""Processes a single Jira webhook request using the LLM."""
issue_key = record.issue_key
record_id = record.id
payload = JiraWebhookPayload.model_validate(record.request_payload)
logger.bind(
issue_key=issue_key,
record_id=record_id,
timestamp=datetime.now(timezone.utc).isoformat()
).info(f"[{issue_key}] Processing webhook request.")
# Create Langfuse trace if enabled
trace = None
if settings.langfuse.enabled:
trace = settings.langfuse_client.start_span(
name="Jira Webhook Processing",
input=payload.model_dump(),
metadata={
"trace_id": f"processor-{issue_key}-{record_id}",
"issue_key": issue_key,
"record_id": record_id,
"timestamp": datetime.now(timezone.utc).isoformat()
}
)
llm_input = {
"issueKey": payload.issueKey,
"summary": payload.summary,
"description": payload.description if payload.description else "No description provided.",
"status": payload.status if payload.status else "Unknown",
"labels": ", ".join(payload.labels) if payload.labels else "None",
"assignee": payload.assignee if payload.assignee else "Unassigned",
"updated": payload.updated if payload.updated else "Unknown",
"comment": payload.comment if payload.comment else "No new comment provided."
}
llm_span = None
if settings.langfuse.enabled and trace:
llm_span = trace.start_span(
name="LLM Processing",
input=llm_input,
metadata={
"model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model
}
)
try:
raw_llm_response = await analysis_chain.ainvoke(llm_input)
if settings.langfuse.enabled and llm_span:
llm_span.update(output=raw_llm_response)
llm_span.end()
try:
AnalysisFlags(
hasMultipleEscalations=raw_llm_response.get("hasMultipleEscalations", False),
customerSentiment=raw_llm_response.get("customerSentiment", "neutral")
)
except Exception as e:
logger.error(f"[{issue_key}] Invalid LLM response structure: {e}", exc_info=True)
update_record_status(
db=db,
record_id=record_id,
analysis_result={"hasMultipleEscalations": False, "customerSentiment": "neutral"},
raw_response=json.dumps(raw_llm_response),
status="validation_failed",
error_message=f"LLM response validation failed: {e}",
last_processed_at=datetime.now(timezone.utc),
retry_count_increment=1,
next_retry_at=calculate_next_retry_time(record.retry_count + 1) if record.retry_count < MAX_RETRIES else None
)
if settings.langfuse.enabled and trace:
trace.end(status_message=f"Validation failed: {e}", status="ERROR")
raise ValueError(f"Invalid LLM response format: {e}") from e
logger.debug(f"[{issue_key}] LLM Analysis Result: {json.dumps(raw_llm_response, indent=2)}")
update_record_status(
db=db,
record_id=record_id,
analysis_result=raw_llm_response,
raw_response=json.dumps(raw_llm_response),
status="completed",
last_processed_at=datetime.now(timezone.utc),
next_retry_at=None # No retry needed on success
)
if settings.langfuse.enabled and trace:
trace.end(status="SUCCESS")
logger.info(f"[{issue_key}] Successfully processed and updated record {record_id}.")
except Exception as e:
logger.error(f"[{issue_key}] LLM processing failed for record {record_id}: {str(e)}")
if settings.langfuse.enabled and llm_span:
llm_span.end(status_message=str(e), status="ERROR")
new_retry_count = record.retry_count + 1
new_status = "failed"
next_retry = None
if new_retry_count <= MAX_RETRIES:
next_retry = calculate_next_retry_time(new_retry_count)
new_status = "retrying" # Indicate that it will be retried
update_record_status(
db=db,
record_id=record_id,
status=new_status,
error_message=f"LLM processing failed: {str(e)}",
last_processed_at=datetime.now(timezone.utc),
retry_count_increment=1,
next_retry_at=next_retry
)
if settings.langfuse.enabled and trace:
trace.end(status_message=str(e), status="ERROR")
logger.error(f"[{issue_key}] Record {record_id} status updated to '{new_status}'. Retry count: {new_retry_count}")
async def main_processor_loop():
"""Main loop for the Jira webhook processor."""
logger.info("Starting Jira webhook processor.")
while True:
db: Session = SessionLocal()
try:
# Fetch records that are 'pending' or 'retrying' and past their next_retry_at
# Order by created_at to process older requests first
pending_records = db.query(JiraAnalysis).filter(
(JiraAnalysis.status == "pending") |
((JiraAnalysis.status == "retrying") & (JiraAnalysis.next_retry_at <= datetime.now(timezone.utc)))
).order_by(JiraAnalysis.created_at.asc()).all()
if not pending_records:
logger.debug(f"No pending or retrying records found. Sleeping for {POLL_INTERVAL_SECONDS} seconds.")
for record in pending_records:
# Update status to 'processing' immediately to prevent other workers from picking it up
update_record_status(db, record.id, "processing", last_processed_at=datetime.now(timezone.utc))
db.refresh(record) # Refresh to get the latest state
await process_single_jira_request(db, record)
except Exception as e:
logger.error(f"Error in main processor loop: {str(e)}", exc_info=True)
finally:
db.close()
time.sleep(POLL_INTERVAL_SECONDS)
if __name__ == "__main__":
import asyncio
asyncio.run(main_processor_loop())