# jira-webhook-llm/jira-webhook-llm.py
"""Jira webhook handler with LLM integration.

- FastAPI application that handles Jira webhooks.
- Pydantic models for the Jira payload and the structured LLM output.
- LangChain with OpenAI or Ollama for LLM processing.
- Langfuse set up for tracing and monitoring.
- Analysis logic for Jira tickets, including sentiment analysis and label suggestions.
- Test endpoint for the LLM integration.
"""
import os
import sys
import json
from typing import Optional, List, Union
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field, ConfigDict, field_validator
from loguru import logger
# Import your new settings object
from config import settings
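# For reference, a hedged sketch of the Settings object this module assumes from
# config.py (field names are inferred from how they are used below; the actual
# config.py in this repo may differ, and the default values here are made up):
#
#   from pydantic_settings import BaseSettings
#
#   class Settings(BaseSettings):
#       llm_mode: str = "ollama"                      # "openai" or "ollama"
#       openai_model: str = "gpt-4o-mini"
#       openai_api_key: str = ""
#       openai_api_base_url: Optional[str] = None
#       ollama_model: str = "llama3"
#       ollama_base_url: str = "http://localhost:11434"
#
#   settings = Settings()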
# LangChain imports
from langchain_ollama import OllamaLLM
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel as LCBaseModel
# Langfuse imports
from langfuse import Langfuse, get_client
from langfuse.langchain import CallbackHandler
# LANGFUSE_PUBLIC_KEY="pk-lf-..."
# LANGFUSE_SECRET_KEY="sk-lf-..."
# LANGFUSE_HOST="https://cloud.langfuse.com"  # or "https://us.cloud.langfuse.com" for the US region, or your self-hosted instance
# NOTE: hardcoding credentials like this is not recommended; prefer loading them from
# the environment variables above (see the commented alternative below).
langfuse = Langfuse(
    secret_key="sk-lf-55d5fa70-e2d3-44d0-ae76-48181126d7ed",
    public_key="pk-lf-0f6178ee-e6aa-4cb7-a433-6c00c6512874",
    host="https://cloud.langfuse.com"
)
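# A hedged alternative (not part of the original setup): read the credentials from the
# environment variables documented above instead of hardcoding them, e.g.
#
#   langfuse = Langfuse(
#       secret_key=os.environ["LANGFUSE_SECRET_KEY"],
#       public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
#       host=os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com"),
#   )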
# Initialize the Langfuse client (get_client() reads the environment variables by default).
# It's good practice to initialize it early to verify the connection.
try:
    langfuse_client = get_client()
    if langfuse_client.auth_check():
        logger.info("Langfuse client authenticated successfully.")
    else:
        logger.warning("Langfuse authentication failed. Check your API keys and host.")
except Exception as e:
    logger.error(f"Failed to initialize Langfuse client: {e}")
    langfuse_client = None
    # Depending on your tolerance, you might want to exit or continue without tracing.
    # For now, we log the error and continue, but traces won't be sent.
# --- Pydantic Models for Jira Payload and LLM Output ---
class JiraWebhookPayload(BaseModel):
    # Pydantic config: generate camelCase aliases from snake_case field names
    # and allow population by either the field name or the alias.
    model_config = ConfigDict(
        alias_generator=lambda x: ''.join(
            word.capitalize() if i > 0 else word for i, word in enumerate(x.split('_'))
        ),
        populate_by_name=True,
    )

    issueKey: str
    summary: str
    description: Optional[str] = None
    comment: Optional[str] = None  # Assumed to be the *new* comment that triggered the webhook
    labels: Optional[Union[List[str], str]] = []
    status: Optional[str] = None
    assignee: Optional[str] = None
    updated: Optional[str] = None  # Timestamp string

    @field_validator('labels', mode='before')  # Pydantic v1 `pre=True` becomes `mode='before'`
    @classmethod  # Pydantic v2 validators must be classmethods
    def convert_labels_to_list(cls, v):
        if isinstance(v, str):
            return [v]
        return v or []  # Return an empty list if v is None/empty
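# For illustration, a hedged example of a JSON body this model accepts (values are
# made up; a real Jira webhook payload would need to be mapped to this shape upstream):
#
#   {
#       "issueKey": "PROJ-42",
#       "summary": "Customer cannot log in",
#       "description": "Login fails with a 500 error since the last deploy.",
#       "comment": "Customer has escalated again via phone.",
#       "labels": "login",
#       "status": "Open",
#       "assignee": "jdoe",
#       "updated": "2025-07-13T10:00:00Z"
#   }
#
# Note that a bare string for "labels" is normalized to ["login"] by the validator above.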
# Define the structure of the LLM's expected JSON output
class AnalysisFlags(LCBaseModel):
    hasMultipleEscalations: bool = Field(description="Is there evidence of multiple escalation attempts or channels?")
    requiresUrgentAttention: bool = Field(description="Does the issue convey a sense of urgency beyond standard priority?")
    customerSentiment: Optional[str] = Field(default=None, description="Overall customer sentiment (e.g., 'neutral', 'frustrated', 'calm').")
    suggestedLabels: List[str] = Field(description="List of suggested Jira labels, e.g., ['escalated', 'high-customer-impact'].")
    summaryOfConcerns: Optional[str] = Field(default=None, description="A concise summary of the key concerns or problems in the ticket.")
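# The JSON output parser set up below expects the LLM to return an object matching
# AnalysisFlags, for example (illustrative values only):
#
#   {
#       "hasMultipleEscalations": true,
#       "requiresUrgentAttention": true,
#       "customerSentiment": "frustrated",
#       "suggestedLabels": ["escalated", "high-customer-impact"],
#       "summaryOfConcerns": "Customer reports repeated login failures and has escalated twice."
#   }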
# --- LLM Setup (now dynamic based on config) ---
llm = None
if settings.llm_mode == 'openai':
    logger.info(f"Initializing ChatOpenAI with model: {settings.openai_model}")
    llm = ChatOpenAI(
        model=settings.openai_model,
        temperature=0.7,
        max_tokens=2000,
        api_key=settings.openai_api_key,
        base_url=settings.openai_api_base_url
    )
elif settings.llm_mode == 'ollama':
    logger.info(f"Initializing OllamaLLM with model: {settings.ollama_model} at {settings.ollama_base_url}")
    llm = OllamaLLM(
        model=settings.ollama_model,
        base_url=settings.ollama_base_url,
        streaming=False
    )

# This check is redundant because config.py would have exited on an invalid mode, but it keeps the failure explicit.
if llm is None:
    logger.error("LLM could not be initialized. Exiting.")
    sys.exit(1)
app = FastAPI()
# Set up Output Parser for structured JSON
parser = JsonOutputParser(pydantic_object=AnalysisFlags)
# Prompt Template for LLM
prompt_template = PromptTemplate(
    template="""
You are an AI assistant designed to analyze Jira ticket details and extract key flags and sentiment.
Analyze the following Jira ticket information and provide your analysis in a JSON format.
Ensure the JSON strictly adheres to the specified schema.
Consider the overall context of the ticket and specifically the latest comment if provided.
Issue Key: {issueKey}
Summary: {summary}
Description: {description}
Status: {status}
Existing Labels: {labels}
Assignee: {assignee}
Last Updated: {updated}
Latest Comment (if applicable): {comment}
**Analysis Request:**
- Determine if there are signs of multiple escalation attempts in the description or comments.
- Assess if the issue requires urgent attention based on language or context from the summary, description, or latest comment.
- Summarize the overall customer sentiment evident in the issue.
- Suggest relevant Jira labels that should be applied to this issue.
- Provide a concise summary of the key concerns or problems described in the ticket.
- Generate a concise, objective comment (max 2-3 sentences) suitable for directly adding to the Jira ticket, summarizing the AI's findings.
{format_instructions}
""",
    input_variables=[
        "issueKey", "summary", "description", "status", "labels",
        "assignee", "updated", "comment"
    ],
    partial_variables={"format_instructions": parser.get_format_instructions()},
)
# Chain for LLM invocation
analysis_chain = prompt_template | llm | parser
# --- Webhook Endpoint ---
@app.post("/jira-webhook")
async def jira_webhook_handler(payload: JiraWebhookPayload):
    # Initialize a Langfuse CallbackHandler for this request so that each
    # webhook invocation gets its own trace in Langfuse.
    langfuse_handler = CallbackHandler()
    try:
        logger.info(f"Received webhook for Jira issue: {payload.issueKey}")

        # Prepare the payload for LangChain:
        # 1. Use the 'comment' field directly if it exists, as it is typically the trigger.
        # 2. Convert Optional fields to usable strings for the prompt.
        # This mapping handles potential None values in the payload.
        llm_input = {
            "issueKey": payload.issueKey,
            "summary": payload.summary,
            "description": payload.description if payload.description else "No description provided.",
            "status": payload.status if payload.status else "Unknown",
            "labels": ", ".join(payload.labels) if payload.labels else "None",
            "assignee": payload.assignee if payload.assignee else "Unassigned",
            "updated": payload.updated if payload.updated else "Unknown",
            "comment": payload.comment if payload.comment else "No new comment provided."
        }

        # Pass the data to LangChain for analysis, using ainvoke for async execution.
        # The Langfuse handler is attached via the config of the ainvoke call.
        analysis_result = await analysis_chain.ainvoke(
            llm_input,
            config={
                "callbacks": [langfuse_handler],
                "metadata": {
                    "trace_name": f"JiraWebhook-{payload.issueKey}"
                }
            }
        )

        logger.debug(f"LLM Analysis Result for {payload.issueKey}: {json.dumps(analysis_result, indent=2)}")
        return {"status": "success", "analysis_flags": analysis_result}
    except Exception as e:
        logger.exception(f"Error processing webhook: {e}")  # logs the full traceback for debugging
        # Record the error on the Langfuse trace as well. The 'trace' attribute may not be
        # present on every SDK version, so guard the access.
        trace = getattr(langfuse_handler, "trace", None)
        if trace:  # Only update if a trace was actually started
            trace.update(
                status_message=f"Error: {str(e)}",
                level="ERROR"
            )
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")
    finally:
        # Flush the Langfuse client to ensure all events are sent. This matters most in
        # short-lived processes or serverless functions; for a long-running FastAPI app the
        # client's internal queue usually handles it, but an explicit flush gives immediate
        # visibility and is useful during testing.
        if langfuse_client:
            langfuse_client.flush()
# To run this:
# 1. Set the OPENAI_API_KEY, LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, and LANGFUSE_HOST environment variables.
# 2. Start FastAPI (the module name must match this file's name):
#    uvicorn jira-webhook-llm:app --host 0.0.0.0 --port 8000 --reload
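# Example request against the webhook endpoint (assumed local deployment; adjust host and port as needed):
#
#   curl -X POST http://localhost:8000/jira-webhook \
#     -H "Content-Type: application/json" \
#     -d '{"issueKey": "PROJ-42", "summary": "Customer cannot log in", "comment": "Escalated again"}'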
@app.post("/test-llm")
async def test_llm():
"""Test endpoint for LLM integration"""
# Correctly initialize the Langfuse CallbackHandler.
# It inherits the client configuration from the global 'langfuse' instance.
# If you need to name the trace, you do so in the 'ainvoke' call's metadata.
langfuse_handler = CallbackHandler(
# The constructor does not take 'trace_name'.
# Remove it from here.
)
test_payload = {
"issueKey": "TEST-123",
"summary": "Test issue",
"description": "This is a test issue for LLM integration",
"comment": "Testing OpenAI integration with Langfuse",
"labels": ["test"],
"status": "Open",
"assignee": "Tester",
"updated": "2025-07-04T21:40:00Z"
}
try:
llm_input = {
"issueKey": test_payload["issueKey"],
"summary": test_payload["summary"],
"description": test_payload["description"],
"status": test_payload["status"],
"labels": ", ".join(test_payload["labels"]),
"assignee": test_payload["assignee"],
"updated": test_payload["updated"],
"comment": test_payload["comment"]
}
# To name the trace, you pass it in the config's metadata
result = await analysis_chain.ainvoke(
llm_input,
config={
"callbacks": [langfuse_handler],
"metadata": {
"trace_name": "TestLLM" # Correct way to name the trace
}
}
)
return {
"status": "success",
"result": result
}
except Exception as e:
if langfuse_handler.trace:
langfuse_handler.trace.update(
status_message=f"Error in test-llm: {str(e)}",
level="ERROR"
)
logger.error(f"Error in /test-llm: {e}")
return {
"status": "error",
"message": str(e)
}
finally:
if langfuse_client:
langfuse_client.flush()
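
# The test endpoint takes no request body, e.g.:
#
#   curl -X POST http://localhost:8000/test-llm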