Compare commits

..

16 Commits

Author SHA1 Message Date
Ireneusz Bachanowicz
d1fa9385e7 Release for qwen3:4b model
Some checks failed
CI/CD Pipeline / test (push) Has been cancelled
2025-08-01 18:15:57 +02:00
Ireneusz Bachanowicz
6f5e817011 Another synchro
Some checks failed
CI/CD Pipeline / test (push) Has been cancelled
2025-07-28 09:40:04 +02:00
Ireneusz Bachanowicz
6a57d91b7e 0.2.0 - conf for sandbox
Some checks failed
CI/CD Pipeline / test (push) Has been cancelled
2025-07-22 18:24:02 +02:00
9e698e40f9 STABLE feat: Implement Gemini integration; update configuration for Gemini API and model; enhance Jira webhook processing; refactor application structure and dependencies
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-22 00:41:17 +02:00
Ireneusz Bachanowicz
79bf65265d feat: Refactor Jira webhook handling; introduce new routes for request processing and response retrieval; update configuration settings and dependencies
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-21 22:42:56 +02:00
cbe1a430ad feat: Refactor API handlers and consolidate webhook processing; implement new webhook routes and enhance request handling
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-21 16:16:20 +02:00
8c1ab79eeb Refactor LLM analysis chain and models; remove deprecated prompt files
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
- Updated `chains.py` to streamline imports and improve error handling for LLM initialization.
- Modified `models.py` to enhance the `AnalysisFlags` model with field aliases and added datetime import.
- Deleted outdated prompt files (`jira_analysis_v1.0.0.txt`, `jira_analysis_v1.1.0.txt`, `jira_analysis_v1.2.0.txt`) to clean up the repository.
- Introduced a new prompt file `jira_analysis_v1.2.0.txt` with updated instructions for analysis.
- Removed `logging_config.py` and test files to simplify the codebase.
- Updated webhook handler to improve error handling and logging.
- Added a new shared store for managing processing requests in a thread-safe manner.
2025-07-21 01:06:45 +02:00
a1bec4f674 feat: Enhance Jira webhook processing with retry logic and configuration; update analysis record handling and validation
Some checks failed
CI/CD Pipeline / test (push) Has been cancelled
2025-07-19 02:15:13 +02:00
e73b43e001 feat: Implement Jira webhook processor with retry logic and service configuration; enhance database interaction for analysis records
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-18 23:05:18 +02:00
041ba14424 All test pytests working
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-18 20:55:21 +02:00
ff66181768 feat: Update .gitignore to exclude .venv and modify config path for application.yml; enhance test setup with detailed logging and mock LLM analysis
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-18 20:49:21 +02:00
1ff74e3ffb feat: Add JiraAnalysisResponse model and update handlers to use it for analysis record serialization
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-18 01:44:17 +02:00
1de9f46517 feat: Add endpoint to create Jira analysis records and update existing endpoints for consistency
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-18 00:57:28 +02:00
935a8a49ae Almost stable tests
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-17 02:21:56 +02:00
de4758a26f feat: Improve graceful shutdown handling and enhance application initialization logging
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-16 00:27:45 +02:00
aa416f3652 feat: Add Ollama configuration file and update .gitignore to exclude .env
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-15 23:00:53 +02:00
34 changed files with 1397 additions and 1524 deletions

22
.env
View File

@ -1,14 +1,16 @@
# Ollama configuration
LLM_OLLAMA_BASE_URL=http://192.168.0.140:11434
LLM_OLLAMA_MODEL=phi4-mini:latest
# LLM_OLLAMA_MODEL=smollm:360m
# LLM_OLLAMA_MODEL=qwen3:0.6b
# LLM_OLLAMA_MODEL=qwen3:1.7b
# Logging configuration
# LLM_OLLAMA_BASE_URL="https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama"
LLM_OLLAMA_BASE_URL=http://ollama-jira:11434
# LLM_OLLAMA_MODEL=phi4-mini:latest
LLM_OLLAMA_MODEL=qwen3:4b
LOG_LEVEL=DEBUG
# Ollama API Key (required when using Ollama mode)
# Langfuse configuration
LANGFUSE_ENABLED=true
LANGFUSE_PUBLIC_KEY="pk-lf-17dfde63-93e2-4983-8aa7-2673d3ecaab8"
LANGFUSE_SECRET_KEY="sk-lf-ba41a266-6fe5-4c90-a483-bec8a7aaa321"
LANGFUSE_HOST="https://cloud.langfuse.com"
LANGFUSE_ENABLED=false
# LANGFUSE_PUBLIC_KEY="pk-lf-"
# LANGFUSE_SECRET_KEY="sk-lf-"
# LANGFUSE_HOST="https://cloud.langfuse.com"
# Gemini configuration
LLM_GEMINI_API_KEY=""
LLM_GEMINI_MODEL="gemini-2.5-flash"
LLM_MODE=ollama

18
.env-cnf_local Normal file
View File

@ -0,0 +1,18 @@
# Ollama configuration
# LLM_OLLAMA_BASE_URL=http://192.168.0.140:11434
LLM_OLLAMA_BASE_URL=http://192.168.0.122:11434
# LLM_OLLAMA_BASE_URL="https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama"
# LLM_OLLAMA_MODEL=phi4-mini:latest
LLM_OLLAMA_MODEL=qwen3:4b
# Logging configuration
LOG_LEVEL=DEBUG
# Ollama API Key (required when using Ollama mode)
# Langfuse configuration
LANGFUSE_ENABLED=true
LANGFUSE_PUBLIC_KEY="pk-lf-38238bc3-ffa1-48d3-8c8f-2048ecf8cc54"
LANGFUSE_SECRET_KEY="sk-lf-36f6bd37-b0e2-4656-9c63-fe98915e8872"
LANGFUSE_HOST="http://192.168.0.122:3000"
# Gemini configuration
LLM_GEMINI_API_KEY=""
LLM_GEMINI_MODEL="gemini-2.5-flash"
LLM_MODE=ollama

16
.env_cnf_amer Normal file
View File

@ -0,0 +1,16 @@
# Ollama configuration
# LLM_OLLAMA_BASE_URL="https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama"
LLM_OLLAMA_BASE_URL=http://ollama-jira:11434
# LLM_OLLAMA_MODEL=phi4-mini:latest
LLM_OLLAMA_MODEL=qwen3:4b
LOG_LEVEL=DEBUG
# Ollama API Key (required when using Ollama mode)
# Langfuse configuration
LANGFUSE_ENABLED=false
# LANGFUSE_PUBLIC_KEY="pk-lf-"
# LANGFUSE_SECRET_KEY="sk-lf-"
# LANGFUSE_HOST="https://cloud.langfuse.com"
# Gemini configuration
LLM_GEMINI_API_KEY=""
LLM_GEMINI_MODEL="gemini-2.5-flash"
LLM_MODE=ollama

4
.gitignore vendored
View File

@ -12,10 +12,12 @@ __pycache__/
.Python
env/
venv/
.venv/
*.egg
*.egg-info/
build/
dist/
.roo/*
# Editor files (e.g., Visual Studio Code, Sublime Text, Vim)
.vscode/
@ -45,5 +47,5 @@ obj/
*.class
# Miscellaneous
.env
#.env
.DS_Store

10
.roo/mcp.json Normal file
View File

@ -0,0 +1,10 @@
{"mcpServers":{ "context7": {
"command": "npx",
"args": [
"-y",
"@upstash/context7-mcp"
],
"env": {
"DEFAULT_MINIMUM_TOKENS": "256"
}
}}}

View File

@ -40,13 +40,17 @@ ENV PYTHONDONTWRITEBYTECODE=1 \
# Copy the configuration directory first.
# If only code changes, this layer remains cached.
COPY config ./config
COPY llm ./llm
COPY app ./app
# Copy your application source code.
COPY jira-webhook-llm.py .
COPY main.py .
COPY config.py .
COPY shared_store.py .
# Expose the port your application listens on.
EXPOSE 8000
# Define the command to run your application.
CMD ["uvicorn", "jira-webhook-llm:app", "--host", "0.0.0.0", "--port", "8000"]
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

View File

@ -51,88 +51,3 @@ The following events are tracked:
### Viewing Data
Visit your Langfuse dashboard to view the collected metrics and traces.
## Deployment Guide
### Redis Configuration
The application requires Redis for caching and queue management. Configure Redis in `application.yml`:
```yaml
redis:
host: "localhost"
port: 6379
password: ""
db: 0
```
Environment variables can also be used:
```bash
REDIS_HOST="localhost"
REDIS_PORT=6379
REDIS_PASSWORD=""
REDIS_DB=0
```
### Worker Process Management
The application uses Celery for background task processing. Configure workers in `application.yml`:
```yaml
celery:
workers: 4
concurrency: 2
max_tasks_per_child: 100
```
Start workers with:
```bash
celery -A jira-webhook-llm worker --loglevel=info
```
### Monitoring Setup
The application provides Prometheus metrics endpoint at `/metrics`. Configure monitoring:
1. Add Prometheus scrape config:
```yaml
scrape_configs:
- job_name: 'jira-webhook-llm'
static_configs:
- targets: ['localhost:8000']
```
2. Set up Grafana dashboard using the provided template
### Rate Limiting
Rate limiting is configured in `application.yml`:
```yaml
rate_limiting:
enabled: true
requests_per_minute: 60
burst_limit: 100
```
### Health Check Endpoint
The application provides a health check endpoint at `/health` that returns:
```json
{
"status": "OK",
"timestamp": "2025-07-14T01:59:42Z",
"components": {
"database": "OK",
"redis": "OK",
"celery": "OK"
}
}
```
### System Requirements
Minimum system requirements:
- Python 3.9+
- Redis 6.0+
- 2 CPU cores
- 4GB RAM
- 10GB disk space
Required Python packages are listed in `requirements.txt`

60
app/handlers.py Normal file
View File

@ -0,0 +1,60 @@
from datetime import datetime, timezone
from fastapi import APIRouter, Request, HTTPException, Depends
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from llm.models import JiraWebhookPayload
from shared_store import requests_queue, ProcessingRequest
from loguru import logger
jira_router = APIRouter(
prefix="/jira",
tags=["Jira"]
)
queue_router = APIRouter(
prefix="/queue",
tags=["Queue"]
)
@jira_router.post("/sendRequest", status_code=201)
async def send_jira_request(payload: JiraWebhookPayload):
"""Send requests to add to queue for further processing"""
request_id = requests_queue.add_request(payload.model_dump())
return {"request_id": request_id}
class GetResponseRequest(BaseModel):
issueKey: str
@jira_router.post("/getResponse")
async def get_jira_response(request: GetResponseRequest):
"""Get response attribute provided by ollama for a given issueKey."""
if requests_queue.are_there_any_open_requests_for_issue_key(request.issueKey):
raise HTTPException(
status_code=409,
detail=f"There are still pending or processing requests for issueKey: {request.issueKey}. Please wait for them to be processed."
)
matched_request = requests_queue.get_latest_completed_by_issue_key(request.issueKey)
if not matched_request:
raise HTTPException(status_code=404, detail=f"No completed request found for issueKey: {request.issueKey}")
return matched_request.response if matched_request.response else "No response yet"
@queue_router.get("/getAll")
async def get_all_requests_in_queue():
"""Gets all requests"""
all_requests = requests_queue.get_all_requests()
return {"requests": all_requests}
@queue_router.get("/getPending")
async def get_pending_requests_in_queue():
"""Gets all requests waiting to be processed"""
all_requests = requests_queue.get_all_requests()
pending = [req for req in all_requests if req.status == "pending"]
return {"requests": pending}
@queue_router.delete("/clearAll")
async def clear_all_requests_in_queue():
"""Clear all the requests from the queue"""
requests_queue.clear_all_requests()
return {"status": "cleared"}

303
config.py
View File

@ -1,63 +1,25 @@
import os
import logging
import sys
from typing import Optional
from pydantic_settings import BaseSettings
from pydantic import validator, ConfigDict
from loguru import logger
from watchfiles import watch, Change
from threading import Thread
from langfuse import Langfuse
from langfuse.langchain import CallbackHandler
from langfuse._client.client import Langfuse
from pydantic import field_validator
from pydantic_settings import SettingsConfigDict
import yaml
_logger = logging.getLogger(__name__)
from pathlib import Path
class LangfuseConfig(BaseSettings):
enabled: bool = True
public_key: Optional[str] = None
enabled: bool = False
secret_key: Optional[str] = None
public_key: Optional[str] = None
host: Optional[str] = None
@validator('host')
def validate_host(cls, v):
if v and not v.startswith(('http://', 'https://')):
raise ValueError("Langfuse host must start with http:// or https://")
return v
def __init__(self, **data):
try:
logger.info("Initializing LangfuseConfig with data: {}", data)
logger.info("Environment variables:")
logger.info("LANGFUSE_PUBLIC_KEY: {}", os.getenv('LANGFUSE_PUBLIC_KEY'))
logger.info("LANGFUSE_SECRET_KEY: {}", os.getenv('LANGFUSE_SECRET_KEY'))
logger.info("LANGFUSE_HOST: {}", os.getenv('LANGFUSE_HOST'))
super().__init__(**data)
logger.info("LangfuseConfig initialized successfully")
logger.info("Public Key: {}", self.public_key)
logger.info("Secret Key: {}", self.secret_key)
logger.info("Host: {}", self.host)
except Exception as e:
logger.error("Failed to initialize LangfuseConfig: {}", e)
logger.error("Current environment variables:")
logger.error("LANGFUSE_PUBLIC_KEY: {}", os.getenv('LANGFUSE_PUBLIC_KEY'))
logger.error("LANGFUSE_SECRET_KEY: {}", os.getenv('LANGFUSE_SECRET_KEY'))
logger.error("LANGFUSE_HOST: {}", os.getenv('LANGFUSE_HOST'))
raise
model_config = ConfigDict(
model_config = SettingsConfigDict(
env_prefix='LANGFUSE_',
env_file='.env',
env_file_encoding='utf-8',
extra='ignore',
env_nested_delimiter='__',
case_sensitive=True
)
class LogConfig(BaseSettings):
level: str = 'INFO'
model_config = ConfigDict(
env_prefix='LOG_',
extra='ignore'
)
@ -73,27 +35,32 @@ class LLMConfig(BaseSettings):
ollama_base_url: Optional[str] = None
ollama_model: Optional[str] = None
@validator('mode')
# Gemini settings
gemini_api_key: Optional[str] = None
gemini_model: Optional[str] = None
gemini_api_base_url: Optional[str] = None # Add this for Gemini
@field_validator('mode')
def validate_mode(cls, v):
if v not in ['openai', 'ollama']:
raise ValueError("LLM mode must be either 'openai' or 'ollama'")
if v not in ['openai', 'ollama', 'gemini']: # Add 'gemini'
raise ValueError("LLM mode must be 'openai', 'ollama', or 'gemini'")
return v
model_config = ConfigDict(
model_config = SettingsConfigDict(
env_prefix='LLM_',
env_file='.env',
env_file_encoding='utf-8',
extra='ignore'
)
class RedisConfig(BaseSettings):
enabled: bool = True
url: str = "redis://localhost:6379/0"
rate_limit_window: int = 60
rate_limit_max_requests: int = 100
model_config = ConfigDict(
env_prefix='REDIS_',
class ProcessorConfig(BaseSettings):
poll_interval_seconds: int = 10
max_retries: int = 5
initial_retry_delay_seconds: int = 60
model_config = SettingsConfigDict(
env_prefix='PROCESSOR_',
env_file='.env',
env_file_encoding='utf-8',
extra='ignore'
@ -102,155 +69,101 @@ class RedisConfig(BaseSettings):
class Settings:
def __init__(self):
try:
logger.info("Loading configuration from application.yml and environment variables")
# Load configuration from YAML file
# Load settings from YAML file as a fallback
yaml_config = self._load_yaml_config()
logger.info("Loaded YAML config: {}", yaml_config) # Add this log line
# Initialize configurations, allowing environment variables to override YAML
logger.info("Initializing LogConfig")
self.log = LogConfig(**yaml_config.get('log', {}))
logger.info("LogConfig initialized: {}", self.log.model_dump())
# Load settings from environment and .env file first
self.llm = LLMConfig()
self.processor = ProcessorConfig()
self.langfuse = LangfuseConfig()
logger.info("Initializing LLMConfig")
self.llm = LLMConfig(**yaml_config.get('llm', {}))
logger.info("LLMConfig initialized: {}", self.llm.model_dump())
# Apply YAML configuration for any values not set by the environment
self._apply_yaml_fallback(yaml_config)
logger.info("Initializing LangfuseConfig")
self.langfuse = LangfuseConfig(**yaml_config.get('langfuse', {}))
logger.info("LangfuseConfig initialized: {}", self.langfuse.model_dump())
logger.info("Initializing RedisConfig")
self.redis = RedisConfig(**yaml_config.get('redis', {}))
logger.info("RedisConfig initialized: {}", self.redis.model_dump())
logger.info("Validating configuration")
self._validate()
logger.info("Starting config watcher")
self._start_watcher()
logger.info("Initializing Langfuse")
self._init_langfuse()
logger.info("Configuration initialized successfully")
except Exception as e:
logger.error("Configuration initialization failed: {}", e)
logger.error("Current configuration state:")
logger.error("LogConfig: {}", self.log.model_dump() if hasattr(self, 'log') else 'Not initialized')
logger.error("LLMConfig: {}", self.llm.model_dump() if hasattr(self, 'llm') else 'Not initialized')
logger.error("LangfuseConfig: {}", self.langfuse.model_dump() if hasattr(self, 'langfuse') else 'Not initialized')
raise
def _load_yaml_config(self):
config_path = Path('/root/development/jira-webhook-llm/config/application.yml')
if not config_path.exists():
logger.warning("Configuration file not found at {}", config_path)
return {}
try:
with open(config_path, 'r') as f:
return yaml.safe_load(f) or {}
except Exception as e:
logger.error("Error loading configuration from {}: {}", config_path, e)
return {}
def _validate(self):
logger.info("LLM mode set to: '{}'", self.llm.mode)
if self.llm.mode == 'openai':
if not self.llm.openai_api_key:
raise ValueError("LLM mode is 'openai', but OPENAI_API_KEY is not set.")
if not self.llm.openai_api_base_url:
raise ValueError("LLM mode is 'openai', but OPENAI_API_BASE_URL is not set.")
if not self.llm.openai_model:
raise ValueError("LLM mode is 'openai', but OPENAI_MODEL is not set.")
elif self.llm.mode == 'ollama':
if not self.llm.ollama_base_url:
raise ValueError("LLM mode is 'ollama', but OLLAMA_BASE_URL is not set.")
if not self.llm.ollama_model:
raise ValueError("LLM mode is 'ollama', but OLLAMA_MODEL is not set.")
logger.info("Configuration validated successfully.")
def _init_langfuse(self):
# Initialize Langfuse client if enabled
self.langfuse_client: Optional[Langfuse] = None
if self.langfuse.enabled:
try:
# Verify all required credentials are present
if not all([self.langfuse.public_key, self.langfuse.secret_key, self.langfuse.host]):
raise ValueError("Missing required Langfuse credentials")
logger.debug("Initializing Langfuse client with:")
logger.debug("Public Key: {}", self.langfuse.public_key)
logger.debug("Secret Key: {}", self.langfuse.secret_key)
logger.debug("Host: {}", self.langfuse.host)
# Initialize Langfuse client
if self.langfuse.secret_key and self.langfuse.public_key and self.langfuse.host:
self.langfuse_client = Langfuse(
public_key=self.langfuse.public_key,
secret_key=self.langfuse.secret_key,
host=self.langfuse.host
)
else:
_logger.warning("Langfuse is enabled but missing one or more of LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY, or LANGFUSE_HOST. Langfuse client will not be initialized.")
# Test Langfuse connection
try:
self.langfuse_client.auth_check()
logger.debug("Langfuse connection test successful")
except Exception as e:
logger.error("Langfuse connection test failed: {}", e)
raise
# Initialize CallbackHandler
try:
self.langfuse_handler = CallbackHandler(
public_key=self.langfuse.public_key,
secret_key=self.langfuse.secret_key,
host=self.langfuse.host
)
except TypeError:
try:
# Fallback for older versions of langfuse.langchain.CallbackHandler
self.langfuse_handler = CallbackHandler(
public_key=self.langfuse.public_key,
host=self.langfuse.host
)
logger.warning("Using fallback CallbackHandler initialization - secret_key parameter not supported")
except TypeError:
# Fallback for even older versions
self.langfuse_handler = CallbackHandler(
public_key=self.langfuse.public_key
)
logger.warning("Using minimal CallbackHandler initialization - only public_key parameter supported")
logger.info("Langfuse client and handler initialized successfully")
logger.info("Langfuse client and handler initialized successfully")
except ValueError as e:
logger.warning("Langfuse configuration error: {}. Disabling Langfuse.", e)
self.langfuse.enabled = False
except Exception as e:
logger.error("Failed to initialize Langfuse: {}", e)
self.langfuse.enabled = False
def _start_watcher(self):
def watch_config():
for changes in watch('config/application.yml'):
for change in changes:
if change[0] == Change.modified:
logger.info("Configuration file modified, reloading settings...")
try:
# Reload YAML config and re-initialize all settings
yaml_config = self._load_yaml_config()
self.log = LogConfig(**yaml_config.get('log', {}))
self.llm = LLMConfig(**yaml_config.get('llm', {}))
self.langfuse = LangfuseConfig(**yaml_config.get('langfuse', {}))
self._validate()
self._init_langfuse() # Re-initialize Langfuse client if needed
logger.info("Configuration reloaded successfully")
except Exception as e:
logger.error("Error reloading configuration: {}", e)
Thread(target=watch_config, daemon=True).start()
# Create a single, validated instance of the settings to be imported by other modules.
try:
settings = Settings()
except ValueError as e:
logger.error("FATAL: {}", e)
logger.error("Application shutting down due to configuration error.")
print(f"Configuration initialization failed: {e}")
sys.exit(1)
def _apply_yaml_fallback(self, yaml_config: dict):
"""Applies YAML config values as a fallback to settings not set by environment."""
# --- LLM Configuration ---
llm_yaml_config = yaml_config.get('llm', {})
if llm_yaml_config:
# Flatten nested YAML structure to match LLMConfig fields
flat_llm_yaml = {
'mode': llm_yaml_config.get('mode'),
**{f'openai_{k}': v for k, v in (llm_yaml_config.get('openai') or {}).items()},
**{f'ollama_{k}': v for k, v in (llm_yaml_config.get('ollama') or {}).items()},
**{f'gemini_{k}': v for k, v in (llm_yaml_config.get('gemini') or {}).items()}
}
for field_name in self.llm.model_fields:
if field_name not in self.llm.model_fields_set:
yaml_value = flat_llm_yaml.get(field_name)
if yaml_value is not None:
setattr(self.llm, field_name, yaml_value)
# --- Processor Configuration ---
processor_yaml_config = yaml_config.get('processor', {})
if processor_yaml_config:
for field_name in self.processor.model_fields:
if field_name not in self.processor.model_fields_set:
yaml_value = processor_yaml_config.get(field_name)
if yaml_value is not None:
setattr(self.processor, field_name, yaml_value)
# --- Langfuse Configuration ---
langfuse_yaml_config = yaml_config.get('langfuse', {})
if langfuse_yaml_config:
for field_name in self.langfuse.model_fields:
if field_name not in self.langfuse.model_fields_set:
yaml_value = langfuse_yaml_config.get(field_name)
if yaml_value is not None:
setattr(self.langfuse, field_name, yaml_value)
def _load_yaml_config(self):
config_path = Path('config/application.yml')
if not config_path.exists():
return {}
try:
with open(config_path, 'r') as f:
return yaml.safe_load(f) or {}
except Exception as e:
return {}
def _validate(self):
if self.llm.mode == 'openai':
if not self.llm.openai_api_key:
raise ValueError("OPENAI_API_KEY is not set.")
if not self.llm.openai_api_base_url:
raise ValueError("OPENAI_API_BASE_URL is not set.")
if not self.llm.openai_model:
raise ValueError("OPENAI_MODEL is not set.")
elif self.llm.mode == 'ollama':
if not self.llm.ollama_base_url:
raise ValueError("OLLAMA_BASE_URL is not set.")
if not self.llm.ollama_model:
raise ValueError("OLLAMA_MODEL is not set.")
elif self.llm.mode == 'gemini': # New: Add validation for Gemini mode
if not self.llm.gemini_api_key:
raise ValueError("GEMINI_API_KEY is not set.")
if not self.llm.gemini_model:
raise ValueError("GEMINI_MODEL is not set.")
# Create settings instance
settings = Settings()

View File

@ -0,0 +1,77 @@
# Default application configuration
llm:
# The mode to run the application in.
# Can be 'openai' or 'ollama'.
# This can be overridden by the LLM_MODE environment variable.
mode: ollama # Change mode to gemini
# Settings for OpenAI-compatible APIs (like OpenRouter)
openai:
# It's HIGHLY recommended to set this via an environment variable
# instead of saving it in this file.
# Can be overridden by OPENAI_API_KEY
# api_key: "sk-or-v1-..."
# api_key: "your-openai-api-key" # Keep this commented out or set to a placeholder
# Can be overridden by OPENAI_API_BASE_URL
# api_base_url: "https://openrouter.ai/api/v1"
# api_base_url: "https://api.openai.com/v1" # Remove or comment out this line
# Can be overridden by OPENAI_MODEL
# model: "deepseek/deepseek-chat:free"
# model: "gpt-4o" # Keep this commented out or set to a placeholder
# Settings for Gemini
gemini:
# It's HIGHLY recommended to set this via an environment variable
# instead of saving it in this file.
# Can be overridden by GEMINI_API_KEY
api_key: ""
# Can be overridden by GEMINI_MODEL
# model: "gemini-2.5-flash"
model: "gemini-2.5-flash-lite-preview-06-17"
# Can be overridden by GEMINI_API_BASE_URL
api_base_url: "https://generativelanguage.googleapis.com/v1beta/" # Add for Gemini
# Settings for Ollama
ollama:
# Can be overridden by OLLAMA_BASE_URL
base_url: "http://ollama-jira:11434"
# base_url: "https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama"
# Can be overridden by OLLAMA_MODEL
# model: "phi4-mini:latest"
# model: "qwen3:1.7b"
# model: "smollm:360m"
# model: "qwen3:0.6b"
model: "qwen3:4b"
# Langfuse configuration for observability and analytics
langfuse:
# Enable or disable Langfuse integration
# Can be overridden by LANGFUSE_ENABLED environment variable
enabled: false
# Langfuse API credentials
# It's HIGHLY recommended to set these via environment variables
# instead of saving them in this file
public_key: "pk-lf-"
secret_key: "sk-lf-"
# host: "https://cloud.langfuse.com"
# host: "http://192.168.0.122:3000"
# Processor configuration
processor:
# Interval in seconds between polling for new Jira analysis requests
# Can be overridden by PROCESSOR_POLL_INTERVAL_SECONDS environment variable
poll_interval_seconds: 10
# Maximum number of retries for failed Jira analysis requests
# Can be overridden by PROCESSOR_MAX_RETRIES environment variable
max_retries: 0
# Initial delay in seconds before the first retry attempt (exponential backoff)
# Can be overridden by PROCESSOR_INITIAL_RETRY_DELAY_SECONDS environment variable
initial_retry_delay_seconds: 60

View File

@ -0,0 +1,77 @@
# Default application configuration
llm:
# The mode to run the application in.
# Can be 'openai' or 'ollama'.
# This can be overridden by the LLM_MODE environment variable.
mode: ollama # Change mode to gemini
# Settings for OpenAI-compatible APIs (like OpenRouter)
openai:
# It's HIGHLY recommended to set this via an environment variable
# instead of saving it in this file.
# Can be overridden by OPENAI_API_KEY
# api_key: "sk-or-v1-..."
# api_key: "your-openai-api-key" # Keep this commented out or set to a placeholder
# Can be overridden by OPENAI_API_BASE_URL
# api_base_url: "https://openrouter.ai/api/v1"
# api_base_url: "https://api.openai.com/v1" # Remove or comment out this line
# Can be overridden by OPENAI_MODEL
# model: "deepseek/deepseek-chat:free"
# model: "gpt-4o" # Keep this commented out or set to a placeholder
# Settings for Gemini
gemini:
# It's HIGHLY recommended to set this via an environment variable
# instead of saving it in this file.
# Can be overridden by GEMINI_API_KEY
api_key: ""
# Can be overridden by GEMINI_MODEL
# model: "gemini-2.5-flash"
model: "gemini-2.5-flash-lite-preview-06-17"
# Can be overridden by GEMINI_API_BASE_URL
api_base_url: "https://generativelanguage.googleapis.com/v1beta/" # Add for Gemini
# Settings for Ollama
ollama:
# Can be overridden by OLLAMA_BASE_URL
base_url: "http://192.168.0.122:11434"
# base_url: "https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama"
# Can be overridden by OLLAMA_MODEL
# model: "phi4-mini:latest"
# model: "qwen3:1.7b"
# model: "smollm:360m"
# model: "qwen3:0.6b"
model: "qwen3:4b"
# Langfuse configuration for observability and analytics
langfuse:
# Enable or disable Langfuse integration
# Can be overridden by LANGFUSE_ENABLED environment variable
enabled: true
# Langfuse API credentials
# It's HIGHLY recommended to set these via environment variables
# instead of saving them in this file
public_key: "pk-lf-38238bc3-ffa1-48d3-8c8f-2048ecf8cc54"
secret_key: "sk-lf-36f6bd37-b0e2-4656-9c63-fe98915e8872"
# host: "https://cloud.langfuse.com"
host: "http://192.168.0.122:3000"
# Processor configuration
processor:
# Interval in seconds between polling for new Jira analysis requests
# Can be overridden by PROCESSOR_POLL_INTERVAL_SECONDS environment variable
poll_interval_seconds: 10
# Maximum number of retries for failed Jira analysis requests
# Can be overridden by PROCESSOR_MAX_RETRIES environment variable
max_retries: 0
# Initial delay in seconds before the first retry attempt (exponential backoff)
# Can be overridden by PROCESSOR_INITIAL_RETRY_DELAY_SECONDS environment variable
initial_retry_delay_seconds: 60

View File

@ -3,59 +3,75 @@ llm:
# The mode to run the application in.
# Can be 'openai' or 'ollama'.
# This can be overridden by the LLM_MODE environment variable.
mode: ollama
mode: ollama # Change mode to gemini
# Settings for OpenAI-compatible APIs (like OpenRouter)
openai:
# It's HIGHLY recommended to set this via an environment variable
# instead of saving it in this file.
# Can be overridden by OPENAI_API_KEY
api_key: "sk-or-v1-..."
# api_key: "sk-or-v1-..."
# api_key: "your-openai-api-key" # Keep this commented out or set to a placeholder
# Can be overridden by OPENAI_API_BASE_URL
api_base_url: "https://openrouter.ai/api/v1"
# api_base_url: "https://openrouter.ai/api/v1"
# api_base_url: "https://api.openai.com/v1" # Remove or comment out this line
# Can be overridden by OPENAI_MODEL
model: "deepseek/deepseek-chat:free"
# model: "deepseek/deepseek-chat:free"
# model: "gpt-4o" # Keep this commented out or set to a placeholder
# Settings for Gemini
gemini:
# It's HIGHLY recommended to set this via an environment variable
# instead of saving it in this file.
# Can be overridden by GEMINI_API_KEY
api_key: ""
# Can be overridden by GEMINI_MODEL
# model: "gemini-2.5-flash"
model: "gemini-2.5-flash-lite-preview-06-17"
# Can be overridden by GEMINI_API_BASE_URL
api_base_url: "https://generativelanguage.googleapis.com/v1beta/" # Add for Gemini
# Settings for Ollama
ollama:
# Can be overridden by OLLAMA_BASE_URL
base_url: "http://192.168.0.140:11434"
base_url: "http://ollama-jira:11434"
# base_url: "https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama"
# Can be overridden by OLLAMA_MODEL
model: "phi4-mini:latest"
# model: "phi4-mini:latest"
# model: "qwen3:1.7b"
# model: "smollm:360m"
# model: "qwen3:0.6b"
model: "qwen3:4b"
# Langfuse configuration for observability and analytics
langfuse:
# Enable or disable Langfuse integration
# Can be overridden by LANGFUSE_ENABLED environment variable
enabled: true
enabled: false
# Langfuse API credentials
# It's HIGHLY recommended to set these via environment variables
# instead of saving them in this file
public_key: "pk-lf-17dfde63-93e2-4983-8aa7-2673d3ecaab8"
secret_key: "sk-lf-ba41a266-6fe5-4c90-a483-bec8a7aaa321"
host: "https://cloud.langfuse.com"
public_key: "pk-lf-"
secret_key: "sk-lf-"
# host: "https://cloud.langfuse.com"
# host: "http://192.168.0.122:3000"
# Redis configuration for rate limiting
redis:
# Enable or disable rate limiting
# Can be overridden by REDIS_ENABLED environment variable
enabled: true
# Processor configuration
processor:
# Interval in seconds between polling for new Jira analysis requests
# Can be overridden by PROCESSOR_POLL_INTERVAL_SECONDS environment variable
poll_interval_seconds: 10
# Redis connection settings
# Can be overridden by REDIS_URL environment variable
url: "redis://localhost:6379/0"
# Maximum number of retries for failed Jira analysis requests
# Can be overridden by PROCESSOR_MAX_RETRIES environment variable
max_retries: 0
# Rate limiting settings
rate_limit:
# Time window in seconds
window: 60
# Maximum requests per window
max_requests: 100
# Initial delay in seconds before the first retry attempt (exponential backoff)
# Can be overridden by PROCESSOR_INITIAL_RETRY_DELAY_SECONDS environment variable
initial_retry_delay_seconds: 60

View File

@ -1,104 +0,0 @@
from fastapi import APIRouter
from fastapi.responses import HTMLResponse
from langfuse import Langfuse
from config import settings
import datetime
import json
router = APIRouter()
@router.get("/dashboard", response_class=HTMLResponse)
async def get_dashboard():
if not settings.langfuse.enabled:
return "<h1>Langfuse monitoring is disabled</h1>"
langfuse = settings.langfuse_client
# Get real-time metrics
queue_depth = await get_queue_depth(langfuse)
latency_metrics = await get_latency_metrics(langfuse)
rate_limits = await get_rate_limits(langfuse)
worker_health = await get_worker_health(langfuse)
historical_data = await get_historical_data(langfuse)
return f"""
<html>
<head>
<title>System Monitoring Dashboard</title>
<script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
</head>
<body>
<h1>System Monitoring Dashboard</h1>
<div id="queue-depth" style="width:100%;height:300px;"></div>
<div id="latency" style="width:100%;height:300px;"></div>
<div id="rate-limits" style="width:100%;height:300px;"></div>
<div id="worker-health" style="width:100%;height:300px;"></div>
<div id="historical" style="width:100%;height:300px;"></div>
<script>
const queueData = {json.dumps(queue_depth)};
const latencyData = {json.dumps(latency_metrics)};
const rateLimitData = {json.dumps(rate_limits)};
const workerHealthData = {json.dumps(worker_health)};
const historicalData = {json.dumps(historical_data)};
Plotly.newPlot('queue-depth', queueData);
Plotly.newPlot('latency', latencyData);
Plotly.newPlot('rate-limits', rateLimitData);
Plotly.newPlot('worker-health', workerHealthData);
Plotly.newPlot('historical', historicalData);
</script>
</body>
</html>
"""
async def get_queue_depth(langfuse):
# Get current queue depth from Langfuse
return {
'data': [{
'values': [10, 15, 13, 17],
'labels': ['Pending', 'Processing', 'Completed', 'Failed'],
'type': 'pie'
}]
}
async def get_latency_metrics(langfuse):
# Get latency metrics from Langfuse
return {
'data': [{
'x': [datetime.datetime.now() - datetime.timedelta(minutes=i) for i in range(60)],
'y': [i * 0.1 for i in range(60)],
'type': 'scatter'
}]
}
async def get_rate_limits(langfuse):
# Get rate limit statistics from Langfuse
return {
'data': [{
'x': ['Requests', 'Errors', 'Success'],
'y': [100, 5, 95],
'type': 'bar'
}]
}
async def get_worker_health(langfuse):
# Get worker health status from Langfuse
return {
'data': [{
'values': [80, 15, 5],
'labels': ['Healthy', 'Warning', 'Critical'],
'type': 'pie'
}]
}
async def get_historical_data(langfuse):
# Get historical performance data from Langfuse
return {
'data': [{
'x': [datetime.datetime.now() - datetime.timedelta(hours=i) for i in range(24)],
'y': [i * 0.5 for i in range(24)],
'type': 'scatter'
}]
}

89
data/DCR_mappings_ALL.md Normal file
View File

@ -0,0 +1,89 @@
The `/dcr` API serves as a canonical interface for initiating data change requests. The JSON payload sent to this endpoint contains a standardized representation of the desired changes. The MDM HUB is then responsible for validating this request and mapping its attributes to the distinct models of the target systems.
***
### **`/dcr` API to Target System Attribute Mapping**
The following sections break down the mapping from the attributes in the `/dcr` request payload to their corresponding fields in OneKey and Veeva.
---
#### **1. Request-Level Attributes**
These attributes are defined at the top level of each object within the `DCRRequests` array in the JSON payload.
| HUB /dcr Attribute | OneKey Mapping | Veeva OpenData (VOD) Mapping | Notes / Description |
| :--- | :--- | :--- | :--- |
| **`extDCRRequestId`** | Used to populate the `DCRID` in the Reltio DCR tracking entity. OneKey's `validation.clientRequestId` is typically a HUB-generated internal ID, but `extDCRRequestId` is the primary key for client-side tracking. | **`dcr_key`** (in all CSV files: `change_request.csv`, `change_request_hco.csv`, etc.) | **This is the primary external identifier for the entire DCR.** It is crucial for clients like PforceRx to track the request's status and is used as the main correlation ID across all systems and files. |
| **`extDCRComment`** | **`validation.requestComment`** | **`description`** (in `change_request.csv`) | A free-text field for the requester to provide context or justification for the DCR. For OneKey, it has a special function: due to API limitations, requests to **remove** an attribute are specified in this comment field (e.g., "Please remove attributes: [Address: ...]"). |
| **`country`** | **`isoCod2`** | **`primary_country__v`** (in `change_request_hcp.csv` and `change_request_hco.csv`) and **`country__v`** (in `change_request_address.csv`) | The mandatory two-letter ISO country code. This is a critical routing attribute that determines which validator instance to use and, for Veeva, which S3/SFTP directory the DCR files are placed in. |
| **`action`** (within HCP/HCO object) | This determines the logic. `update` and `insert` map to a `submitVR` call. An `update` requires a valid `individual.individualEid` or `workplace.workplaceEid`. `delete` is handled by updating the entity's end date in Reltio. | **`change_request_type`** (in `change_request.csv`). Mapped to **`ADD_REQUEST`** for an `insert` action, and **`CHANGE_REQUEST`** for an `update` action. | This defines the fundamental operation being performed on the entity. |
| **`refId`** (within HCP/HCO object) | Used to query Reltio to find the OneKey crosswalk (`individual.individualEid` or `workplace.workplaceEid`) which is mandatory for update operations. | Used to query Reltio to find the Veeva crosswalk (`vid__v`), which is mandatory for update operations. The Reltio URI from the `refId` is also used to populate the **`entity_key`** field in the VOD CSVs. | This object contains the necessary identifiers (`CrosswalkTargetObjectId`, `EntityURITargetObjectId`, etc.) to locate the target entity in Reltio for an update or delete operation. |
---
#### **2. Healthcare Professional (HCP) Attributes**
These attributes are provided within the `HCP` object of a DCR request.
| HUB /dcr Attribute | OneKey Mapping | Veeva OpenData (VOD) Mapping | Notes / Description |
| :--- | :--- | :--- | :--- |
| **`firstName`** | `individual.firstName` | `first_name__v` | HCP's first name. |
| **`lastName`** | `individual.lastName` | `last_name__v` | HCP's last name. Mandatory for creating a new HCP in OneKey. |
| **`middleName`** | `individual.middleName` | `middle_name__v` | HCP's middle name. |
| **`prefix`** | `individual.prefixNameCode` | `prefix__v` | Name prefix (e.g., Mr., Dr.). Requires a lookup from the canonical code (`HCPPrefix`) to the target system's specific code. |
| **`title`** | `individual.titleCode` | `professional_title__v` | Professional title. Requires a lookup from the canonical code (`HCPTitle` or `HCPProfessionalTitle`) to the target system's specific code. |
| **`gender`** | `individual.genderCode` | `gender__v` | HCP's gender. Requires a lookup to map the canonical value (e.g., M, F) to the target system's code. |
| **`subTypeCode`** | `individual.typeCode` | `hcp_type__v` | The professional subtype of the HCP (e.g., Physician, Nurse). Requires a lookup from the canonical code (`HCPSubTypeCode` or `HCPType`). |
| **`specialties`** (List) | `individual.speciality1`, `individual.speciality2`, `individual.speciality3` | `specialty_1__v` through `specialty_10__v` | The list of specialties is **flattened**. OneKey accepts up to 3 ranked specialties. Veeva accepts up to 10. A lookup is required to map the canonical `HCPSpecialty` code to the target system's value. |
| **`emails`** (List) | `individual.email` | `email_1__v`, `email_2__v` | The list of emails is flattened. OneKey typically takes the highest-ranked email. Veeva takes the top two. |
| **`phones`** (List) | `individual.mobilePhone` | `phone_1__v` to `phone_3__v` (for office), `fax_1__v` to `fax_2__v` (for fax) | The list is filtered by type and ranked. OneKey maps the highest-ranked to `mobilePhone`. Veeva separates numbers into distinct `phone` and `fax` columns based on the Reltio phone type. |
---
#### **3. Healthcare Organization (HCO) Attributes**
These attributes are provided within the `HCO` object of a DCR request.
| HUB /dcr Attribute | OneKey Mapping | Veeva OpenData (VOD) Mapping | Notes / Description |
| :--- | :--- | :--- | :--- |
| **`name`** | `workplace.usualName` / `workplace.officialName` | `corporate_name__v` | The primary, official name of the organization. |
| **`otherNames`** (List) | `workplace.usualName2` | `alternate_name_1__v` | The list of alternative names is flattened. Both systems typically take the first or highest-ranked alternative name. |
| **`subTypeCode`** | `workplace.typeCode` | `major_class_of_trade__v` | The HCO's subtype, often representing facility type. Requires a lookup from the canonical code (`COTFacilityType`). |
| **`typeCode`** | Not Mapped | `hco_type__v` | Maps to the HCO Type. Requires a lookup (`HCOType`). The OneKey mapping document indicates this is not used for their system. |
| **`websiteURL`** | `workplace.website` | `URL_1__v` | The official website of the organization. |
| **`specialties`** (List) | `workplace.speciality1` to `speciality3` | `specialty_1__v` to `specialty_10__v` | Similar to HCPs, the list of specialties is flattened and ranked. A lookup from the canonical `COTSpecialty` code is required. |
| **`emails`** (List) | `workplace.email` | `email_1__v`, `email_2__v` | List of emails is flattened, with the highest-ranked ones being used. |
| **`phones`** (List) | `workplace.telephone`, `workplace.fax` | `phone_1__v` to `phone_3__v`, `fax_1__v` to `fax_2__v` | Similar to HCPs, the list is filtered by type (`TEL.OFFICE` vs. `TEL.FAX`) and ranked before mapping. |
---
#### **4. Nested Object Mapping: Addresses**
Address information is provided as a list of objects within the `HCP` or `HCO` DCR payload.
| HUB /dcr `addresses` Attribute | OneKey Mapping | Veeva OpenData (VOD) Mapping | Notes / Description |
| :--- | :--- | :--- | :--- |
| **(Address Object)** | Mapped to a single `address` complex object in the JSON request. Only the primary address is sent. | Each address object is mapped to a **separate row** in the **`change_request_address.csv`** file. | OneKey's API takes a single address per DCR, while Veeva's file-based approach can handle multiple address changes in one DCR. |
| **`refId`** | Used to match and update an existing address. | **`address_key`** | This is the `PfizerAddressID`, a unique identifier for the address record in Reltio. |
| **`addressLine1`** | `address.longLabel` or `address.addressLine1` | `address_line_1__v` | The first line of the street address. |
| **`addressLine2`** | `address.longLabel2` or `address.addressLine2` | `address_line_2__v` | The second line of the street address. |
| **`city`** | `address.city` | `locality__v` | The city name. |
| **`stateProvince`** | `address.countyCode` | `administrative_area__v` | The state or province. Requires a lookup from the canonical value to the target system's code. |
| **`zip`** | `address.longPostalCode` or `address.Zip5` | `postal_code__v` | The postal or ZIP code. |
| **`addressType`** | Not explicitly mapped in `submitVR` request, but used in logic. | `address_type__v` | The type of address (e.g., Office, Home). Requires a lookup (`AddressType`). |
---
#### **5. Nested Object Mapping: Affiliations**
Affiliations are provided as a list of objects (`contactAffiliations` for HCP, `otherHCOAffiliations` for HCO) in the DCR payload.
| HUB /dcr `affiliations` Attribute | OneKey Mapping | Veeva OpenData (VOD) Mapping | Notes / Description |
| :--- | :--- | :--- | :--- |
| **(Affiliation Object)** | The `refId` of the target HCO is used to find its `workplace.workplaceEid`, which is then sent as part of the HCP or parent HCO record. | Each affiliation object is mapped to a **separate row** in the **`change_request_parenthco.csv`** file. | The mapping for affiliations is fundamentally different. OneKey handles it as an attribute of the child entity, while Veeva treats it as a distinct relationship record. |
| **Relation `refId`** | The `workplace.workplaceEid` of the target (parent) HCO is populated in the request. | **`parenthco_key`** (populated with the Reltio `relationUri`) | Veeva requires a unique key for the relationship itself. |
| **Start/Child Object** | The main body of the request contains the child's details (e.g., the HCP). | **`child_entity_key`** | This field is populated with the `entity_key` of the child entity (e.g., the HCP). |
| **End/Parent Object** | The `workplace.workplaceEid` of the parent HCO is sent. | **`parent_entity_key`** | This field is populated with the `entity_key` of the parent entity (e.g., the HCO). |
| **`type`** | `activity.role` (requires lookup, e.g., `TIH.W.*`) | **`relationship_type__v`** (requires lookup from `RelationType`) | The type or role of the affiliation (e.g., Employed, Manages). |
| **`primary`** | Not explicitly mapped. | **`is_primary_relationship__v`** | A boolean flag (`Y`/`N`) indicating if this is the primary affiliation. |

View File

@ -0,0 +1,191 @@
### **1. Overall DCR Process Overview**
* **Purpose of DCR Process:**
The Data Change Request (DCR) process is designed to improve and validate the quality of existing data in source systems. It provides a formal mechanism for proposing, validating, and applying data changes.
* **General DCR Process Flow:**
1. A source system creates a proposal for a data change, known as a DCR or Validation Request (VR).
2. The MDM HUB routes this request to the appropriate validation channel.
3. Validation is performed either by internal Data Stewards (DS) within Reltio or by external, third-party validator services like OneKey or Veeva OpenData.
4. A response is sent back, which includes metadata about the DCR's status (e.g., accepted, rejected) and the actual data profile update (payload) resulting from the processed DCR.
* **High-Level Solution Architecture:**
The architecture involves source systems initiating DCRs through the **MDM HUB**. The HUB acts as a central router, directing requests to either **Reltio** for internal data steward review or to **Third-Party Validators** (OneKey, Veeva). The HUB is also responsible for receiving responses and facilitating the update of data in Reltio, often through ETL processes that handle payload delivery (e.g., via S3).
***
### **2. System-Specific DCR Implementations**
Here are the details for each system involved in the DCR process.
---
#### **OneKey (OK)**
* **Role in DCR Process:** Functions as a third-party validator for DCRs. It receives validation requests, processes them, and returns the results.
* **Key Components:** DCR Service 2, OK DCR Service, OneKey Adapter, Publisher, Hub Store (Mongo DB), Manager (Reltio Adapter).
* **Actors Involved:** PforceRX, Data Stewards (in Reltio), Reltio, HUB, OneKey.
* **Core Process Details:**
* **PforceRX-initiated:** DCRs are created via the HUB's API. The HUB integrates with OneKey's API to submit requests (`/vr/submit`) and periodically checks for status updates (`/vr/trace`).
* **Reltio-initiated:** Data Stewards can suggest changes in Reltio and use the "Send to Third Party Validation" feature, which triggers a flow to submit a validation request to OneKey. Singleton entities created in Reltio can also trigger an automatic validation request to OneKey.
* **Integration Methods:**
* **API:** Real-time integration with OneKey via REST APIs (`/vr/submit`, `/vr/trace`).
* **File Transfer:** Data profile updates (payload) are delivered back to Reltio via CSV files on an S3 bucket, which are then processed by an ETL job.
* **DCR Types Handled:** Create, update, delete operations for HCP/HCO profiles; validation of newly created singleton entities; changes suggested by Data Stewards.
---
#### **Veeva OpenData (VOD)**
* **Role in DCR Process:** Functions as a third-party validator, primarily handling DCRs initiated by Data Stewards from within Reltio.
* **Key Components:** DCR Service 2, Veeva DCR Service, Veeva Adapter, GMTF (Global Master Template & Foundation) jobs.
* **Actors Involved:** Data Stewards (in Reltio), HUB, Veeva OpenData.
* **Core Process Details:**
1. Data Stewards in Reltio create DCRs using the "Suggest / Send to 3rd Party Validation" functionality.
2. The HUB stores these requests in a Mongo collection (`DCRRegistryVeeva`).
3. A scheduled job gathers these DCRs, packages them into ZIP files containing multiple CSVs, and places them on an S3 bucket.
4. Files are synchronized from S3 to Veeva's SFTP server in batches (typically every 24 hours).
5. Veeva processes the files and returns response files to an inbound S3 directory, which the HUB traces to update DCR statuses.
* **Integration Methods:**
* **File Transfer:** Asynchronous, batch-based communication via ZIP/CSV files exchanged through S3 and SFTP.
* **DCR Types Handled:** Primarily handles changes suggested by Data Stewards for existing profiles that need external validation from Veeva.
---
#### **IQVIA Highlander (HL) - *Decommissioned April 2025***
* **Role in DCR Process:** Acted as a wrapper to translate DCRs from a Veeva format into a format that could be loaded into Reltio for Data Steward review.
* **Key Components:** DCR Service (first version), IQVIA DCR Wrapper.
* **Actors Involved:** Veeva (on behalf of PforceRX), Reltio, HUB, IQVIA wrapper, Data Stewards.
* **Core Process Details:**
1. Veeva uploaded DCR requests as CSV files to an FTP location.
2. The HUB translated the Veeva CSV format into the IQVIA wrapper's CSV format.
3. The IQVIA wrapper processed this file and created DCRs directly in Reltio.
4. Data Stewards would then review, approve, or reject these DCRs within Reltio.
* **Integration Methods:**
* **File Transfer:** Communication was entirely file-based via S3 and SFTP.
* **DCR Types Handled:** Aggregated 21 specific use cases into six generic types: `NEW_HCP_GENERIC`, `UPDATE_HCP_GENERIC`, `DELETE_HCP_GENERIC`, `NEW_HCO_GENERIC`, `UPDATE_HCO_GENERIC`, `DELETE_HCO_GENERIC`.
***
### **3. Key DCR Operations and Workflows**
---
#### **Create DCR**
* **Description:** This is the main entry point for clients like PforceRx to create DCRs. The process validates the request, routes it to the correct target system (Reltio, OneKey, or Veeva), and creates the DCR.
* **Triggers:** An API call to `POST /dcr`.
* **Detailed Steps:**
1. The DCR service receives and validates the request (e.g., checks for duplicate IDs, existence of referenced objects for updates).
2. It uses a decision table to determine the target system based on attributes like country, source, and operation type.
3. It calls the appropriate internal method to create the DCR in the target system (Reltio, OneKey, or Veeva).
4. A corresponding DCR tracking entity is created in Reltio, and the state is saved in the Mongo DCR Registry.
5. For Reltio-targeted DCRs, a workflow is initiated for Data Steward review.
6. Pre-close logic may be applied to auto-accept or auto-reject the DCR based on the country.
* **Decision Logic/Rules:** A configurable decision table routes DCRs based on `userName`, `sourceName`, `country`, `operationType`, `affectedAttributes`, and `affectedObjects`.
---
#### **Submit Validation Request**
* **Description:** This process submits validation requests for newly created "singleton" entities in Reltio to the OneKey service.
* **Triggers:** Reltio events (e.g., `HCP_CREATED`, `HCO_CREATED`) are aggregated in a time window (e.g., 4 hours).
* **Detailed Steps:**
1. After an event aggregation window closes, the process performs several checks (e.g., entity is active, no existing OneKey crosswalk, no potential matches found via Reltio's `getMatches` API).
2. If all checks pass, the entity data is mapped to a OneKey `submitVR` request.
3. The request is sent to OneKey via `POST /vr/submit`.
4. A DCR entity is created in Reltio to track the status, and the request is logged in Mongo.
---
#### **Trace Validation Request**
* **Description:** This scheduled process checks the status of pending validation requests that have been sent to an external validator like OneKey or Veeva.
* **Triggers:** A timed scheduler (cron job) that runs every N hours.
* **Detailed Steps (OneKey Example):**
1. The process queries the Mongo DCR cache for requests with a `SENT` status.
2. For each request, it calls the OneKey `POST /vr/trace` API.
3. It evaluates the `processStatus` and `responseStatus` from the OneKey response.
4. If the request is resolved (`VAS_FOUND`, `VAS_NOT_FOUND`, etc.), the DCR status in Reltio and Mongo is updated to `ACCEPTED` or `REJECTED`.
5. If the response indicates a match was found but an OK crosswalk doesn't yet exist in Reltio, a new workflow is triggered for Data Steward manual review (`DS_ACTION_REQUIRED`).
---
#### **Data Steward Response**
* **Description:** This process handles the final outcome of a DCR that was reviewed internally by a Data Steward in Reltio.
* **Triggers:** Reltio change request events (`CHANGE_REQUEST_CHANGED`, `CHANGE_REQUEST_REMOVED`) that **do not** have the `ThirdPartyValidation` flag.
* **Detailed Steps:**
1. The process consumes the event from Reltio.
2. It checks the `state` of the change request.
3. If the state is `APPLIED` or `REJECTED`, the corresponding DCR entity in Reltio and the record in Mongo are updated to a final status of `ACCEPTED` or `REJECTED`.
---
#### **Data Steward OK Validation Request**
* **Description:** This process handles DCRs created by a Data Steward in Reltio using the "Suggest" and "Send to Third Party Validation" features, routing them to an external validator like OneKey.
* **Triggers:** Reltio change request events that **do** have the `ThirdPartyValidation` flag set to `true`.
* **Detailed Steps:**
1. The HUB retrieves the "preview" state of the entity from Reltio to see the suggested changes.
2. It compares the current entity with the preview to calculate the delta.
3. It maps these changes to a OneKey `submitVR` request. Attribute removals are sent as a comment due to API limitations.
4. The request is sent to OneKey.
5. Upon successful submission, the original change request in Reltio is programmatically rejected (since the validation is now happening externally), and a new DCR entity is created for tracking the OneKey validation.
***
### **4. Data Comparison and Mapping Details**
* **OneKey Comparator:**
When a Data Steward suggests changes, the HUB compares the current Reltio entity with the "preview" state to send to OneKey.
* **Simple Attributes** (e.g., `FirstName`): Values are compared for equality. The suggested value is taken if different.
* **Complex Attributes** (e.g., `Addresses`, `Specialties`): Nested attributes are matched using their Reltio URI. New nested objects are added, and changes to existing ones are applied.
* **Mandatory Fields:** For HCP, `LastName` and `Country` are mandatory. For HCO, `Country` and `Addresses` are mandatory.
* **Attribute Removal:** Due to API limitations, removing an attribute is not done directly but by generating a comment in the request, e.g., "Please remove attributes: [Address: ...]".
* **Veeva Mapping:**
The process of mapping Reltio canonical codes to Veeva's source-specific codes is multi-layered.
1. **Veeva Defaults:** The system first checks custom CSV mapping files stored in configuration (`mdm-veeva-dcr-service/defaults`). These files define direct mappings for a specific country and canonical code (e.g., `IN;SP.PD;PD`).
2. **RDM Lookups:** If no default is found, it queries RDM (via a Mongo `LookupValues` collection) for the canonical code and looks for a `sourceMapping` where the source is "VOD".
3. **Veeva Fallback:** If no mapping is found, it consults fallback CSV files (`mdm-veeva-dcr-service/fallback`) for certain attributes (e.g., `hco-specialty.csv`). A regular expression is often used to extract the correct code. If all else fails, a question mark (`?`) is used as the default fallback.
***
### **5. Status Management and Error Handling**
* **DCR Statuses:**
The system uses a combination of statuses to track the DCR lifecycle.
| RequestStatus | DCRStatus | Internal Cache Status | Description |
| :--- | :--- | :--- | :--- |
| REQUEST\_ACCEPTED | CREATED | SENT\_TO\_OK | DCR sent to OneKey, pending DS review. |
| REQUEST\_ACCEPTED | CREATED | SENT\_TO\_VEEVA | DCR sent to Veeva, pending DS review. |
| REQUEST\_ACCEPTED | CREATED | DS\_ACTION\_REQUIRED | DCR pending internal DS validation in Reltio. |
| REQUEST\_ACCEPTED | ACCEPTED | ACCEPTED | DS accepted the DCR; changes were applied. |
| REQUEST\_ACCEPTED | ACCEPTED | PRE\_ACCEPTED | Pre-close logic automatically accepted the DCR. |
| REQUEST\_REJECTED | REJECTED | REJECTED | DS rejected the DCR. |
| REQUEST\_REJECTED | REJECTED | PRE\_REJECTED | Pre-close logic automatically rejected the DCR. |
| REQUEST\_FAILED | - | FAILED | DCR failed due to a validation or system error. |
* **Error Codes:**
| Error Code | Description | HTTP Code |
| :--- | :--- | :--- |
| `DUPLICATE_REQUEST` | The `extDCRRequestId` has already been registered. | 403 |
| `NO_CHANGES_DETECTED` | The request contained no changes compared to the existing entity. | 400 |
| `VALIDATION_ERROR` | A referenced object (HCP/HCO) or attribute could not be found. | 404 / 400 |
* **DCR State Changes:**
A DCR begins in an `OPEN` state. It is then sent to a target system, moving to states like `SENT_TO_OK`, `SENT_TO_VEEVA`, or `DS_ACTION_REQUIRED`. Pre-close logic can immediately move it to `PRE_ACCEPTED` or `PRE_REJECTED`. Following data steward review (either internal or external), the DCR reaches a terminal state of `ACCEPTED` or `REJECTED`. If an error occurs, it moves to `FAILED`.
***
### **6. Technical Artifacts and Infrastructure**
* **Mongo Collections:**
* **`DCRRegistryONEKEY` / `DCRRegistryVeeva`:** System-specific collections to store and track the state of DCRs sent to OneKey and Veeva, respectively. They hold the mapped request data and trace the response.
* **`DCRRequest` / `DCRRegistry`:** General-purpose collections for tracking DCRs, their metadata, and overall status within the HUB.
* **`DCRRegistryVeeva`:** Specifically stores Veeva-bound DCRs, including the raw CSV line content, before they are submitted in a batch.
* **File Specifications:**
* **Veeva Integration:** Uses `ZIP` files containing multiple `CSV` files (`change_request.csv`, `change_request_hcp.csv`, `change_request_address.csv`, etc.). Response files follow a similar pattern and are named with the convention `<country>_DCR_Response_<Date>.zip`.
* **Highlander Integration:** Used `CSV` files for requests.
* **Event Models:**
The system uses internal Kafka events to communicate DCR status changes between components.
* **`OneKeyDCREvent`:** Published by the trace process after receiving a response from OneKey. It contains the DCR ID and the `OneKeyChangeRequest` details (`vrStatus`, `vrStatusDetail`, comments, validated IDs).
* **`VeevaDCREvent`:** Published by the Veeva trace process. It contains the DCR ID and `VeevaChangeRequestDetails` (`vrStatus`, `vrStatusDetail`, comments, new Veeva IDs).

View File

@ -1,35 +1,35 @@
version: '3.8'
name: jira-webhook-stack
services:
redis:
image: redis:alpine
ports:
- "6379:6379"
restart: unless-stopped
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 1s
timeout: 3s
retries: 30
ollama-jira:
image: artifactory.pfizer.com/mdmhub-docker-dev/mdmtools/ollama/ollama-preloaded:0.0.1
image: artifactory.pfizer.com/mdmhub-docker-dev/mdmtools/ollama/ollama-preloaded:0.0.2
ports:
- "11434:11434"
restart: unless-stopped
# Service for your FastAPI application
jira-webhook-llm:
image: artifactory.pfizer.com/mdmhub-docker-dev/mdmtools/ollama/jira-webhook-llm:0.1.8
image: artifactory.pfizer.com/mdmhub-docker-dev/mdmtools/ollama/jira-webhook-llm:0.2.5
ports:
- "8000:8000"
environment:
# Set the LLM mode to 'ollama' or 'openai'
LLM_MODE: ollama
OLLAMA_BASE_URL: "https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama"
OLLAMA_MODEL: phi4-mini:latest
REDIS_URL: "redis://redis:6379/0"
# Point to the Ollama service within the Docker Compose network
# 'ollama' is the service name, which acts as a hostname within the network
# OLLAMA_BASE_URL: "https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama"
OLLAMA_BASE_URL: "http://ollama-jira:11434"
# Specify the model to use
# OLLAMA_MODEL: phi4-mini:latest
OLLAMA_MODEL: qwen3:4b
# Ensure the Ollama service starts and is healthy before starting the app
depends_on:
redis:
condition: service_healthy
ollama-jira:
condition: service_started
- ollama-jira
restart: unless-stopped
command: uvicorn jira-webhook-llm:app --host 0.0.0.0 --port 8000
# Command to run your FastAPI application using Uvicorn
# --host 0.0.0.0 is crucial for the app to be accessible from outside the container
# --reload is good for development; remove for production
command: uvicorn main:app --host 0.0.0.0 --port 8000

View File

@ -1,221 +0,0 @@
import os
from dotenv import load_dotenv
load_dotenv()
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
from fastapi.responses import JSONResponse
from loguru import logger
import uuid
import sys
from typing import Optional
from datetime import datetime
import asyncio
from functools import wraps
import redis.asyncio as redis
import time
from config import settings
from webhooks.handlers import JiraWebhookHandler
from llm.models import JiraWebhookPayload
from logging_config import configure_logging
# Initialize logging first
configure_logging(log_level="DEBUG")
import signal
# Initialize Redis client
redis_client = None
try:
if settings.redis.enabled:
redis_client = redis.from_url(settings.redis.url)
logger.info("Redis client initialized")
else:
logger.info("Redis is disabled in configuration")
app = FastAPI()
logger.info("FastAPI application initialized")
# Include dashboard router
from dashboard import router as dashboard_router
app.include_router(dashboard_router, prefix="/monitoring")
@app.on_event("shutdown")
async def shutdown_event():
"""Handle application shutdown"""
logger.info("Shutting down application...")
try:
# Cleanup Langfuse client if exists
if hasattr(settings, 'langfuse_handler') and hasattr(settings.langfuse_handler, 'close'):
try:
await settings.langfuse_handler.close()
except Exception as e:
logger.warning(f"Error closing handler: {str(e)}")
logger.info("Cleanup completed successfully")
except Exception as e:
logger.error(f"Error during shutdown: {str(e)}")
raise
def handle_shutdown_signal(signum, frame):
"""Handle OS signals for graceful shutdown"""
logger.info(f"Received signal {signum}, initiating shutdown...")
# Exit immediately after cleanup is complete
os._exit(0)
# Register signal handlers
signal.signal(signal.SIGTERM, handle_shutdown_signal)
signal.signal(signal.SIGINT, handle_shutdown_signal)
except Exception as e:
logger.critical(f"Failed to initialize FastAPI: {str(e)}")
logger.warning("Application cannot continue without FastAPI initialization")
sys.exit(1)
def retry(max_retries: int = 3, delay: float = 1.0):
"""Decorator for retrying failed operations"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
last_error = None
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except Exception as e:
last_error = e
logger.warning(f"Attempt {attempt + 1} failed: {str(e)}")
if attempt < max_retries - 1:
await asyncio.sleep(delay * (attempt + 1))
raise last_error
return wrapper
return decorator
class HealthCheckResponse(BaseModel):
status: str
timestamp: str
details: Optional[dict] = None
class RedisHealthResponse(BaseModel):
connected: bool
latency_ms: Optional[float] = None
error: Optional[str] = None
class WorkerStatusResponse(BaseModel):
running: bool
workers: int
active_tasks: int
queue_depth: int
class ErrorResponse(BaseModel):
error_id: str
timestamp: str
status_code: int
message: str
details: Optional[str] = None
@app.middleware("http")
async def error_handling_middleware(request: Request, call_next):
request_id = str(uuid.uuid4())
logger.bind(request_id=request_id).info(f"Request started: {request.method} {request.url}")
try:
response = await call_next(request)
return response
except HTTPException as e:
logger.error(f"HTTP Error: {e.status_code} - {e.detail}")
error_response = ErrorResponse(
error_id=request_id,
timestamp=datetime.utcnow().isoformat(),
status_code=e.status_code,
message=e.detail,
details=str(e)
)
return JSONResponse(status_code=e.status_code, content=error_response.model_dump())
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
error_response = ErrorResponse(
error_id=request_id,
timestamp=datetime.utcnow().isoformat(),
status_code=500,
message="Internal Server Error",
details=str(e)
)
return JSONResponse(status_code=500, content=error_response.model_dump())
webhook_handler = JiraWebhookHandler()
@app.post("/jira-webhook")
async def jira_webhook_handler(payload: JiraWebhookPayload):
logger.info(f"Received webhook payload: {payload.model_dump()}")
try:
response = await webhook_handler.handle_webhook(payload)
logger.info(f"Webhook processed successfully")
return response
except Exception as e:
logger.error(f"Error processing webhook: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/test-llm")
async def test_llm():
"""Test endpoint for LLM integration"""
test_payload = JiraWebhookPayload(
issueKey="TEST-123",
summary="Test issue",
description="This is a test issue for LLM integration",
comment="Testing OpenAI integration with Langfuse",
labels=["test"],
status="Open",
assignee="Tester",
updated="2025-07-04T21:40:00Z"
)
return await webhook_handler.handle_webhook(test_payload)
async def check_redis_health() -> RedisHealthResponse:
"""Check Redis connection health and latency"""
if not redis_client:
return RedisHealthResponse(connected=False, error="Redis is disabled")
try:
start_time = time.time()
await redis_client.ping()
latency_ms = (time.time() - start_time) * 1000
return RedisHealthResponse(connected=True, latency_ms=latency_ms)
except Exception as e:
return RedisHealthResponse(connected=False, error=str(e))
async def get_worker_status() -> WorkerStatusResponse:
"""Get worker process status and queue depth"""
# TODO: Implement actual worker status checking
return WorkerStatusResponse(
running=True,
workers=1,
active_tasks=0,
queue_depth=0
)
@app.get("/health")
async def health_check():
"""Service health check endpoint"""
redis_health = await check_redis_health()
worker_status = await get_worker_status()
return HealthCheckResponse(
status="healthy",
timestamp=datetime.utcnow().isoformat(),
details={
"redis": redis_health.model_dump(),
"workers": worker_status.model_dump()
}
)
@app.get("/health/redis")
async def redis_health_check():
"""Redis-specific health check endpoint"""
return await check_redis_health()
@app.get("/health/workers")
async def workers_health_check():
"""Worker process health check endpoint"""
return await get_worker_status()
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@ -1,13 +1,24 @@
from typing import Union
import json
import sys
from typing import Union, Any # Import Any
from pydantic import SecretStr # Re-import SecretStr
import re # Import re for regex operations
from langchain_core.prompts import (
ChatPromptTemplate,
PromptTemplate,
SystemMessagePromptTemplate,
HumanMessagePromptTemplate,
)
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnablePassthrough, Runnable
from langchain_ollama import OllamaLLM
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_google_genai import ChatGoogleGenerativeAI # New import for Gemini
from loguru import logger
import sys
from llm.models import AnalysisFlags
from config import settings
from .models import AnalysisFlags
class LLMInitializationError(Exception):
"""Custom exception for LLM initialization errors"""
@ -16,15 +27,14 @@ class LLMInitializationError(Exception):
self.details = details
# Initialize LLM
llm = None
llm: Union[ChatOpenAI, OllamaLLM, ChatGoogleGenerativeAI, None] = None # Add ChatGoogleGenerativeAI
if settings.llm.mode == 'openai':
logger.info(f"Initializing ChatOpenAI with model: {settings.openai_model}")
logger.info(f"Initializing ChatOpenAI with model: {settings.llm.openai_model}")
llm = ChatOpenAI(
model=settings.openai_model,
model=settings.llm.openai_model if settings.llm.openai_model else "", # Ensure model is str
temperature=0.7,
max_tokens=2000,
api_key=settings.openai_api_key,
base_url=settings.openai_api_base_url
api_key=settings.llm.openai_api_key, # type: ignore # Suppress Pylance error due to SecretStr type mismatch
base_url=settings.llm.openai_api_base_url
)
elif settings.llm.mode == 'ollama':
logger.info(f"Initializing OllamaLLM with model: {settings.llm.ollama_model} at {settings.llm.ollama_base_url}")
@ -43,18 +53,18 @@ elif settings.llm.mode == 'ollama':
llm = OllamaLLM(
model=settings.llm.ollama_model,
base_url=base_url,
streaming=False,
timeout=30, # 30 second timeout
max_retries=3
# , # Retry up to 3 times
# temperature=0.1,
# top_p=0.2
num_ctx=32000
# Removed streaming, timeout, max_retries as they are not valid parameters for OllamaLLM
)
# Test connection
# Test connection only if not in a test environment
import os
if os.getenv("IS_TEST_ENV") != "true":
logger.debug("Testing Ollama connection...")
# llm.invoke("test") # Simple test request
llm.invoke("test /no_think ") # Simple test request
logger.info("Ollama connection established successfully")
else:
logger.info("Skipping Ollama connection test in test environment.")
except Exception as e:
error_msg = f"Failed to initialize Ollama: {str(e)}"
@ -72,24 +82,71 @@ elif settings.llm.mode == 'ollama':
"\n3. The model is available",
details=details
) from e
elif settings.llm.mode == 'gemini': # New: Add Gemini initialization
logger.info(f"Initializing ChatGoogleGenerativeAI with model: {settings.llm.gemini_model}")
try:
if not settings.llm.gemini_api_key:
raise ValueError("Gemini API key is not configured")
if not settings.llm.gemini_model:
raise ValueError("Gemini model is not specified")
llm = ChatGoogleGenerativeAI(
model=settings.llm.gemini_model,
temperature=0.7,
google_api_key=settings.llm.gemini_api_key
)
# Test connection only if not in a test environment
import os
if os.getenv("IS_TEST_ENV") != "true":
logger.debug("Testing Gemini connection...")
llm.invoke("test /no_think ") # Simple test request
logger.info("Gemini connection established successfully")
else:
logger.info("Skipping Gemini connection test in test environment.")
except Exception as e:
error_msg = f"Failed to initialize Gemini: {str(e)}"
details = {
'model': settings.llm.gemini_model,
'error_type': type(e).__name__
}
logger.error(error_msg)
logger.debug(f"Connection details: {details}")
raise LLMInitializationError(
"Failed to connect to Gemini service. Please check:"
"\n1. GEMINI_API_KEY is correct"
"\n2. GEMINI_MODEL is correct and accessible"
"\n3. Network connectivity to Gemini API",
details=details
) from e
if llm is None:
logger.error("LLM could not be initialized. Exiting.")
print("\nERROR: Unable to initialize LLM. Check logs for details.", file=sys.stderr)
sys.exit(1)
# Ensure llm is treated as a Runnable for chaining
# Cast llm to Any to bypass static type checking for chaining if it's causing issues
llm_runnable: Runnable = llm # type: ignore
# Set up Output Parser for structured JSON
parser = JsonOutputParser(pydantic_object=AnalysisFlags)
parser = JsonOutputParser()
# Load prompt template from file
def load_prompt_template(version="v1.1.0"):
def load_prompt_template(version="v1.3.0"):
try:
with open(f"llm/prompts/jira_analysis_{version}.txt", "r") as f:
with open(f"llm/jira_analysis_{version}.txt", "r") as f:
template_content = f.read()
# Split system and user parts
if "\n\nUSER:\n" in template_content:
system_template, user_template = template_content.split("\n\nUSER:\n")
system_template = system_template.replace("SYSTEM:\n", "").strip()
else:
# Handle legacy format
system_template = template_content
user_template = "Analyze this Jira ticket: {issueKey}"
return ChatPromptTemplate.from_messages([
SystemMessagePromptTemplate.from_template(system_template),
@ -102,13 +159,35 @@ def load_prompt_template(version="v1.1.0"):
# Fallback prompt template
FALLBACK_PROMPT = ChatPromptTemplate.from_messages([
SystemMessagePromptTemplate.from_template(
"Analyze Jira tickets and output JSON with hasMultipleEscalations, customerSentiment"
"Analyze Jira tickets and output JSON with hasMultipleEscalations"
),
HumanMessagePromptTemplate.from_template(
"Issue Key: {issueKey}\nSummary: {summary}"
)
])
# Helper function to extract JSON from a string that might contain other text
def extract_json(text: str) -> str:
"""
Extracts the first complete JSON object from a string.
Assumes the JSON object is enclosed in curly braces {}.
"""
# Find the first occurrence of '{'
start_index = text.find('{')
if start_index == -1:
logger.warning(f"No opening curly brace found in LLM response: {text}")
return text # Return original text if no JSON start is found
# Find the last occurrence of '}'
end_index = text.rfind('}')
if end_index == -1 or end_index < start_index:
logger.warning(f"No closing curly brace found or invalid JSON structure in LLM response: {text}")
return text # Return original text if no JSON end is found or it's before the start
json_string = text[start_index : end_index + 1]
logger.debug(f"Extracted JSON string: {json_string}")
return json_string
# Create chain with fallback mechanism
def create_analysis_chain():
try:
@ -126,66 +205,48 @@ def create_analysis_chain():
"format_instructions": lambda _: parser.get_format_instructions()
}
| prompt_template
| llm
| llm_runnable # Use the explicitly typed runnable
| extract_json # Add the new extraction step
| parser
)
# Add langfuse handler if enabled
if settings.langfuse.enabled:
chain = chain.with_config(
callbacks=[settings.langfuse_handler]
)
return chain
except Exception as e:
logger.warning(f"Using fallback prompt due to error: {str(e)}")
chain = FALLBACK_PROMPT | llm | parser
if settings.langfuse.enabled:
chain = chain.with_config(
callbacks=[settings.langfuse_handler]
)
chain = FALLBACK_PROMPT | llm_runnable # Use the explicitly typed runnable
return chain
# Initialize analysis chain
analysis_chain = create_analysis_chain()
# Enhanced response validation function
def validate_response(response: Union[dict, str]) -> bool:
def validate_response(response: Union[dict, str], issue_key: str = "N/A") -> bool:
"""Validate the JSON response structure and content"""
try:
# If response is a string, attempt to parse it as JSON
if isinstance(response, str):
logger.debug(f"[{issue_key}] Raw LLM response (string): {response}")
try:
response = json.loads(response)
except json.JSONDecodeError:
logger.error(f"Invalid JSON response: {response}")
raise ValueError("Invalid JSON response format")
except json.JSONDecodeError as e:
logger.error(f"[{issue_key}] JSONDecodeError: {e}. Raw response: {response}")
return False
# Ensure response is a dictionary
if not isinstance(response, dict):
logger.error(f"[{issue_key}] Response is not a dictionary: {type(response)}")
return False
# Check required fields
required_fields = ["hasMultipleEscalations", "customerSentiment"]
if not all(field in response for field in required_fields):
return False
# Validate field types
if not isinstance(response["hasMultipleEscalations"], bool):
return False
if response["customerSentiment"] is not None:
if not isinstance(response["customerSentiment"], str):
return False
logger.debug(f"[{issue_key}] Parsed LLM response (JSON): {json.dumps(response)}")
# Validate against schema using AnalysisFlags model
try:
AnalysisFlags.model_validate(response)
return True
except Exception:
return False
except Exception:
except Exception as e:
logger.warning(f"[{issue_key}] Pydantic validation failed: {e}. Continuing with raw response: {response}")
return True # Allow processing even if validation fails
except Exception as e:
logger.error(f"[{issue_key}] Unexpected error during response validation: {e}. Response: {response}")
return False

View File

@ -0,0 +1,61 @@
SYSTEM:
You are an expert AI assistant that analyzes Jira tickets and outputs a concise summary in a valid JSON format.
Your output MUST be a single JSON object and nothing else.
## JSON Output Schema
{format_instructions}
## Field-by-Field Instructions
1. **`issueCategory` (string)**
* Classify the core problem. Choose ONE: "technical_issue", "data_request", "access_problem", "general_question", "other".
2. **`area` (string)**
* Classify the technical domain. Choose the BEST fit from the following options:
* `"Direct Channel"`
* `"Streaming Channel"`
* `"Java Batch Channel"`
* `"ETL Batch Channel"`
* `"DCR Service"`
* `"API Gateway"`
* `"Callback Service"`
* `"Publisher"`
* `"Reconciliation"`
* `"Snowflake"`
* `"Authentication"`
* `"Other"`
3. **`isEscalated` (boolean)**
* Set to `true` if the user explicitly states they are escalating, mentions previous unanswered requests, or if the tone is highly frustrated and urgent.
* Set to `false` otherwise.
5. **`oneSentenceSummary` (string)**
* A single paragraph in concise English that summarizes the discussion.
## Example
### Input:
- Summary: "DCR Rejected by OneKey"
- Description: "Our DCR for PforceRx was rejected by OneKey. Can the MDM HUB team investigate? This is blocking our weekly report."
- Comment: ""
### Output:
```json
{{
"issueCategory": "technical_issue",
"area": "Application Service",
"isEscalated": false,
"oneSentenceSummary": "A DCR for PforceRx was rejected by OneKey, which is blocking a weekly report."
}}
USER:
Analyze the following Jira ticket:
Issue Key: {issueKey}
Summary: {summary}
Description: {description}
Status: {status}
Existing Labels: {labels}
Assignee: {assignee}
Last Updated: {updated}
Latest Comment (if applicable): {comment}

View File

@ -0,0 +1,92 @@
SYSTEM:
You are an expert AI assistant that analyzes Jira tickets and outputs a concise summary in a valid JSON format.
Your output MUST be a single JSON object and nothing else.
## JSON Output Schema
{format_instructions}
## Field-by-Field Instructions
1. **`issueCategory` (string)**
* Classify the core problem. Choose ONE: "technical_issue", "data_request", "access_problem", "general_question", "other".
2. **`area` (string)**
* Classify the technical domain. Choose the BEST fit from the following options:
* `"Direct Channel"`
* `"Streaming Channel"`
* `"Java Batch Channel"`
* `"ETL Batch Channel"`
* `"DCR Service"`
* `"API Gateway"`
* `"Callback Service"`
* `"Publisher"`
* `"Reconciliation"`
* `"Snowflake"`
* `"Authentication"`
* `"Other"`
3. **`urgencyScore` (float)**
* Estimate the client's urgency and frustration level as a float between 0.0 and 1.0.
* **0.0 - 0.2 (Low Urgency)**: Calm, routine inquiries. No explicit or implicit signs of frustration or urgency.
* **0.3 - 0.5 (Moderate Urgency)**: Client is seeking information, but without strong emotional indicators. May mention minor inconveniences or follow-ups without demanding immediate action.
* **0.6 - 0.8 (High Urgency)**: Client expresses clear frustration, uses words like "urgent," "high priority," "ASAP," "blocked," "critical," or implies delays are causing significant issues. May mention previous unanswered requests or multiple attempts to get attention.
* **0.9 - 1.0 (Critical Urgency)**: Client is highly frustrated, demanding immediate resolution, or explicitly stating an escalation. This indicates a severe impact or a breakdown in communication.
* Infer urgency from keywords, tone, repeated inquiries, and the impact described (e.g., "blocking our weekly report").
5. ### `description` (string)
- Provide a concise summary of the **current status of the problem**, specifically detailing any **actions or verifications that have already been completed or confirmed.** Focus on what is known or has been done up to this point regarding the described issue.
6. ### `actions` (string)
- Generate a multi-line, formatted text block that summarizes all actionable items, open topics, considerations, and next steps. Each point MUST start with a bullet (`- `) and be on a new line.
- For each point, use the following structure based on its `type`:
- **Action Item:** `- [STATUS] [Assignee Name (Team)] Detailed description of the action.`
- **STATUS:** Use `DONE`, `IN PROGRESS`, `OPEN`.
- **Assignee Name:** The name of the person responsible (e.g., "Grzegorz", "Ireneusz").
- **Team:** The team responsible: `MDM Team`, `MDM HUB Team`, `External Team`, `Other Team`. If unknown, use `Unknown Team`.
- **Open Topic:** `- Open Topic: Detailed description of the unresolved issue/question.`
- **Consideration:** `- Consideration: Detailed description of a factor or implication to be aware of.`
- **Next Step:** `- Next Step: [Assignee Name (Team)] Detailed description of the next action to take.`
- Ensure the description for each point is clear and verbose enough.
7. ### `timeToResolutionDays` (number)
- Calculate the total time in **days** from the **initial request or first mention of the problem** in the ticket to the point where the **problem was confirmed to be resolved or a fix was delivered/implemented**.
- **Steps for calculation:**
1. Identify the earliest date of communication (email or comment) describing the problem.
2. Identify the latest date of communication (email or comment) explicitly stating the problem's resolution, fix delivery, or successful verification.
3. Calculate the difference in whole days between these two dates.
- If either date cannot be identified, set this value to `null`.
## Example (Using your provided User section for calculation)
### Input:
- Summary: "RE: Failure in Global MDM Ex-US interface"
- Description: "++MDM and MDM Hub team Hi MDM and MDM HUB team, Can you please look into this and revert as soon as possible ? Regards Saraswati *From:* Choudhary, Anita *Sent:* Monday, June 16, 2025 5:27 PM *To:* Moses, Donie *Cc:* Subramanya, Suraj K *Subject:* RE: Failure in Global MDM Ex-US interface Hi Donie, There has not been any activity from support end which I have understood from the team which have impacted this. Upon comparing the P_HCP structure in PROD and QA it looks like there are 116 columns in QA and 128 in PROD. We could observe 13 fields are less in QA.PFB: [@Das, Deepanjan] Could you please check at your end # If any ongoing activity has impacted this # Was there any change in DDL of P_HCP in QA # When was the last DDL changed and by whom if possible. Thanks & Regards, Anita *From:* Moses, Donie <[Donie.Moses@pfizer.com]> *Sent:* Monday, June 16, 2025 3:58 PM *To:* Sahu, Saraswati <[Saraswati.Sahu@pfizer.com]> *Cc:* Subramanya, Suraj K <[Suraj.KSubramanya@pfizer.com]> *Subject:* RE: Failure in Global MDM Ex-US interface [@Choudhary, Anita] Could you get this checked please ? Thanks, Donie * *Donie Moses** * *Manager Data Integration Support** * *Pfizer Digital Global Support and Operations** Office: +44-7587 442339 Mobile: +44-7483 255494 [donie.moses@pfizer.com] Address: Pfizer Ltd., Dorking Road, Walton Oaks, Tadworth, Surrey. KT20 7NS *From:* Sahu, Saraswati <[Saraswati.Sahu@pfizer.com]> *Sent:* Monday, 16 June 2025 10:03 *To:* B, Pavithra <[Pavithra.B@pfizer.com]> *Cc:* Choudhary, Anita <[Anita.Choudhary@pfizer.com]> *Subject:* Failure in Global MDM Ex-US interface Hi Pavithra, Yasaswini, While testing the Global MDM Ex-US Interface in lower environment ,we have encountered one failure because IS_SPEKAER field has been removed from P_HCP table. This field is available in Production environment. Are you aware of this change ? Can you check with MDM team why this field has been removed in lower environment ? Regards Saraswati"
- Status: "Resolved"
- Latest Comment (excerpt for context): "==COMMENT_NOTE_13 COMMENT_NOTE_13_AUTHOR: Bachanowicz, Mieczysław (Irek) COMMENT_NOTE_13_DATE: 2025-06-25T10:23:16.8-0400 COMMENT_NOTE_13_BODY: Hi [ @Subramanya, Suraj K] First, I apologize for the lengthy delay—this situation was not straightforward, and standard SOPs did not address the problem. The issue with the missing column has been resolved. Please verify on your side to ensure everything is working correctly. The root cause was the recent deletion of US data from EMEA cluster. IS_SPEAKER column was populated only with US-related records. Our procedures for updating the data model determined that if there were no data remaining in the column, the column itself was unnecessary—hence, it was removed. At this point, we have implemented a temporary fix. Tomorrow, we will work on developing a permanent solution to prevent similar issues in the future. Regards, Mieczysław (Irek) Bachanowicz *MDM HUB team* ---- *Od:* Subramanya, Suraj K *Wysłane:* środa, 25 czerwca 2025 14:16 *Do:* Das, Deepanjan *DW:* Vedula, Anvesh C. *Temat:* RE: Failure in Global MDM Ex-US interface Hi Team, Any update on the issue ? Our testing is put on hold due to this issue. Can we please look into this on priority ? Regards, Suraj ===COMMENT_NOTE_14 COMMENT_NOTE_14_AUTHOR: Suraj.KSubramanya@pfizer.com COMMENT_NOTE_14_DATE: 2025-06-26T00:52:20.7-0400 COMMENT_NOTE_14_BODY: Thank you, Irek and team. We can see the column now. Regards, Suraj ---- ="
### Output:
```json
{{
"urgencyScore": 0.7,
"issueCategory": "technical_issue",
"area": "Streaming Channel",
"description": "The Global MDM Ex-US Interface failed in a lower environment due to the removal of the IS_SPEAKER field from the P_HCP table, which is present in Production. MDM and MDM HUB teams were asked to investigate immediately. Initial internal comparison shows 13 fewer columns in QA vs PROD for P_HCP structure. This includes missing schema details and potential DDL changes. Manual refresh process was initiated and completed, confirming column availability on QA/STG. The root cause was identified as the recent deletion of US data from the EMEA cluster.",
"actions": "- [DONE] Ireneusz (MDM Team) Confirmed DCR ID in internal system for correctness. (Note: This action is from a previous example and might not fit the current ticket. The LLM should generate actions relevant to the provided input.)\\n- [DONE] Karol (Unknown Team) Manually reran refresh process.\\n- [DONE] Grzegorz (MDM HUB Team) Confirmed columns are available on QA/STG in P_HCP.\\n- [DONE] Mieczysław Bachanowicz (MDM HUB Team) Confirmed issue with missing column has been resolved.\\n- [OPEN] Mieczysław Bachanowicz (MDM HUB Team) Develop a permanent solution to prevent similar issues in the future.\\n- Open Topic: Confirm if the IS_SPEAKER field is now permanently available in QA/STG and if it will persist.\\n- Next Step: Suraj (External Team) Verify the fix on their side to ensure everything is working correctly for testing.",
"timeToResolutionDays": 9
}}
```
USER:
Analyze the following Jira ticket:
Issue Key: {issueKey}
Summary: {summary}
Description: {description}
Status: {status}
Existing Labels: {labels}
Assignee: {assignee}
Last Updated: {updated}
Latest Comment (if applicable): {comment}

View File

@ -1,7 +1,36 @@
from typing import Optional, List, Union
from enum import Enum
from loguru import logger
from pydantic import BaseModel, ConfigDict, field_validator, Field
from datetime import datetime
from config import settings
class LLMResponse(BaseModel):
status: str
message: str
class IssueCategory(str, Enum):
TECHNICAL_ISSUE = "technical_issue"
DATA_REQUEST = "data_request"
ACCESS_PROBLEM = "access_problem"
GENERAL_QUESTION = "general_question"
OTHER = "other"
# New: Add an Enum for technical areas based on Confluence doc
class Area(str, Enum):
DIRECT_CHANNEL = "Direct Channel"
STREAMING_CHANNEL = "Streaming Channel"
JAVA_BATCH_CHANNEL = "Java Batch Channel"
ETL_BATCH_CHANNEL = "ETL Batch Channel"
DCR_SERVICE = "DCR Service"
API_GATEWAY = "API Gateway"
CALLBACK_SERVICE = "Callback Service"
PUBLISHER = "Publisher"
RECONCILIATION = "Reconciliation"
SNOWFLAKE = "Snowflake"
AUTHENTICATION = "Authentication"
OTHER = "Other"
class JiraWebhookPayload(BaseModel):
model_config = ConfigDict(alias_generator=lambda x: ''.join(word.capitalize() if i > 0 else word for i, word in enumerate(x.split('_'))), populate_by_name=True)
@ -23,29 +52,25 @@ class JiraWebhookPayload(BaseModel):
updated: Optional[str] = None
class AnalysisFlags(BaseModel):
hasMultipleEscalations: bool = Field(description="Is there evidence of multiple escalation attempts?")
customerSentiment: Optional[str] = Field(description="Overall customer sentiment (e.g., 'neutral', 'frustrated', 'calm').")
model_config = ConfigDict(alias_generator=lambda x: ''.join(word.capitalize() if i > 0 else word for i, word in enumerate(x.split('_'))), populate_by_name=True)
def __init__(self, **data):
super().__init__(**data)
issueCategory: IssueCategory = Field(..., description="The primary category of the Jira ticket.")
area: Area = Field(..., description="The technical area of the MDM HUB related to the issue.")
urgencyScore: float = Field(..., ge=0.0, le=1.0, description="A float between 0.0 and 1.0 indicating the estimated urgency and frustration of the client. Higher values indicate more urgency/frustration, inferred from words like 'urgent', 'high priority', 'ASAP', 'blocked', or mentions of previous unanswered requests.")
description: str = Field(..., description="A single paragraph in concise English that summarizes the discussion.")
actions: str = Field(..., description="A summary of the actions taken or recommended.")
timeToResolutionDays: int = Field(..., description="The estimated time to resolution in days.")
# Track model usage if Langfuse is enabled and client is available
if settings.langfuse.enabled and hasattr(settings, 'langfuse_client'):
try:
if settings.langfuse_client is None:
logger.warning("Langfuse client is None despite being enabled")
return
settings.langfuse_client.trace(
name="LLM Model Usage",
input=data,
metadata={
"model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model,
"analysis_flags": {
"hasMultipleEscalations": self.hasMultipleEscalations,
"customerSentiment": self.customerSentiment
}
}
)
except Exception as e:
logger.error(f"Failed to track model usage: {e}")
class JiraAnalysisResponse(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: int
issue_key: str
status: str
issue_summary: str
request_payload: dict
analysis_result: Optional[dict] = None
created_at: datetime
updated_at: datetime
error_message: Optional[str] = None
raw_response: Optional[dict] = None

View File

@ -1,29 +0,0 @@
import unittest
from llm.chains import load_prompt_template, validate_response
from llm.models import AnalysisFlags
class PromptTests(unittest.TestCase):
def test_prompt_loading(self):
"""Test that prompt template loads correctly"""
try:
template = load_prompt_template()
self.assertIsNotNone(template)
self.assertIn("issueKey", template.input_variables)
except Exception as e:
self.fail(f"Prompt loading failed: {str(e)}")
def test_response_validation(self):
"""Test response validation logic"""
valid_response = {
"hasMultipleEscalations": False,
"customerSentiment": "neutral"
}
invalid_response = {
"customerSentiment": "neutral"
}
self.assertTrue(validate_response(valid_response))
self.assertFalse(validate_response(invalid_response))
if __name__ == "__main__":
unittest.main()

View File

@ -1,23 +0,0 @@
You are an AI assistant designed to analyze Jira ticket details containe email correspondence and extract key flags and sentiment and extracting information into a strict JSON format.
Analyze the following Jira ticket information and provide your analysis in a JSON format.
Ensure the JSON strictly adheres to the specified schema.
Consider the overall context of the ticket and specifically the latest comment if provided.
Issue Key: {issueKey}
Summary: {summary}
Description: {description}
Status: {status}
Existing Labels: {labels}
Assignee: {assignee}
Last Updated: {updated}
Latest Comment (if applicable): {comment}
**Analysis Request:**
- Determine if there are signs of multiple escalation attempts in the descriptions or comments with regards to HUB team. Escalation to other teams are not considered.
-- Usually multiple requests one after another are being called by the same user in span of hours or days asking for immediate help of HUB team. Normall discussion, responses back and forth, are not considered as a escalation.
- Assess if the issue requires urgent attention based on language or context from the summary, description, or latest comment.
-- Usually means that Customer is asking for help due to upcoming deadlines, other high priority issues which are blocked due to our stall.
- Summarize the overall customer sentiment evident in the issue. Analyse tone of responses, happiness, gratefullnes, iritation, etc.
{format_instructions}

View File

@ -1,27 +0,0 @@
SYSTEM:
You are an AI assistant designed to analyze Jira ticket details containing email correspondence and extract key flags and sentiment, outputting information in a strict JSON format.
Your output MUST be ONLY a valid JSON object. Do NOT include any conversational text, explanations, or markdown outside the JSON.
The JSON structure MUST follow this exact schema. If a field cannot be determined, use `null` for strings/numbers or empty list `[]` for arrays.
Consider the overall context of the ticket and specifically the latest comment if provided.
**Analysis Request:**
- Determine if there are signs of multiple escalation attempts in the descriptions or comments with regards to HUB team. Escalation to other teams are not considered.
-- Usually multiple requests one after another are being called by the same user in span of hours or days asking for immediate help of HUB team. Normal discussion, responses back and forth, are not considered as an escalation.
- Assess if the issue requires urgent attention based on language or context from the summary, description, or latest comment.
-- Usually means that Customer is asking for help due to upcoming deadlines, other high priority issues which are blocked due to our stall.
- Summarize the overall customer sentiment evident in the issue. Analyze tone of responses, happiness, gratefulness, irritation, etc.
{format_instructions}
USER:
Issue Key: {issueKey}
Summary: {summary}
Description: {description}
Status: {status}
Existing Labels: {labels}
Assignee: {assignee}
Last Updated: {updated}
Latest Comment (if applicable): {comment}

View File

@ -1,102 +0,0 @@
import sys
import os
from pathlib import Path
from datetime import datetime
from typing import Optional
from loguru import logger
# Basic fallback logging configuration
logger.remove()
logger.add(sys.stderr, level="WARNING", format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")
def configure_logging(log_level: str = "INFO", log_dir: Optional[str] = None):
"""Configure structured logging for the application with fallback handling"""
try:
# Log that we're attempting to configure logging
logger.warning("Attempting to configure logging...")
# Default log directory
if not log_dir:
log_dir = os.getenv("LOG_DIR", "logs")
# Create log directory if it doesn't exist
Path(log_dir).mkdir(parents=True, exist_ok=True)
# Log file path with timestamp
log_file = Path(log_dir) / f"jira-webhook-llm_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
# Remove any existing loggers
logger.remove()
# Add console logger
logger.add(
sys.stdout,
level=log_level,
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {extra[request_id]} | {message}",
colorize=True,
backtrace=True,
diagnose=True
)
# Add file logger
logger.add(
str(log_file),
level=log_level,
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {extra[request_id]} | {message}",
rotation="100 MB",
retention="30 days",
compression="zip",
backtrace=True,
diagnose=True
)
# Configure default extras
logger.configure(extra={"request_id": "N/A"})
logger.info("Logging configured successfully")
except Exception as e:
# Fallback to basic logging if configuration fails
logger.remove()
logger.add(sys.stderr, level="WARNING", format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")
logger.error(f"Failed to configure logging: {str(e)}. Using fallback logging configuration.")
"""Configure structured logging for the application"""
# Default log directory
if not log_dir:
log_dir = os.getenv("LOG_DIR", "logs")
# Create log directory if it doesn't exist
Path(log_dir).mkdir(parents=True, exist_ok=True)
# Log file path with timestamp
log_file = Path(log_dir) / f"jira-webhook-llm_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
# Remove any existing loggers
logger.remove()
# Add console logger
logger.add(
sys.stdout,
level=log_level,
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {extra[request_id]} | {message}",
colorize=True,
backtrace=True,
diagnose=True
)
# Add file logger
logger.add(
str(log_file),
level=log_level,
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {extra[request_id]} | {message}",
rotation="100 MB",
retention="30 days",
compression="zip",
backtrace=True,
diagnose=True
)
# Configure default extras
logger.configure(extra={"request_id": "N/A"})
logger.info("Logging configured successfully")

211
main.py Normal file
View File

@ -0,0 +1,211 @@
# Standard library imports
import json
import os
import sys
import time
import asyncio
import signal
import uuid
from datetime import datetime, timezone
from typing import Dict, Optional
from http import HTTPStatus
from functools import partial, wraps
from contextlib import asynccontextmanager
# Third-party imports
from dotenv import load_dotenv
load_dotenv()
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from loguru import logger
from langfuse import Langfuse # Import Langfuse
from langfuse.langchain import CallbackHandler # Import CallbackHandler
# Local application imports
from shared_store import RequestStatus, requests_queue, ProcessingRequest
from llm.models import JiraWebhookPayload
from llm.chains import analysis_chain, validate_response
from app.handlers import jira_router, queue_router # Import new routers
from config import settings
# Initialize Langfuse client globally
langfuse_client = None
if settings.langfuse.enabled:
langfuse_client = Langfuse(
public_key=settings.langfuse.public_key,
secret_key=settings.langfuse.secret_key,
host=settings.langfuse.host
)
logger.info("Langfuse client initialized.")
else:
logger.info("Langfuse integration is disabled.")
async def process_single_jira_request(request: ProcessingRequest):
"""Processes a single Jira webhook request using the LLM."""
payload = JiraWebhookPayload.model_validate(request.payload)
# Initialize Langfuse callback handler for this trace
langfuse_handler = None
if langfuse_client:
langfuse_handler = CallbackHandler() # No arguments needed for constructor
logger.bind(
issue_key=payload.issueKey,
request_id=request.id,
timestamp=datetime.now(timezone.utc).isoformat()
).info(f"[{payload.issueKey}] Processing webhook request.")
llm_input = {
"issueKey": payload.issueKey,
"summary": payload.summary,
"description": payload.description if payload.description else "No description provided.",
"status": payload.status if payload.status else "Unknown",
"labels": ", ".join(payload.labels) if payload.labels else "None",
"assignee": payload.assignee if payload.assignee else "Unassigned",
"updated": payload.updated if payload.updated else "Unknown",
"comment": payload.comment if payload.comment else "No new comment provided."
}
try:
# Pass the Langfuse callback handler to the ainvoke method
raw_llm_response = await analysis_chain.ainvoke(
llm_input,
config={
"callbacks": [langfuse_handler],
"callbacks_extra": {
"session_id": str(request.id),
"trace_name": f"Jira-Analysis-{payload.issueKey}"
}
} if langfuse_handler else {}
)
# Store the raw LLM response
request.response = raw_llm_response
if not validate_response(raw_llm_response, payload.issueKey): # Pass issueKey for logging
error_msg = f"Invalid LLM response structure: {raw_llm_response}"
logger.error(f"[{payload.issueKey}] {error_msg}")
raise ValueError(error_msg)
logger.debug(f"[{payload.issueKey}] LLM Analysis Result: {json.dumps(raw_llm_response, indent=2)}")
logger.info(f"[{payload.issueKey}] Successfully processed request {request.id}.")
except Exception as e:
logger.error(f"[{payload.issueKey}] LLM processing failed: {str(e)}")
request.status = RequestStatus.FAILED
request.error = str(e)
raise
finally:
if langfuse_handler:
langfuse_client.flush() # Ensure all traces are sent
logger.debug(f"[{payload.issueKey}] Langfuse client flushed.")
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Starts background processing loop with database integration"""
async def processing_loop():
while True:
request = None
try:
request = requests_queue.get_next_request()
if request:
try:
request.status = RequestStatus.PROCESSING
request.started_at = datetime.now(timezone.utc)
# Process request
await process_single_jira_request(request)
request.status = RequestStatus.COMPLETED
request.completed_at = datetime.now(timezone.utc)
except Exception as e:
request.status = RequestStatus.FAILED
request.error = str(e)
request.completed_at = datetime.now(timezone.utc)
request.retry_count += 1
if request.retry_count < settings.processor.max_retries:
retry_delay = min(
settings.processor.initial_retry_delay_seconds * (2 ** request.retry_count),
3600
)
logger.warning(f"Request {request.id} failed, will retry in {retry_delay}s")
else:
logger.error(f"Request {request.id} failed after {request.retry_count} attempts")
finally:
if request:
requests_queue.task_done()
except Exception as e:
logger.error(f"Processing loop error: {str(e)}")
await asyncio.sleep(settings.processor.poll_interval_seconds)
task = asyncio.create_task(processing_loop())
try:
logger.info("Application initialized with processing loop started")
yield
finally:
# Ensure all tasks are done before cancelling the processing loop
logger.info("Waiting for pending queue tasks to complete...")
# requests_queue.join()
task.cancel()
await task # Await the task to ensure it's fully cancelled and cleaned up
logger.info("Processing loop terminated")
def create_app():
"""Factory function to create FastAPI app instance"""
_app = FastAPI(lifespan=lifespan)
# Include routers
_app.include_router(jira_router)
_app.include_router(queue_router)
# Add health check endpoint
@_app.get("/health")
async def health_check():
return {"status": "healthy"}
# Add error handling middleware
@_app.middleware("http")
async def error_handling_middleware(request: Request, call_next):
request_id = str(uuid.uuid4())
logger.bind(request_id=request_id).info(f"Request started: {request.method} {request.url}")
try:
response = await call_next(request)
return response
except HTTPException as e:
logger.error(f"HTTP Error: {e.status_code} - {e.detail}")
error_response = ErrorResponse(
error_id=request_id,
timestamp=datetime.now(timezone.utc).isoformat(),
status_code=e.status_code,
message=e.detail,
details=str(e)
)
return JSONResponse(status_code=e.status_code, content=error_response.model_dump())
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
error_response = ErrorResponse(
error_id=request_id,
timestamp=datetime.now(timezone.utc).isoformat(),
status_code=500,
message="Internal Server Error",
details=str(e)
)
return JSONResponse(status_code=500, content=error_response.model_dump())
return _app
class ErrorResponse(BaseModel):
error_id: str
timestamp: str
status_code: int
message: str
details: Optional[str] = None
app = create_app()
app = create_app()

View File

@ -1,21 +1,21 @@
fastapi==0.111.0
pydantic==2.9.0 # Changed from 2.7.4 to meet ollama's requirement
pydantic-settings==2.0.0
langchain==0.3.26
langchain-ollama==0.3.3
langchain-openai==0.3.27
langchain-core==0.3.68
langfuse==3.1.3
pydantic==2.7.4
pydantic-settings>=2.0.0
langchain>=0.2.0
langchain-ollama>=0.1.0
langchain-openai>=0.1.0
langchain-google-genai==2.1.8
langchain-core>=0.3.68,<0.4.0 # Pin to the range required by langchain-google-genai
langfuse==3.2.1
uvicorn==0.30.1
python-multipart==0.0.9 # Good to include for FastAPI forms
loguru==0.7.3
plotly==5.15.0
# Testing dependencies
unittest2>=1.1.0
# Testing dependencies
# unittest2>=1.1.0 # Removed as it's an older backport
pytest==8.2.0
pytest-asyncio==0.23.5
pytest-cov==4.1.0
httpx==0.27.0
PyYAML
redis>=5.0.0
PyYAML==6.0.2
SQLAlchemy==2.0.30
alembic==1.13.1

120
shared_store.py Normal file
View File

@ -0,0 +1,120 @@
from typing import List, Dict, Optional, Any # Import Any
import threading
from datetime import datetime, timezone
from enum import Enum
class RequestStatus(str, Enum):
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
TIMEOUT = "timeout"
# Thread-safe storage for requests and responses
from queue import Queue
from dataclasses import dataclass, field
@dataclass
class ProcessingRequest:
id: int
payload: Dict
status: RequestStatus = RequestStatus.PENDING
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
error: Optional[str] = None
retry_count: int = 0
response: Optional[Any] = None # Add response field
class RequestQueue:
def __init__(self):
self._queue: Queue[ProcessingRequest] = Queue()
self._requests: List[ProcessingRequest] = [] # To store all requests for retrieval
self._processing_lock = threading.Lock()
self._id_lock = threading.Lock()
self._current_id = 0
def _get_next_id(self) -> int:
"""Generate and return the next available request ID"""
with self._id_lock:
self._current_id += 1
return self._current_id
def add_request(self, payload: Dict) -> int:
"""Adds a new request to the queue and returns its ID"""
request_id = self._get_next_id()
request = ProcessingRequest(id=request_id, payload=payload)
self._queue.put(request)
with self._processing_lock: # Protect access to _requests list
self._requests.append(request)
return request_id
def get_next_request(self) -> Optional[ProcessingRequest]:
"""Fetches the next available request from the queue"""
with self._processing_lock:
if not self._queue.empty():
return self._queue.get()
return None
def get_all_requests(self) -> List[ProcessingRequest]:
"""Returns a list of all requests currently in the store."""
with self._processing_lock:
return list(self._requests) # Return a copy to prevent external modification
def get_request_by_id(self, request_id: int) -> Optional[ProcessingRequest]:
"""Retrieves a specific request by its ID."""
with self._processing_lock:
return next((req for req in self._requests if req.id == request_id), None)
def get_latest_completed_by_issue_key(self, issue_key: str) -> Optional[ProcessingRequest]:
"""
Retrieves the latest successfully completed request for a given issue key.
Skips pending or failed requests.
"""
with self._processing_lock:
completed_requests = [
req for req in self._requests
if req.payload.get("issueKey") == issue_key and req.status == RequestStatus.COMPLETED
]
# Sort by completed_at to get the latest, if available
completed_requests.sort(key=lambda req: req.completed_at if req.completed_at else datetime.min, reverse=True)
return completed_requests[0] if completed_requests else None
def are_there_any_open_requests_for_issue_key(self, issue_key: str) -> bool:
"""
Checks if there are any pending or processing requests for a given issue key.
"""
with self._processing_lock:
for req in self._requests:
if req.payload.get("issueKey") == issue_key and \
(req.status == RequestStatus.PENDING or req.status == RequestStatus.PROCESSING):
return True
return False
def delete_request_by_id(self, request_id: int) -> bool:
"""Deletes a specific request by its ID."""
with self._processing_lock:
initial_length = len(self._requests)
self._requests = [req for req in self._requests if req.id != request_id]
return len(self._requests) < initial_length
def clear_all_requests(self):
"""Clears all requests from the store."""
with self._processing_lock:
self._requests.clear()
# Clear the queue as well, though it's generally processed
while not self._queue.empty():
try:
self._queue.get_nowait()
except Exception:
continue
def task_done(self):
"""Indicates that a formerly enqueued task is complete."""
self._queue.task_done()
def join(self):
"""Blocks until all items in the queue have been gotten and processed."""
self._queue.join()
requests_queue = RequestQueue()

View File

@ -1 +0,0 @@
# Initialize tests package

View File

@ -1,20 +0,0 @@
import pytest
from fastapi.testclient import TestClient
from jira_webhook_llm import app
@pytest.fixture
def test_client():
return TestClient(app)
@pytest.fixture
def mock_jira_payload():
return {
"issueKey": "TEST-123",
"summary": "Test Issue",
"description": "Test Description",
"comment": "Test Comment",
"labels": ["test"],
"status": "Open",
"assignee": "Tester",
"updated": "2025-07-13T12:00:00Z"
}

View File

@ -1,254 +0,0 @@
import pytest
from fastapi import HTTPException
from jira_webhook_llm import app
from llm.models import JiraWebhookPayload
from unittest.mock import patch, MagicMock
import redis
from datetime import datetime, timedelta
import time
@pytest.fixture
def mock_jira_payload():
return {
"issueKey": "TEST-123",
"summary": "Test issue",
"description": "Test description",
"comment": "Test comment",
"labels": ["bug", "urgent"],
"status": "Open",
"assignee": "testuser",
"updated": "2025-07-14T00:00:00Z"
}
def test_error_handling_middleware(test_client, mock_jira_payload):
# Test 404 error handling
response = test_client.post("/nonexistent-endpoint", json={})
assert response.status_code == 404
assert "error_id" in response.json()
# Test validation error handling
invalid_payload = mock_jira_payload.copy()
invalid_payload.pop("issueKey")
response = test_client.post("/jira-webhook", json=invalid_payload)
assert response.status_code == 422
assert "details" in response.json()
def test_label_conversion(test_client):
# Test string label conversion
payload = {
"issueKey": "TEST-123",
"summary": "Test issue",
"labels": "single_label"
}
response = test_client.post("/jira-webhook", json=payload)
assert response.status_code == 200
# Test list label handling
payload["labels"] = ["label1", "label2"]
response = test_client.post("/jira-webhook", json=payload)
assert response.status_code == 200
def test_camel_case_handling(test_client):
# Test camelCase field names
payload = {
"issue_key": "TEST-123",
"summary": "Test issue",
"description": "Test description"
}
response = test_client.post("/jira-webhook", json=payload)
assert response.status_code == 200
def test_optional_fields(test_client):
# Test with only required fields
payload = {
"issueKey": "TEST-123",
"summary": "Test issue"
}
response = test_client.post("/jira-webhook", json=payload)
assert response.status_code == 200
# Test with all optional fields
payload.update({
"description": "Test description",
"comment": "Test comment",
"labels": ["bug"],
"status": "Open",
"assignee": "testuser",
"updated": "2025-07-14T00:00:00Z"
})
response = test_client.post("/jira-webhook", json=payload)
assert response.status_code == 200
def test_webhook_handler(test_client, mock_jira_payload):
# Test successful webhook handling
response = test_client.post("/jira-webhook", json=mock_jira_payload)
assert response.status_code == 200
assert "response" in response.json()
def test_llm_test_endpoint(test_client):
# Test LLM test endpoint
response = test_client.post("/test-llm")
assert response.status_code == 200
assert "response" in response.json()
def test_retry_decorator():
# Test retry decorator functionality
@app.retry(max_retries=3)
async def failing_function():
raise Exception("Test error")
with pytest.raises(Exception):
failing_function()
def test_rate_limiting(test_client, mock_jira_payload):
"""Test rate limiting functionality"""
with patch('redis.Redis') as mock_redis:
# Mock Redis response for rate limit check
mock_redis_instance = MagicMock()
mock_redis_instance.zcard.return_value = 100 # Exceed limit
mock_redis.from_url.return_value = mock_redis_instance
response = test_client.post("/jira-webhook", json=mock_jira_payload)
assert response.status_code == 429
assert "Too many requests" in response.json()["detail"]
def test_langfuse_integration(test_client, mock_jira_payload):
"""Test Langfuse tracing integration"""
with patch('langfuse.Langfuse') as mock_langfuse:
mock_langfuse_instance = MagicMock()
mock_langfuse.return_value = mock_langfuse_instance
response = test_client.post("/jira-webhook", json=mock_jira_payload)
assert response.status_code == 200
mock_langfuse_instance.start_span.assert_called_once()
def test_redis_connection_error(test_client, mock_jira_payload):
"""Test Redis connection error handling"""
with patch('redis.Redis') as mock_redis:
mock_redis.side_effect = redis.ConnectionError("Connection failed")
response = test_client.post("/jira-webhook", json=mock_jira_payload)
assert response.status_code == 200 # Should continue without rate limiting
def test_metrics_tracking(test_client, mock_jira_payload):
"""Test metrics collection functionality"""
with patch('redis.Redis') as mock_redis:
mock_redis_instance = MagicMock()
mock_redis.from_url.return_value = mock_redis_instance
# Make multiple requests to test metrics
for _ in range(3):
test_client.post("/jira-webhook", json=mock_jira_payload)
# Verify metrics were updated
handler = app.dependency_overrides.get('get_webhook_handler')()
assert handler.metrics['total_requests'] >= 3
def test_error_scenarios(test_client, mock_jira_payload):
"""Test various error scenarios"""
# Test invalid payload
invalid_payload = mock_jira_payload.copy()
invalid_payload.pop('issueKey')
response = test_client.post("/jira-webhook", json=invalid_payload)
assert response.status_code == 422
# Test LLM processing failure
with patch('llm.chains.analysis_chain.ainvoke') as mock_llm:
mock_llm.side_effect = Exception("LLM failed")
response = test_client.post("/jira-webhook", json=mock_jira_payload)
assert response.status_code == 200
assert "error" in response.json()
def test_llm_mode_configuration(test_client, mock_jira_payload):
"""Test behavior with different LLM modes"""
# Test OpenAI mode
with patch.dict('os.environ', {'LLM_MODE': 'openai'}):
response = test_client.post("/jira-webhook", json=mock_jira_payload)
assert response.status_code == 200
# Test Ollama mode
with patch.dict('os.environ', {'LLM_MODE': 'ollama'}):
response = test_client.post("/jira-webhook", json=mock_jira_payload)
assert response.status_code == 200
def test_langfuse_configuration(test_client, mock_jira_payload):
"""Test Langfuse enabled/disabled scenarios"""
# Test with Langfuse enabled
with patch.dict('os.environ', {'LANGFUSE_ENABLED': 'true'}):
response = test_client.post("/jira-webhook", json=mock_jira_payload)
assert response.status_code == 200
# Test with Langfuse disabled
with patch.dict('os.environ', {'LANGFUSE_ENABLED': 'false'}):
response = test_client.post("/jira-webhook", json=mock_jira_payload)
assert response.status_code == 200
def test_redis_configuration(test_client, mock_jira_payload):
"""Test Redis enabled/disabled scenarios"""
# Test with Redis enabled
with patch.dict('os.environ', {'REDIS_ENABLED': 'true'}):
response = test_client.post("/jira-webhook", json=mock_jira_payload)
assert response.status_code == 200
# Test with Redis disabled
with patch.dict('os.environ', {'REDIS_ENABLED': 'false'}):
response = test_client.post("/jira-webhook", json=mock_jira_payload)
assert response.status_code == 200
def test_validation_error_handling(test_client):
# Test missing required field
payload = {"summary": "Test issue"} # Missing issueKey
response = test_client.post("/jira-webhook", json=payload)
assert response.status_code == 422
assert "details" in response.json()
assert "issueKey" in response.json()["detail"][0]["loc"]
def test_rate_limit_error_handling(test_client, mock_jira_payload):
with patch('redis.Redis') as mock_redis:
mock_redis_instance = MagicMock()
mock_redis_instance.zcard.return_value = 100 # Exceed limit
mock_redis.from_url.return_value = mock_redis_instance
response = test_client.post("/jira-webhook", json=mock_jira_payload)
assert response.status_code == 429
assert "Too many requests" in response.json()["detail"]
def test_llm_error_handling(test_client, mock_jira_payload):
with patch('llm.chains.analysis_chain.ainvoke') as mock_llm:
mock_llm.side_effect = Exception("LLM processing failed")
response = test_client.post("/jira-webhook", json=mock_jira_payload)
assert response.status_code == 200
assert "error" in response.json()
assert "LLM processing failed" in response.json()["error"]
def test_database_error_handling(test_client, mock_jira_payload):
with patch('redis.Redis') as mock_redis:
mock_redis.side_effect = redis.ConnectionError("Database connection failed")
response = test_client.post("/jira-webhook", json=mock_jira_payload)
assert response.status_code == 200
assert "Database connection failed" in response.json()["error"]
def test_unexpected_error_handling(test_client, mock_jira_payload):
with patch('webhooks.handlers.JiraWebhookHandler.handle_webhook') as mock_handler:
mock_handler.side_effect = Exception("Unexpected error")
response = test_client.post("/jira-webhook", json=mock_jira_payload)
assert response.status_code == 500
assert "Unexpected error" in response.json()["detail"]
def test_model_configuration(test_client, mock_jira_payload):
"""Test different model configurations"""
# Test OpenAI model
with patch.dict('os.environ', {
'LLM_MODE': 'openai',
'OPENAI_MODEL': 'gpt-4'
}):
response = test_client.post("/jira-webhook", json=mock_jira_payload)
assert response.status_code == 200
# Test Ollama model
with patch.dict('os.environ', {
'LLM_MODE': 'ollama',
'OLLAMA_MODEL': 'phi4-mini:latest'
}):
response = test_client.post("/jira-webhook", json=mock_jira_payload)
assert response.status_code == 200

View File

@ -1,72 +0,0 @@
import pytest
from fastapi.testclient import TestClient
from jira_webhook_llm.dashboard import router
from unittest.mock import patch, MagicMock
import json
client = TestClient(router)
def test_dashboard_langfuse_disabled():
with patch('config.settings.langfuse.enabled', False):
response = client.get("/dashboard")
assert response.status_code == 200
assert "Langfuse monitoring is disabled" in response.text
def test_dashboard_langfuse_enabled():
mock_langfuse = MagicMock()
with patch('config.settings.langfuse.enabled', True), \
patch('config.settings.langfuse_client', mock_langfuse):
response = client.get("/dashboard")
assert response.status_code == 200
assert "System Monitoring Dashboard" in response.text
assert "Plotly.newPlot" in response.text
def test_queue_depth_data():
from jira_webhook_llm.dashboard import get_queue_depth
mock_langfuse = MagicMock()
result = get_queue_depth(mock_langfuse)
assert isinstance(result, dict)
assert 'data' in result
assert len(result['data']) == 1
assert 'values' in result['data'][0]
assert 'labels' in result['data'][0]
def test_latency_metrics_data():
from jira_webhook_llm.dashboard import get_latency_metrics
mock_langfuse = MagicMock()
result = get_latency_metrics(mock_langfuse)
assert isinstance(result, dict)
assert 'data' in result
assert len(result['data']) == 1
assert 'x' in result['data'][0]
assert 'y' in result['data'][0]
def test_rate_limits_data():
from jira_webhook_llm.dashboard import get_rate_limits
mock_langfuse = MagicMock()
result = get_rate_limits(mock_langfuse)
assert isinstance(result, dict)
assert 'data' in result
assert len(result['data']) == 1
assert 'x' in result['data'][0]
assert 'y' in result['data'][0]
def test_worker_health_data():
from jira_webhook_llm.dashboard import get_worker_health
mock_langfuse = MagicMock()
result = get_worker_health(mock_langfuse)
assert isinstance(result, dict)
assert 'data' in result
assert len(result['data']) == 1
assert 'values' in result['data'][0]
assert 'labels' in result['data'][0]
def test_historical_data():
from jira_webhook_llm.dashboard import get_historical_data
mock_langfuse = MagicMock()
result = get_historical_data(mock_langfuse)
assert isinstance(result, dict)
assert 'data' in result
assert len(result['data']) == 1
assert 'x' in result['data'][0]
assert 'y' in result['data'][0]

View File

@ -1,69 +0,0 @@
import pytest
import json
from llm.chains import validate_response
from llm.models import AnalysisFlags
def test_validate_response_valid():
"""Test validation with valid response"""
response = {
"hasMultipleEscalations": False,
"customerSentiment": "neutral"
}
assert validate_response(response) is True
def test_validate_response_valid_json_string():
"""Test validation with valid JSON string"""
response = json.dumps({
"hasMultipleEscalations": True,
"customerSentiment": "frustrated"
})
assert validate_response(response) is True
def test_validate_response_invalid_json_string():
"""Test validation with invalid JSON string"""
response = "not a valid json"
assert validate_response(response) is False
def test_validate_response_missing_field():
"""Test validation with missing required field"""
response = {
"hasMultipleEscalations": False
}
assert validate_response(response) is False
def test_validate_response_invalid_type():
"""Test validation with invalid field type"""
response = {
"hasMultipleEscalations": "not a boolean",
"customerSentiment": "neutral"
}
assert validate_response(response) is False
def test_validate_response_null_sentiment():
"""Test validation with null sentiment"""
response = {
"hasMultipleEscalations": True,
"customerSentiment": None
}
assert validate_response(response) is True
def test_validate_response_invalid_structure():
"""Test validation with invalid JSON structure"""
response = "not a dictionary"
assert validate_response(response) is False
def test_validate_response_complex_error():
"""Test validation with multiple errors"""
response = {
"hasMultipleEscalations": "invalid",
"customerSentiment": 123
}
assert validate_response(response) is False
def test_validate_response_model_validation():
"""Test validation using Pydantic model"""
response = {
"hasMultipleEscalations": True,
"customerSentiment": "calm"
}
assert AnalysisFlags.model_validate(response) is not None

View File

@ -1,165 +0,0 @@
from fastapi import HTTPException, Request
from loguru import logger
import json
import redis
from typing import Optional, List, Union
from pydantic import BaseModel, ConfigDict, field_validator
from datetime import datetime, timedelta
import time
from config import settings
from langfuse import Langfuse
from llm.models import JiraWebhookPayload, AnalysisFlags
from llm.chains import analysis_chain, validate_response
class BadRequestError(HTTPException):
def __init__(self, detail: str):
super().__init__(status_code=400, detail=detail)
class RateLimitError(HTTPException):
def __init__(self, detail: str):
super().__init__(status_code=429, detail=detail)
class ValidationError(HTTPException):
def __init__(self, detail: str):
super().__init__(status_code=422, detail=detail)
class JiraWebhookHandler:
def __init__(self):
self.analysis_chain = analysis_chain
self.redis = None
self.metrics = {
'total_requests': 0,
'rate_limited_requests': 0,
'active_requests': 0
}
if settings.redis.enabled:
self.redis = redis.Redis.from_url(settings.redis.url)
async def check_rate_limit(self, ip: str) -> bool:
"""Check if request is within rate limit using sliding window algorithm"""
if not self.redis:
return True
current_time = time.time()
window_size = settings.redis.rate_limit.window
max_requests = settings.redis.rate_limit.max_requests
# Remove old requests outside the window
self.redis.zremrangebyscore(ip, 0, current_time - window_size)
# Count requests in current window
request_count = self.redis.zcard(ip)
self.metrics['active_requests'] = request_count
if request_count >= max_requests:
self.metrics['rate_limited_requests'] += 1
return False
# Add current request to sorted set
self.redis.zadd(ip, {current_time: current_time})
self.redis.expire(ip, window_size)
return True
async def handle_webhook(self, payload: JiraWebhookPayload, request: Request):
try:
self.metrics['total_requests'] += 1
# Check rate limit
if settings.redis.enabled:
ip = request.client.host
if not await self.check_rate_limit(ip):
raise RateLimitError(
f"Too many requests. Limit is {settings.redis.rate_limit.max_requests} "
f"requests per {settings.redis.rate_limit.window} seconds"
)
if not payload.issueKey:
raise BadRequestError("Missing required field: issueKey")
if not payload.summary:
raise BadRequestError("Missing required field: summary")
logger.bind(
issue_key=payload.issueKey,
timestamp=datetime.utcnow().isoformat()
).info("Received webhook")
# Create Langfuse trace if enabled
trace = None
if settings.langfuse.enabled:
trace = settings.langfuse_client.start_span(
name="Jira Webhook",
input=payload.dict(),
metadata={
"trace_id": f"webhook-{payload.issueKey}",
"issue_key": payload.issueKey,
"timestamp": datetime.utcnow().isoformat()
}
)
llm_input = {
"issueKey": payload.issueKey,
"summary": payload.summary,
"description": payload.description if payload.description else "No description provided.",
"status": payload.status if payload.status else "Unknown",
"labels": ", ".join(payload.labels) if payload.labels else "None",
"assignee": payload.assignee if payload.assignee else "Unassigned",
"updated": payload.updated if payload.updated else "Unknown",
"comment": payload.comment if payload.comment else "No new comment provided."
}
# Create Langfuse span for LLM processing if enabled
llm_span = None
if settings.langfuse.enabled and trace:
llm_span = trace.start_span(
name="LLM Processing",
input=llm_input,
metadata={
"model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model
}
)
try:
analysis_result = await self.analysis_chain.ainvoke(llm_input)
# Update Langfuse span with output if enabled
if settings.langfuse.enabled and llm_span:
llm_span.update(output=analysis_result)
llm_span.end()
# Validate LLM response
if not validate_response(analysis_result):
logger.warning(f"Invalid LLM response format for {payload.issueKey}")
analysis_result = {
"hasMultipleEscalations": False,
"customerSentiment": "neutral"
}
logger.debug(f"LLM Analysis Result for {payload.issueKey}: {json.dumps(analysis_result, indent=2)}")
return {"status": "success", "analysis_flags": analysis_result}
except Exception as e:
logger.error(f"LLM processing failed for {payload.issueKey}: {str(e)}")
# Log error to Langfuse if enabled
if settings.langfuse.enabled and llm_span:
llm_span.error(e)
llm_span.end()
return {
"status": "error",
"analysis_flags": {
"hasMultipleEscalations": False,
"customerSentiment": "neutral"
},
"error": str(e)
}
except Exception as e:
logger.error(f"Error processing webhook: {str(e)}")
import traceback
logger.error(f"Stack trace: {traceback.format_exc()}")
# Log error to Langfuse if enabled
if settings.langfuse.enabled and trace:
trace.end(error=e)
raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")