Refactor Jira Webhook LLM integration

- Simplified the FastAPI application structure and improved error handling with middleware.
- Introduced a retry decorator for asynchronous functions to enhance reliability (usage sketch below the commit metadata).
- Modularized the LLM initialization and prompt loading into separate functions for better maintainability.
- Updated Pydantic models for Jira webhook payload and analysis flags to ensure proper validation and structure.
- Implemented a structured logging configuration for better traceability and debugging.
- Added comprehensive unit tests for prompt loading, response validation, and webhook handling.
- Established a CI/CD pipeline with GitHub Actions for automated testing and coverage reporting.
- Enhanced the prompt template for LLM analysis to include specific instructions for handling escalations.
Author: Ireneusz Bachanowicz, 2025-07-13 13:19:10 +02:00
Parent: 0c468c0a69
Commit: 2763b40b60
18 changed files with 588 additions and 1440 deletions
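As a quick illustration of the retry decorator mentioned in the commit message, here is a minimal usage sketch. It assumes the main module is importable as jira_webhook_llm (as the test suite below does); flaky_lookup is a hypothetical coroutine used only for this example.

import asyncio

from jira_webhook_llm import retry  # decorator added in this commit

@retry(max_retries=3, delay=1.0)
async def flaky_lookup() -> str:
    # Hypothetical stand-in for any call that can fail transiently.
    return "ok"

print(asyncio.run(flaky_lookup()))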

.github/workflows/ci.yml (new file, +33)

@@ -0,0 +1,33 @@
name: CI/CD Pipeline
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Run tests with coverage
run: |
pytest --cov=./ --cov-report=xml
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
token: ${{ secrets.CODECOV_TOKEN }}
file: ./coverage.xml
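The same test-and-coverage run can be reproduced locally. A minimal sketch using pytest's Python entry point, equivalent to the pytest --cov=./ --cov-report=xml step above (the helper file name is hypothetical):

# run_coverage.py (hypothetical helper, not part of this commit)
import sys

import pytest

# Mirrors the CI step: run the suite with coverage and write coverage.xml.
sys.exit(pytest.main(["--cov=./", "--cov-report=xml"]))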

config.py (129 changed lines)

@@ -1,75 +1,90 @@
import os
import sys
import yaml
from loguru import logger
from typing import Optional
from pydantic_settings import BaseSettings
from pydantic import validator, ConfigDict
from loguru import logger
from watchfiles import watch, Change
from threading import Thread
# Define a custom exception for configuration errors
class AppConfigError(Exception):
pass
class LogConfig(BaseSettings):
level: str = 'INFO'
model_config = ConfigDict(
env_prefix='LOG_',
extra='ignore'
)
class LLMConfig(BaseSettings):
mode: str = 'ollama'
# OpenAI settings
openai_api_key: Optional[str] = None
openai_api_base_url: Optional[str] = None
openai_model: Optional[str] = None
# Ollama settings
ollama_base_url: Optional[str] = None
ollama_model: Optional[str] = None
@validator('mode')
def validate_mode(cls, v):
if v not in ['openai', 'ollama']:
raise ValueError("LLM mode must be either 'openai' or 'ollama'")
return v
model_config = ConfigDict(
env_prefix='LLM_',
env_file='.env',
env_file_encoding='utf-8',
extra='ignore'
)
class Settings:
def __init__(self, config_path: str = "config/application.yml"):
"""
Loads configuration from a YAML file and overrides with environment variables.
"""
# --- Load from YAML file ---
try:
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
except FileNotFoundError:
raise AppConfigError(f"Configuration file not found at '{config_path}'.")
except yaml.YAMLError as e:
raise AppConfigError(f"Error parsing YAML file: {e}")
# --- Read and Combine Settings (Environment variables take precedence) ---
llm_config = config.get('llm', {})
# General settings
self.llm_mode: str = os.getenv("LLM_MODE", llm_config.get('mode', 'openai')).lower()
# OpenAI settings
openai_config = llm_config.get('openai', {})
self.openai_api_key: Optional[str] = os.getenv("OPENAI_API_KEY", openai_config.get('api_key'))
self.openai_api_base_url: Optional[str] = os.getenv("OPENAI_API_BASE_URL", openai_config.get('api_base_url'))
self.openai_model: Optional[str] = os.getenv("OPENAI_MODEL", openai_config.get('model'))
# Ollama settings
ollama_config = llm_config.get('ollama', {})
self.ollama_base_url: Optional[str] = os.getenv("OLLAMA_BASE_URL", ollama_config.get('base_url'))
self.ollama_model: Optional[str] = os.getenv("OLLAMA_MODEL", ollama_config.get('model'))
def __init__(self):
self.log = LogConfig()
self.llm = LLMConfig()
self._validate()
self._start_watcher()
def _validate(self):
"""
Validates that required configuration variables are set.
"""
logger.info(f"LLM mode set to: '{self.llm_mode}'")
logger.info(f"LLM mode set to: '{self.llm.mode}'")
if self.llm.mode == 'openai':
if not self.llm.openai_api_key:
raise ValueError("LLM mode is 'openai', but OPENAI_API_KEY is not set.")
if not self.llm.openai_api_base_url:
raise ValueError("LLM mode is 'openai', but OPENAI_API_BASE_URL is not set.")
if not self.llm.openai_model:
raise ValueError("LLM mode is 'openai', but OPENAI_MODEL is not set.")
if self.llm_mode == 'openai':
if not self.openai_api_key:
raise AppConfigError("LLM mode is 'openai', but OPENAI_API_KEY is not set.")
if not self.openai_api_base_url:
raise AppConfigError("LLM mode is 'openai', but OPENAI_API_BASE_URL is not set.")
if not self.openai_model:
raise AppConfigError("LLM mode is 'openai', but OPENAI_MODEL is not set.")
elif self.llm_mode == 'ollama':
if not self.ollama_base_url:
raise AppConfigError("LLM mode is 'ollama', but OLLAMA_BASE_URL is not set.")
if not self.ollama_model:
raise AppConfigError("LLM mode is 'ollama', but OLLAMA_MODEL is not set.")
else:
raise AppConfigError(f"Invalid LLM_MODE: '{self.llm_mode}'. Must be 'openai' or 'ollama'.")
elif self.llm.mode == 'ollama':
if not self.llm.ollama_base_url:
raise ValueError("LLM mode is 'ollama', but OLLAMA_BASE_URL is not set.")
if not self.llm.ollama_model:
raise ValueError("LLM mode is 'ollama', but OLLAMA_MODEL is not set.")
logger.info("Configuration validated successfully.")
def _start_watcher(self):
def watch_config():
for changes in watch('config/application.yml'):
for change in changes:
if change[0] == Change.modified:
logger.info("Configuration file modified, reloading settings...")
try:
self.llm = LLMConfig()
self._validate()
logger.info("Configuration reloaded successfully")
except Exception as e:
logger.error(f"Error reloading configuration: {e}")
Thread(target=watch_config, daemon=True).start()
# Create a single, validated instance of the settings to be imported by other modules.
try:
settings = Settings()
except AppConfigError as e:
except ValueError as e:
logger.error(f"FATAL: {e}")
logger.error("Application shutting down due to configuration error.")
sys.exit(1) # Exit the application if configuration is invalid
sys.exit(1)
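A minimal sketch of how the new pydantic-settings classes resolve values, assuming the LLM_ env_prefix declared in LLMConfig above; the variable values are illustrative, and importing config also builds the module-level settings instance.

import os

# With env_prefix='LLM_', LLMConfig fields map to LLM_-prefixed variables.
os.environ["LLM_MODE"] = "ollama"
os.environ["LLM_OLLAMA_BASE_URL"] = "http://localhost:11434"
os.environ["LLM_OLLAMA_MODEL"] = "phi4-mini:latest"

from config import LLMConfig  # defined above

cfg = LLMConfig()
print(cfg.mode, cfg.ollama_base_url, cfg.ollama_model)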


@@ -10,7 +10,7 @@ llm:
# It's HIGHLY recommended to set this via an environment variable
# instead of saving it in this file.
# Can be overridden by OPENAI_API_KEY
api_key: "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"
api_key: "sk-or-v1-..."
# Can be overridden by OPENAI_API_BASE_URL
api_base_url: "https://openrouter.ai/api/v1"
@@ -21,10 +21,11 @@ llm:
# Settings for Ollama
ollama:
# Can be overridden by OLLAMA_BASE_URL
base_url: "http://localhost:11434"
base_url: "http://192.168.0.122:11434"
# base_url: "https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama"
# Can be overridden by OLLAMA_MODEL
# model: "phi4-mini:latest"
model: "phi4-mini:latest"
# model: "qwen3:1.7b"
model: "smollm:360m"
# model: "mollm:360m"

File diff suppressed because one or more lines are too long


@@ -1,65 +1,33 @@
name: jira-llm-stack
name: jira-webhook-stack
services:
# Service for the Ollama server
ollama:
image: ollama/ollama:latest
# Map port 11434 from the container to the host machine
# This allows you to access Ollama directly from your host if needed (e.g., via curl http://localhost:11434)
ollama-jira:
image: artifactory.pfizer.com/mdmhub-docker-dev/mdmtools/ollama/ollama-preloaded:0.0.1
ports:
- "11434:11434"
# Mount a volume to persist Ollama models and data
# This prevents redownloading models every time the container restarts
volumes:
- ollama_data:/root/.ollama
# CORRECTED COMMAND:
# We explicitly tell Docker to use 'bash -c' to execute the string.
# This ensures that 'ollama pull' and 'ollama serve' are run sequentially.
entrypoint: ["sh"]
command: ["-c", "ollama serve && ollama pull phi4-mini:latest"]
# Restart the container if it exits unexpectedly
restart: unless-stopped
# Service for your FastAPI application
app:
# Build the Docker image for your app from the current directory (where Dockerfile is located)
build: .
# Map port 8000 from the container to the host machine
# This allows you to access your FastAPI app at http://localhost:8000
jira-webhook-llm:
image: artifactory.pfizer.com/mdmhub-docker-dev/mdmtools/ollama/jira-webhook-llm:0.1.8
ports:
- "8000:8000"
# Define environment variables for your FastAPI application
# These will be read by pydantic-settings in your app
environment:
# Set the LLM mode to 'ollama'
# Set the LLM mode to 'ollama' or 'openai'
LLM_MODE: ollama
# Point to the Ollama service within the Docker Compose network
# 'ollama' is the service name, which acts as a hostname within the network
OLLAMA_BASE_URL: http://192.168.0.122:11434
OLLAMA_BASE_URL: "https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama"
# Specify the model to use
OLLAMA_MODEL: gemma3:1b
# If you have an OpenAI API key in your settings, but want to ensure it's not used
# when LLM_MODE is ollama, you can explicitly set it to empty or omit it.
# OPENAI_API_KEY: ""
# OPENAI_MODEL: ""
OLLAMA_MODEL: phi4-mini:latest
# Ensure the Ollama service starts and is healthy before starting the app
depends_on:
- ollama
# Restart the container if it exits unexpectedly
- ollama-jira
restart: unless-stopped
# Mount your current project directory into the container
# This is useful for development, as changes to your code will be reflected
# without rebuilding the image (if you're using a hot-reloading server like uvicorn --reload)
# For production, you might remove this and rely solely on the Dockerfile copy.
volumes:
- .:/app
# Command to run your FastAPI application using Uvicorn
# --host 0.0.0.0 is crucial for the app to be accessible from outside the container
# --reload is good for development; remove for production
command: uvicorn jira-webhook-llm:app --host 0.0.0.0 --port 8000 --reload
# Define named volumes for persistent data
volumes:
ollama_data:
driver: local
command: uvicorn jira-webhook-llm:app --host 0.0.0.0 --port 8000

File diff suppressed because one or more lines are too long


@@ -1,279 +1,102 @@
import os
import json
from typing import Optional, List, Union
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field, ConfigDict, validator
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
from fastapi.responses import JSONResponse
from loguru import logger
import uuid
import sys
from typing import Optional
from datetime import datetime
import asyncio
from functools import wraps
# Import your new settings object
from config import settings
from webhooks.handlers import JiraWebhookHandler
from llm.models import JiraWebhookPayload
from logging_config import configure_logging
# LangChain imports
from langchain_ollama import OllamaLLM
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel as LCBaseModel
from pydantic import field_validator
# Langfuse imports
from langfuse import Langfuse, get_client
from langfuse.langchain import CallbackHandler
# LANGFUSE_PUBLIC_KEY="pk_lf_..."
# LANGFUSE_SECRET_KEY="sk_lf_..."
# LANGFUSE_HOST="https://cloud.langfuse.com" # Or "https://us.cloud.langfuse.com" for US region, or your self-hosted instance
langfuse = Langfuse(
secret_key="sk-lf-55d5fa70-e2d3-44d0-ae76-48181126d7ed",
public_key="pk-lf-0f6178ee-e6aa-4cb7-a433-6c00c6512874",
host="https://cloud.langfuse.com"
)
# Initialize Langfuse client (optional, get_client() uses environment variables by default)
# It's good practice to initialize it early to ensure connection.
try:
langfuse_client = get_client()
if langfuse_client.auth_check():
logger.info("Langfuse client authenticated successfully.")
else:
logger.warning("Langfuse authentication failed. Check your API keys and host.")
app = FastAPI()
logger.info("FastAPI application initialized")
except Exception as e:
logger.error(f"Failed to initialize Langfuse client: {e}")
# Depending on your tolerance, you might want to exit or continue without tracing
# For now, we'll just log and continue, but traces won't be sent.
logger.error(f"Error initializing FastAPI: {str(e)}")
raise
# Initialize logging
configure_logging(log_level=settings.log.level)
# --- Pydantic Models for Jira Payload and LLM Output ---
# Configuration for Pydantic to handle camelCase to snake_case conversion
class JiraWebhookPayload(BaseModel):
model_config = ConfigDict(alias_generator=lambda x: ''.join(word.capitalize() if i > 0 else word for i, word in enumerate(x.split('_'))), populate_by_name=True)
def retry(max_retries: int = 3, delay: float = 1.0):
"""Decorator for retrying failed operations"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
last_error = None
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except Exception as e:
last_error = e
logger.warning(f"Attempt {attempt + 1} failed: {str(e)}")
if attempt < max_retries - 1:
await asyncio.sleep(delay * (attempt + 1))
raise last_error
return wrapper
return decorator
issueKey: str
summary: str
description: Optional[str] = None
comment: Optional[str] = None # Assuming this is the *new* comment that triggered the webhook
labels: Optional[Union[List[str], str]] = []
class ErrorResponse(BaseModel):
error_id: str
timestamp: str
status_code: int
message: str
details: Optional[str] = None
@app.middleware("http")
async def error_handling_middleware(request: Request, call_next):
request_id = str(uuid.uuid4())
logger.bind(request_id=request_id).info(f"Request started: {request.method} {request.url}")
@field_validator('labels', mode='before') # `pre=True` becomes `mode='before'`
@classmethod # V2 validators must be classmethods
def convert_labels_to_list(cls, v):
if isinstance(v, str):
return [v]
return v or [] # Return an empty list if v is None/empty
status: Optional[str] = None
assignee: Optional[str] = None
updated: Optional[str] = None # Timestamp string
try:
response = await call_next(request)
return response
except HTTPException as e:
logger.error(f"HTTP Error: {e.status_code} - {e.detail}")
error_response = ErrorResponse(
error_id=request_id,
timestamp=datetime.utcnow().isoformat(),
status_code=e.status_code,
message=e.detail,
details=str(e)
)
return JSONResponse(status_code=e.status_code, content=error_response.model_dump())
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
error_response = ErrorResponse(
error_id=request_id,
timestamp=datetime.utcnow().isoformat(),
status_code=500,
message="Internal Server Error",
details=str(e)
)
return JSONResponse(status_code=500, content=error_response.model_dump())
webhook_handler = JiraWebhookHandler()
# Define the structure of the LLM's expected JSON output
class AnalysisFlags(LCBaseModel):
hasMultipleEscalations: bool = Field(description="Is there evidence of multiple escalation attempts or channels?")
requiresUrgentAttention: bool = Field(description="Does the issue convey a sense of urgency beyond standard priority?")
customerSentiment: Optional[str] = Field(description="Overall customer sentiment (e.g., 'neutral', 'frustrated', 'calm').")
suggestedLabels: List[str] = Field(description="List of suggested Jira labels, e.g., ['escalated', 'high-customer-impact'].")
summaryOfConcerns: Optional[str] = Field(description="A concise summary of the key concerns or problems in the ticket.")
# --- LLM Setup (Now dynamic based on config) ---
llm = None
if settings.llm_mode == 'openai':
logger.info(f"Initializing ChatOpenAI with model: {settings.openai_model}")
llm = ChatOpenAI(
model=settings.openai_model,
temperature=0.7,
max_tokens=2000,
api_key=settings.openai_api_key,
base_url=settings.openai_api_base_url
)
elif settings.llm_mode == 'ollama':
logger.info(f"Initializing OllamaLLM with model: {settings.ollama_model} at {settings.ollama_base_url}")
llm = OllamaLLM(
model=settings.ollama_model,
base_url=settings.ollama_base_url,
streaming=False
)
# This check is now redundant because config.py would have exited, but it's good for clarity.
if llm is None:
logger.error("LLM could not be initialized. Exiting.")
sys.exit(1)
app = FastAPI()
# Set up Output Parser for structured JSON
parser = JsonOutputParser(pydantic_object=AnalysisFlags)
# Prompt Template for LLM
prompt_template = PromptTemplate(
template="""
You are an AI assistant designed to analyze Jira ticket details and extract key flags and sentiment.
Analyze the following Jira ticket information and provide your analysis in a JSON format.
Ensure the JSON strictly adheres to the specified schema.
Consider the overall context of the ticket and specifically the latest comment if provided.
Issue Key: {issueKey}
Summary: {summary}
Description: {description}
Status: {status}
Existing Labels: {labels}
Assignee: {assignee}
Last Updated: {updated}
Latest Comment (if applicable): {comment}
**Analysis Request:**
- Determine if there are signs of multiple escalation attempts in the descriptions or comments.
- Assess if the issue requires urgent attention based on language or context from the summary, description, or latest comment.
- Summarize the overall customer sentiment evident in the issue.
- Suggest relevant Jira labels that should be applied to this issue.
- Provide a concise summary of the key concerns or problems described in the ticket.
- Generate a concise, objective comment (max 2-3 sentences) suitable for directly adding to the Jira ticket, summarizing the AI's findings.
{format_instructions}
""",
input_variables=[
"issueKey", "summary", "description", "status", "labels",
"assignee", "updated", "comment"
],
partial_variables={"format_instructions": parser.get_format_instructions()},
)
# Chain for LLM invocation
analysis_chain = prompt_template | llm | parser
# --- Webhook Endpoint ---
@app.post("/jira-webhook")
async def jira_webhook_handler(payload: JiraWebhookPayload):
# Initialize Langfuse CallbackHandler for this request
# This ensures each webhook invocation gets its own trace in Langfuse
langfuse_handler = CallbackHandler()
return await webhook_handler.handle_webhook(payload)
try:
logger.info(f"Received webhook for Jira issue: {payload.issueKey}")
# Prepare payload for LangChain:
# 1. Use the 'comment' field directly if it exists, as it's typically the trigger.
# 2. Convert Optional fields to usable strings for the prompt.
# This mapping handles potential None values in the payload
llm_input = {
"issueKey": payload.issueKey,
"summary": payload.summary,
"description": payload.description if payload.description else "No description provided.",
"status": payload.status if payload.status else "Unknown",
"labels": ", ".join(payload.labels) if payload.labels else "None",
"assignee": payload.assignee if payload.assignee else "Unassigned",
"updated": payload.updated if payload.updated else "Unknown",
"comment": payload.comment if payload.comment else "No new comment provided."
}
# Pass data to LangChain for analysis
# Using ainvoke for async execution
# Add the Langfuse handler to the config of the ainvoke call
analysis_result = await analysis_chain.ainvoke(
llm_input,
config={
"callbacks": [langfuse_handler],
"metadata": {
"trace_name": f"JiraWebhook-{payload.issueKey}"
}
}
)
logger.debug(f"LLM Analysis Result for {payload.issueKey}: {json.dumps(analysis_result, indent=2)}")
return {"status": "success", "analysis_flags": analysis_result}
except Exception as e:
logger.error(f"Error processing webhook: {e}")
import traceback
traceback.print_exc() # Print full traceback for debugging
# In case of an error, you might want to log it to Langfuse as well
# You can update the trace with an error
if langfuse_handler.trace: # Check if the trace was started
langfuse_handler.trace.update(
status_message=f"Error: {str(e)}",
level="ERROR"
)
raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")
finally:
# It's good practice to flush the Langfuse client to ensure all events are sent
# This is especially important in short-lived processes or serverless functions
# For a long-running FastAPI app, the client's internal queue usually handles this
# but explicit flush can be useful for immediate visibility or during testing.
if langfuse_client:
langfuse_client.flush()
# To run this:
# 1. Set OPENAI_API_KEY, LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST environment variables
# 2. Start FastAPI: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
@app.post("/test-llm")
async def test_llm():
"""Test endpoint for LLM integration"""
# Correctly initialize the Langfuse CallbackHandler.
# It inherits the client configuration from the global 'langfuse' instance.
# If you need to name the trace, you do so in the 'ainvoke' call's metadata.
langfuse_handler = CallbackHandler(
# The constructor does not take 'trace_name'.
# Remove it from here.
test_payload = JiraWebhookPayload(
issueKey="TEST-123",
summary="Test issue",
description="This is a test issue for LLM integration",
comment="Testing OpenAI integration with Langfuse",
labels=["test"],
status="Open",
assignee="Tester",
updated="2025-07-04T21:40:00Z"
)
return await webhook_handler.handle_webhook(test_payload)
test_payload = {
"issueKey": "TEST-123",
"summary": "Test issue",
"description": "This is a test issue for LLM integration",
"comment": "Testing OpenAI integration with Langfuse",
"labels": ["test"],
"status": "Open",
"assignee": "Tester",
"updated": "2025-07-04T21:40:00Z"
}
try:
llm_input = {
"issueKey": test_payload["issueKey"],
"summary": test_payload["summary"],
"description": test_payload["description"],
"status": test_payload["status"],
"labels": ", ".join(test_payload["labels"]),
"assignee": test_payload["assignee"],
"updated": test_payload["updated"],
"comment": test_payload["comment"]
}
# To name the trace, you pass it in the config's metadata
result = await analysis_chain.ainvoke(
llm_input,
config={
"callbacks": [langfuse_handler],
"metadata": {
"trace_name": "TestLLM" # Correct way to name the trace
}
}
)
return {
"status": "success",
"result": result
}
except Exception as e:
if langfuse_handler.trace:
langfuse_handler.trace.update(
status_message=f"Error in test-llm: {str(e)}",
level="ERROR"
)
logger.error(f"Error in /test-llm: {e}")
return {
"status": "error",
"message": str(e)
}
finally:
if langfuse_client:
langfuse_client.flush()
# To run this:
# 1. Start FastAPI: uvicorn main:app --host 0.0.0.0 --port 8000 --reload

llm/chains.py (new file, +74)

@@ -0,0 +1,74 @@
import sys
from langchain_ollama import OllamaLLM
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from loguru import logger
from config import settings
from .models import AnalysisFlags
# Initialize LLM
llm = None
if settings.llm.mode == 'openai':
logger.info(f"Initializing ChatOpenAI with model: {settings.openai_model}")
llm = ChatOpenAI(
model=settings.openai_model,
temperature=0.7,
max_tokens=2000,
api_key=settings.openai_api_key,
base_url=settings.openai_api_base_url
)
elif settings.llm.mode == 'ollama':
logger.info(f"Initializing OllamaLLM with model: {settings.llm.ollama_model} at {settings.llm.ollama_base_url}")
llm = OllamaLLM(
model=settings.llm.ollama_model,
base_url=settings.llm.ollama_base_url,
streaming=False
)
if llm is None:
logger.error("LLM could not be initialized. Exiting.")
sys.exit(1)
# Set up Output Parser for structured JSON
parser = JsonOutputParser(pydantic_object=AnalysisFlags)
# Load prompt template from file
def load_prompt_template(version="v1.0.0"):
try:
with open(f"llm/prompts/jira_analysis_{version}.txt", "r") as f:
template = f.read()
return PromptTemplate(
template=template,
input_variables=[
"issueKey", "summary", "description", "status", "labels",
"assignee", "updated", "comment"
],
partial_variables={"format_instructions": parser.get_format_instructions()},
)
except Exception as e:
logger.error(f"Failed to load prompt template: {str(e)}")
raise
# Fallback prompt template
FALLBACK_PROMPT = PromptTemplate(
template="Please analyze this Jira ticket and provide a basic summary.",
input_variables=["issueKey", "summary"]
)
# Create chain with fallback mechanism
def create_analysis_chain():
try:
prompt_template = load_prompt_template()
return prompt_template | llm | parser
except Exception as e:
logger.warning(f"Using fallback prompt due to error: {str(e)}")
return FALLBACK_PROMPT | llm | parser
# Initialize analysis chain
analysis_chain = create_analysis_chain()
# Response validation function
def validate_response(response: dict) -> bool:
required_fields = ["hasMultipleEscalations", "customerSentiment"]
return all(field in response for field in required_fields)
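A hedged sketch of invoking the chain built above and applying the same response validation the webhook handler uses; the input values are illustrative and a reachable LLM backend (per config) is assumed.

import asyncio

from llm.chains import analysis_chain, validate_response

llm_input = {
    "issueKey": "DEMO-1",
    "summary": "Login failures after deployment",
    "description": "Users report repeated login errors.",
    "status": "Open",
    "labels": "None",
    "assignee": "Unassigned",
    "updated": "2025-07-13T12:00:00Z",
    "comment": "Please prioritise, the release is blocked.",
}

result = asyncio.run(analysis_chain.ainvoke(llm_input))
if not validate_response(result):
    # Same safe default the handler falls back to.
    result = {"hasMultipleEscalations": False, "customerSentiment": "neutral"}
print(result)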

llm/models.py (new file, +26)

@@ -0,0 +1,26 @@
from typing import Optional, List, Union
from pydantic import BaseModel, ConfigDict, field_validator, Field
class JiraWebhookPayload(BaseModel):
model_config = ConfigDict(alias_generator=lambda x: ''.join(word.capitalize() if i > 0 else word for i, word in enumerate(x.split('_'))), populate_by_name=True)
issueKey: str
summary: str
description: Optional[str] = None
comment: Optional[str] = None
labels: Optional[Union[List[str], str]] = []
@field_validator('labels', mode='before')
@classmethod
def convert_labels_to_list(cls, v):
if isinstance(v, str):
return [v]
return v or []
status: Optional[str] = None
assignee: Optional[str] = None
updated: Optional[str] = None
class AnalysisFlags(BaseModel):
hasMultipleEscalations: bool = Field(description="Is there evidence of multiple escalation attempts?")
customerSentiment: Optional[str] = Field(description="Overall customer sentiment (e.g., 'neutral', 'frustrated', 'calm').")
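A small sketch of the payload model above; note that the labels validator also accepts a single string. The values are illustrative.

from llm.models import JiraWebhookPayload

payload = JiraWebhookPayload(
    issueKey="ABC-1",
    summary="Demo issue",
    labels="escalated",  # a bare string is normalised to ["escalated"]
)
print(payload.labels)        # ['escalated']
print(payload.model_dump())  # unset optional fields default to None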

llm/prompt_tests.py (new file, +29)

@@ -0,0 +1,29 @@
import unittest
from llm.chains import load_prompt_template, validate_response
from llm.models import AnalysisFlags
class PromptTests(unittest.TestCase):
def test_prompt_loading(self):
"""Test that prompt template loads correctly"""
try:
template = load_prompt_template()
self.assertIsNotNone(template)
self.assertIn("issueKey", template.input_variables)
except Exception as e:
self.fail(f"Prompt loading failed: {str(e)}")
def test_response_validation(self):
"""Test response validation logic"""
valid_response = {
"hasMultipleEscalations": False,
"customerSentiment": "neutral"
}
invalid_response = {
"customerSentiment": "neutral"
}
self.assertTrue(validate_response(valid_response))
self.assertFalse(validate_response(invalid_response))
if __name__ == "__main__":
unittest.main()


@@ -0,0 +1,23 @@
You are an AI assistant designed to analyze Jira ticket details containing email correspondence and extract key flags and sentiment.
Analyze the following Jira ticket information and provide your analysis in a JSON format.
Ensure the JSON strictly adheres to the specified schema.
Consider the overall context of the ticket and specifically the latest comment if provided.
Issue Key: {issueKey}
Summary: {summary}
Description: {description}
Status: {status}
Existing Labels: {labels}
Assignee: {assignee}
Last Updated: {updated}
Latest Comment (if applicable): {comment}
**Analysis Request:**
- Determine if there are signs of multiple escalation attempts in the descriptions or comments with regard to the HUB team. Escalations to other teams are not considered.
-- Usually this means multiple requests made one after another by the same user, over a span of hours or days, asking for immediate help from the HUB team. Normal discussion, with responses back and forth, is not considered an escalation.
- Assess if the issue requires urgent attention based on language or context from the summary, description, or latest comment.
-- This usually means the customer is asking for help due to upcoming deadlines or other high-priority issues that are blocked by delays on our side.
- Summarize the overall customer sentiment evident in the issue. Analyse the tone of responses: happiness, gratefulness, irritation, etc.
{format_instructions}
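For reference, the {format_instructions} placeholder above is filled by the JSON output parser built in llm/chains.py; a minimal sketch of what gets injected:

from langchain_core.output_parsers import JsonOutputParser

from llm.models import AnalysisFlags

parser = JsonOutputParser(pydantic_object=AnalysisFlags)
# Prints the schema-based formatting instructions substituted into the prompt.
print(parser.get_format_instructions())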

logging_config.py (new file, +46)

@@ -0,0 +1,46 @@
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
from loguru import logger
logger.configure(
extra={"request_id": "N/A"}
)
def configure_logging(log_level: str = "INFO", log_dir: Optional[str] = None):
"""Configure structured logging for the application"""
# Default log directory
if not log_dir:
log_dir = os.getenv("LOG_DIR", "logs")
# Create log directory if it doesn't exist
Path(log_dir).mkdir(parents=True, exist_ok=True)
# Log file path with timestamp
log_file = Path(log_dir) / f"jira-webhook-llm_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
# Remove default logger
logger.remove()
# Add console logger
logger.add(
sys.stdout,
level=log_level,
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {extra[request_id]} | {message}",
colorize=True
)
# Add file logger
logger.add(
str(log_file),
level=log_level,
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {extra[request_id]} | {message}",
rotation="100 MB",
retention="30 days",
compression="zip"
)
logger.info("Logging configured successfully")
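A short usage sketch of the logging setup above, including the request_id binding that the error-handling middleware relies on; the values are illustrative.

from loguru import logger

from logging_config import configure_logging

configure_logging(log_level="DEBUG", log_dir="logs")
# Bind a request id so it fills the {extra[request_id]} slot in the log format.
logger.bind(request_id="demo-1234").info("Request started: POST /jira-webhook")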


@@ -1,10 +1,18 @@
fastapi==0.111.0
pydantic==2.9.0 # Changed from 2.7.4 to meet ollama's requirement
pydantic-settings==2.0.0
langchain==0.3.26
langchain-ollama==0.3.3
langchain-openai==0.3.27
langchain-core==0.3.68
langfuse==3.1.3
uvicorn==0.30.1
python-multipart==0.0.9 # Good to include for FastAPI forms
loguru==0.7.3
langfuse==3.1.3
# Testing dependencies
unittest2>=1.1.0
# Testing dependencies
pytest==8.2.0
pytest-asyncio==0.23.5
pytest-cov==4.1.0
httpx==0.27.0

tests/__init__.py (new file, +1)

@@ -0,0 +1 @@
# Initialize tests package

tests/conftest.py (new file, +20)

@@ -0,0 +1,20 @@
import pytest
from fastapi.testclient import TestClient
from jira_webhook_llm import app
@pytest.fixture
def test_client():
return TestClient(app)
@pytest.fixture
def mock_jira_payload():
return {
"issueKey": "TEST-123",
"summary": "Test Issue",
"description": "Test Description",
"comment": "Test Comment",
"labels": ["test"],
"status": "Open",
"assignee": "Tester",
"updated": "2025-07-13T12:00:00Z"
}

tests/test_core.py (new file, +38)

@@ -0,0 +1,38 @@
import pytest
from fastapi import HTTPException
from jira_webhook_llm import app, retry
from llm.models import JiraWebhookPayload
def test_error_handling_middleware(test_client, mock_jira_payload):
# Test 404 error handling
response = test_client.post("/nonexistent-endpoint", json={})
assert response.status_code == 404
assert "error_id" in response.json()
# Test validation error handling
invalid_payload = mock_jira_payload.copy()
invalid_payload.pop("issueKey")
response = test_client.post("/jira-webhook", json=invalid_payload)
assert response.status_code == 422
assert "details" in response.json()
def test_webhook_handler(test_client, mock_jira_payload):
# Test successful webhook handling
response = test_client.post("/jira-webhook", json=mock_jira_payload)
assert response.status_code == 200
assert "response" in response.json()
def test_llm_test_endpoint(test_client):
# Test LLM test endpoint
response = test_client.post("/test-llm")
assert response.status_code == 200
assert "response" in response.json()
@pytest.mark.asyncio
async def test_retry_decorator():
# Test retry decorator functionality (uses the retry decorator from the main module)
@retry(max_retries=3)
async def failing_function():
raise Exception("Test error")
with pytest.raises(Exception):
await failing_function()


@@ -0,0 +1,34 @@
import pytest
from fastapi.testclient import TestClient
from jira_webhook_llm import app
from llm.models import JiraWebhookPayload
def test_llm_response_format(test_client, mock_jira_payload):
response = test_client.post("/jira-webhook", json=mock_jira_payload)
assert response.status_code == 200
response_data = response.json()
# Validate response structure
assert "response" in response_data
assert "analysis" in response_data["response"]
assert "recommendations" in response_data["response"]
assert "status" in response_data["response"]
def test_llm_response_content_validation(test_client, mock_jira_payload):
response = test_client.post("/jira-webhook", json=mock_jira_payload)
response_data = response.json()
# Validate content types
assert isinstance(response_data["response"]["analysis"], str)
assert isinstance(response_data["response"]["recommendations"], list)
assert isinstance(response_data["response"]["status"], str)
def test_llm_error_handling(test_client):
# Test with invalid payload
invalid_payload = {"invalid": "data"}
response = test_client.post("/jira-webhook", json=invalid_payload)
assert response.status_code == 422
# Test with empty payload
response = test_client.post("/jira-webhook", json={})
assert response.status_code == 422

webhooks/handlers.py (new file, +81)

@@ -0,0 +1,81 @@
from fastapi import HTTPException
from loguru import logger
import json
from typing import Optional, List, Union
from pydantic import BaseModel, ConfigDict, field_validator
from datetime import datetime
from config import settings
from llm.models import JiraWebhookPayload, AnalysisFlags
from llm.chains import analysis_chain, validate_response
class BadRequestError(HTTPException):
def __init__(self, detail: str):
super().__init__(status_code=400, detail=detail)
class RateLimitError(HTTPException):
def __init__(self, detail: str):
super().__init__(status_code=429, detail=detail)
class ValidationError(HTTPException):
def __init__(self, detail: str):
super().__init__(status_code=422, detail=detail)
class JiraWebhookHandler:
def __init__(self):
self.analysis_chain = analysis_chain
async def handle_webhook(self, payload: JiraWebhookPayload):
try:
if not payload.issueKey:
raise BadRequestError("Missing required field: issueKey")
if not payload.summary:
raise BadRequestError("Missing required field: summary")
logger.bind(
issue_key=payload.issueKey,
timestamp=datetime.utcnow().isoformat()
).info("Received webhook")
llm_input = {
"issueKey": payload.issueKey,
"summary": payload.summary,
"description": payload.description if payload.description else "No description provided.",
"status": payload.status if payload.status else "Unknown",
"labels": ", ".join(payload.labels) if payload.labels else "None",
"assignee": payload.assignee if payload.assignee else "Unassigned",
"updated": payload.updated if payload.updated else "Unknown",
"comment": payload.comment if payload.comment else "No new comment provided."
}
try:
analysis_result = await self.analysis_chain.ainvoke(llm_input)
# Validate LLM response
if not validate_response(analysis_result):
logger.warning(f"Invalid LLM response format for {payload.issueKey}")
analysis_result = {
"hasMultipleEscalations": False,
"customerSentiment": "neutral"
}
logger.debug(f"LLM Analysis Result for {payload.issueKey}: {json.dumps(analysis_result, indent=2)}")
return {"status": "success", "analysis_flags": analysis_result}
except Exception as e:
logger.error(f"LLM processing failed for {payload.issueKey}: {str(e)}")
return {
"status": "error",
"analysis_flags": {
"hasMultipleEscalations": False,
"customerSentiment": "neutral"
},
"error": str(e)
}
except Exception as e:
logger.error(f"Error processing webhook: {str(e)}")
import traceback
logger.error(f"Stack trace: {traceback.format_exc()}")
raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")
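Finally, a hedged sketch of driving the handler above directly, mirroring what the /test-llm endpoint does; the payload values are illustrative and a configured LLM backend is assumed.

import asyncio

from llm.models import JiraWebhookPayload
from webhooks.handlers import JiraWebhookHandler

handler = JiraWebhookHandler()
payload = JiraWebhookPayload(
    issueKey="TEST-123",
    summary="Test issue",
    comment="Testing the analysis chain end to end",
    labels=["test"],
)

result = asyncio.run(handler.handle_webhook(payload))
print(result["status"], result["analysis_flags"])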