Compare commits

...

16 Commits

Author SHA1 Message Date
Ireneusz Bachanowicz
d1fa9385e7 Release for qwen3:4b model
Some checks failed
CI/CD Pipeline / test (push) Has been cancelled
2025-08-01 18:15:57 +02:00
Ireneusz Bachanowicz
6f5e817011 Another synchro
Some checks failed
CI/CD Pipeline / test (push) Has been cancelled
2025-07-28 09:40:04 +02:00
Ireneusz Bachanowicz
6a57d91b7e 0.2.0 - conf for sandbox
Some checks failed
CI/CD Pipeline / test (push) Has been cancelled
2025-07-22 18:24:02 +02:00
9e698e40f9 STABLE feat: Implement Gemini integration; update configuration for Gemini API and model; enhance Jira webhook processing; refactor application structure and dependencies
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-22 00:41:17 +02:00
Ireneusz Bachanowicz
79bf65265d feat: Refactor Jira webhook handling; introduce new routes for request processing and response retrieval; update configuration settings and dependencies
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-21 22:42:56 +02:00
cbe1a430ad feat: Refactor API handlers and consolidate webhook processing; implement new webhook routes and enhance request handling
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-21 16:16:20 +02:00
8c1ab79eeb Refactor LLM analysis chain and models; remove deprecated prompt files
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
- Updated `chains.py` to streamline imports and improve error handling for LLM initialization.
- Modified `models.py` to enhance the `AnalysisFlags` model with field aliases and added datetime import.
- Deleted outdated prompt files (`jira_analysis_v1.0.0.txt`, `jira_analysis_v1.1.0.txt`, `jira_analysis_v1.2.0.txt`) to clean up the repository.
- Introduced a new prompt file `jira_analysis_v1.2.0.txt` with updated instructions for analysis.
- Removed `logging_config.py` and test files to simplify the codebase.
- Updated webhook handler to improve error handling and logging.
- Added a new shared store for managing processing requests in a thread-safe manner.
2025-07-21 01:06:45 +02:00
a1bec4f674 feat: Enhance Jira webhook processing with retry logic and configuration; update analysis record handling and validation
Some checks failed
CI/CD Pipeline / test (push) Has been cancelled
2025-07-19 02:15:13 +02:00
e73b43e001 feat: Implement Jira webhook processor with retry logic and service configuration; enhance database interaction for analysis records
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-18 23:05:18 +02:00
041ba14424 All test pytests working
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-18 20:55:21 +02:00
ff66181768 feat: Update .gitignore to exclude .venv and modify config path for application.yml; enhance test setup with detailed logging and mock LLM analysis
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-18 20:49:21 +02:00
1ff74e3ffb feat: Add JiraAnalysisResponse model and update handlers to use it for analysis record serialization
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-18 01:44:17 +02:00
1de9f46517 feat: Add endpoint to create Jira analysis records and update existing endpoints for consistency
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-18 00:57:28 +02:00
935a8a49ae Almost stable tests
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-17 02:21:56 +02:00
de4758a26f feat: Improve graceful shutdown handling and enhance application initialization logging
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-16 00:27:45 +02:00
aa416f3652 feat: Add Ollama configuration file and update .gitignore to exclude .env
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
2025-07-15 23:00:53 +02:00
32 changed files with 1391 additions and 869 deletions

16
.env Normal file
View File

@ -0,0 +1,16 @@
# Ollama configuration
# LLM_OLLAMA_BASE_URL="https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama"
LLM_OLLAMA_BASE_URL=http://ollama-jira:11434
# LLM_OLLAMA_MODEL=phi4-mini:latest
LLM_OLLAMA_MODEL=qwen3:4b
LOG_LEVEL=DEBUG
# Ollama API Key (required when using Ollama mode)
# Langfuse configuration
LANGFUSE_ENABLED=false
# LANGFUSE_PUBLIC_KEY="pk-lf-"
# LANGFUSE_SECRET_KEY="sk-lf-"
# LANGFUSE_HOST="https://cloud.langfuse.com"
# Gemini configuration
LLM_GEMINI_API_KEY=""
LLM_GEMINI_MODEL="gemini-2.5-flash"
LLM_MODE=ollama

18
.env-cnf_local Normal file
View File

@ -0,0 +1,18 @@
# Ollama configuration
# LLM_OLLAMA_BASE_URL=http://192.168.0.140:11434
LLM_OLLAMA_BASE_URL=http://192.168.0.122:11434
# LLM_OLLAMA_BASE_URL="https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama"
# LLM_OLLAMA_MODEL=phi4-mini:latest
LLM_OLLAMA_MODEL=qwen3:4b
# Logging configuration
LOG_LEVEL=DEBUG
# Ollama API Key (required when using Ollama mode)
# Langfuse configuration
LANGFUSE_ENABLED=true
LANGFUSE_PUBLIC_KEY="pk-lf-38238bc3-ffa1-48d3-8c8f-2048ecf8cc54"
LANGFUSE_SECRET_KEY="sk-lf-36f6bd37-b0e2-4656-9c63-fe98915e8872"
LANGFUSE_HOST="http://192.168.0.122:3000"
# Gemini configuration
LLM_GEMINI_API_KEY=""
LLM_GEMINI_MODEL="gemini-2.5-flash"
LLM_MODE=ollama

16
.env_cnf_amer Normal file
View File

@ -0,0 +1,16 @@
# Ollama configuration
# LLM_OLLAMA_BASE_URL="https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama"
LLM_OLLAMA_BASE_URL=http://ollama-jira:11434
# LLM_OLLAMA_MODEL=phi4-mini:latest
LLM_OLLAMA_MODEL=qwen3:4b
LOG_LEVEL=DEBUG
# Ollama API Key (required when using Ollama mode)
# Langfuse configuration
LANGFUSE_ENABLED=false
# LANGFUSE_PUBLIC_KEY="pk-lf-"
# LANGFUSE_SECRET_KEY="sk-lf-"
# LANGFUSE_HOST="https://cloud.langfuse.com"
# Gemini configuration
LLM_GEMINI_API_KEY=""
LLM_GEMINI_MODEL="gemini-2.5-flash"
LLM_MODE=ollama

4
.gitignore vendored
View File

@ -12,10 +12,12 @@ __pycache__/
.Python .Python
env/ env/
venv/ venv/
.venv/
*.egg *.egg
*.egg-info/ *.egg-info/
build/ build/
dist/ dist/
.roo/*
# Editor files (e.g., Visual Studio Code, Sublime Text, Vim) # Editor files (e.g., Visual Studio Code, Sublime Text, Vim)
.vscode/ .vscode/
@ -45,5 +47,5 @@ obj/
*.class *.class
# Miscellaneous # Miscellaneous
.env #.env
.DS_Store .DS_Store

10
.roo/mcp.json Normal file
View File

@ -0,0 +1,10 @@
{"mcpServers":{ "context7": {
"command": "npx",
"args": [
"-y",
"@upstash/context7-mcp"
],
"env": {
"DEFAULT_MINIMUM_TOKENS": "256"
}
}}}

32
=3.2.0
View File

@ -1,32 +0,0 @@
Requirement already satisfied: langfuse in ./venv/lib/python3.12/site-packages (3.1.3)
Requirement already satisfied: backoff>=1.10.0 in ./venv/lib/python3.12/site-packages (from langfuse) (2.2.1)
Requirement already satisfied: httpx<1.0,>=0.15.4 in ./venv/lib/python3.12/site-packages (from langfuse) (0.27.0)
Requirement already satisfied: opentelemetry-api<2.0.0,>=1.33.1 in ./venv/lib/python3.12/site-packages (from langfuse) (1.34.1)
Requirement already satisfied: opentelemetry-exporter-otlp<2.0.0,>=1.33.1 in ./venv/lib/python3.12/site-packages (from langfuse) (1.34.1)
Requirement already satisfied: opentelemetry-sdk<2.0.0,>=1.33.1 in ./venv/lib/python3.12/site-packages (from langfuse) (1.34.1)
Requirement already satisfied: packaging<25.0,>=23.2 in ./venv/lib/python3.12/site-packages (from langfuse) (24.2)
Requirement already satisfied: pydantic<3.0,>=1.10.7 in ./venv/lib/python3.12/site-packages (from langfuse) (2.9.0)
Requirement already satisfied: requests<3,>=2 in ./venv/lib/python3.12/site-packages (from langfuse) (2.32.4)
Requirement already satisfied: wrapt<2.0,>=1.14 in ./venv/lib/python3.12/site-packages (from langfuse) (1.17.2)
Requirement already satisfied: anyio in ./venv/lib/python3.12/site-packages (from httpx<1.0,>=0.15.4->langfuse) (4.9.0)
Requirement already satisfied: certifi in ./venv/lib/python3.12/site-packages (from httpx<1.0,>=0.15.4->langfuse) (2025.6.15)
Requirement already satisfied: httpcore==1.* in ./venv/lib/python3.12/site-packages (from httpx<1.0,>=0.15.4->langfuse) (1.0.9)
Requirement already satisfied: idna in ./venv/lib/python3.12/site-packages (from httpx<1.0,>=0.15.4->langfuse) (3.10)
Requirement already satisfied: sniffio in ./venv/lib/python3.12/site-packages (from httpx<1.0,>=0.15.4->langfuse) (1.3.1)
Requirement already satisfied: h11>=0.16 in ./venv/lib/python3.12/site-packages (from httpcore==1.*->httpx<1.0,>=0.15.4->langfuse) (0.16.0)
Requirement already satisfied: importlib-metadata<8.8.0,>=6.0 in ./venv/lib/python3.12/site-packages (from opentelemetry-api<2.0.0,>=1.33.1->langfuse) (8.7.0)
Requirement already satisfied: typing-extensions>=4.5.0 in ./venv/lib/python3.12/site-packages (from opentelemetry-api<2.0.0,>=1.33.1->langfuse) (4.14.1)
Requirement already satisfied: opentelemetry-exporter-otlp-proto-grpc==1.34.1 in ./venv/lib/python3.12/site-packages (from opentelemetry-exporter-otlp<2.0.0,>=1.33.1->langfuse) (1.34.1)
Requirement already satisfied: opentelemetry-exporter-otlp-proto-http==1.34.1 in ./venv/lib/python3.12/site-packages (from opentelemetry-exporter-otlp<2.0.0,>=1.33.1->langfuse) (1.34.1)
Requirement already satisfied: googleapis-common-protos~=1.52 in ./venv/lib/python3.12/site-packages (from opentelemetry-exporter-otlp-proto-grpc==1.34.1->opentelemetry-exporter-otlp<2.0.0,>=1.33.1->langfuse) (1.70.0)
Requirement already satisfied: grpcio<2.0.0,>=1.63.2 in ./venv/lib/python3.12/site-packages (from opentelemetry-exporter-otlp-proto-grpc==1.34.1->opentelemetry-exporter-otlp<2.0.0,>=1.33.1->langfuse) (1.73.1)
Requirement already satisfied: opentelemetry-exporter-otlp-proto-common==1.34.1 in ./venv/lib/python3.12/site-packages (from opentelemetry-exporter-otlp-proto-grpc==1.34.1->opentelemetry-exporter-otlp<2.0.0,>=1.33.1->langfuse) (1.34.1)
Requirement already satisfied: opentelemetry-proto==1.34.1 in ./venv/lib/python3.12/site-packages (from opentelemetry-exporter-otlp-proto-grpc==1.34.1->opentelemetry-exporter-otlp<2.0.0,>=1.33.1->langfuse) (1.34.1)
Requirement already satisfied: protobuf<6.0,>=5.0 in ./venv/lib/python3.12/site-packages (from opentelemetry-proto==1.34.1->opentelemetry-exporter-otlp-proto-grpc==1.34.1->opentelemetry-exporter-otlp<2.0.0,>=1.33.1->langfuse) (5.29.5)
Requirement already satisfied: opentelemetry-semantic-conventions==0.55b1 in ./venv/lib/python3.12/site-packages (from opentelemetry-sdk<2.0.0,>=1.33.1->langfuse) (0.55b1)
Requirement already satisfied: annotated-types>=0.4.0 in ./venv/lib/python3.12/site-packages (from pydantic<3.0,>=1.10.7->langfuse) (0.7.0)
Requirement already satisfied: pydantic-core==2.23.2 in ./venv/lib/python3.12/site-packages (from pydantic<3.0,>=1.10.7->langfuse) (2.23.2)
Requirement already satisfied: tzdata in ./venv/lib/python3.12/site-packages (from pydantic<3.0,>=1.10.7->langfuse) (2025.2)
Requirement already satisfied: charset_normalizer<4,>=2 in ./venv/lib/python3.12/site-packages (from requests<3,>=2->langfuse) (3.4.2)
Requirement already satisfied: urllib3<3,>=1.21.1 in ./venv/lib/python3.12/site-packages (from requests<3,>=2->langfuse) (2.5.0)
Requirement already satisfied: zipp>=3.20 in ./venv/lib/python3.12/site-packages (from importlib-metadata<8.8.0,>=6.0->opentelemetry-api<2.0.0,>=1.33.1->langfuse) (3.23.0)

View File

@ -40,13 +40,17 @@ ENV PYTHONDONTWRITEBYTECODE=1 \
# Copy the configuration directory first. # Copy the configuration directory first.
# If only code changes, this layer remains cached. # If only code changes, this layer remains cached.
COPY config ./config COPY config ./config
COPY llm ./llm
COPY app ./app
# Copy your application source code. # Copy your application source code.
COPY jira-webhook-llm.py . COPY main.py .
COPY config.py . COPY config.py .
COPY shared_store.py .
# Expose the port your application listens on. # Expose the port your application listens on.
EXPOSE 8000 EXPOSE 8000
# Define the command to run your application. # Define the command to run your application.
CMD ["uvicorn", "jira-webhook-llm:app", "--host", "0.0.0.0", "--port", "8000"] CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

60
app/handlers.py Normal file
View File

@ -0,0 +1,60 @@
from datetime import datetime, timezone
from fastapi import APIRouter, Request, HTTPException, Depends
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from llm.models import JiraWebhookPayload
from shared_store import requests_queue, ProcessingRequest
from loguru import logger
jira_router = APIRouter(
prefix="/jira",
tags=["Jira"]
)
queue_router = APIRouter(
prefix="/queue",
tags=["Queue"]
)
@jira_router.post("/sendRequest", status_code=201)
async def send_jira_request(payload: JiraWebhookPayload):
"""Send requests to add to queue for further processing"""
request_id = requests_queue.add_request(payload.model_dump())
return {"request_id": request_id}
class GetResponseRequest(BaseModel):
issueKey: str
@jira_router.post("/getResponse")
async def get_jira_response(request: GetResponseRequest):
"""Get response attribute provided by ollama for a given issueKey."""
if requests_queue.are_there_any_open_requests_for_issue_key(request.issueKey):
raise HTTPException(
status_code=409,
detail=f"There are still pending or processing requests for issueKey: {request.issueKey}. Please wait for them to be processed."
)
matched_request = requests_queue.get_latest_completed_by_issue_key(request.issueKey)
if not matched_request:
raise HTTPException(status_code=404, detail=f"No completed request found for issueKey: {request.issueKey}")
return matched_request.response if matched_request.response else "No response yet"
@queue_router.get("/getAll")
async def get_all_requests_in_queue():
"""Gets all requests"""
all_requests = requests_queue.get_all_requests()
return {"requests": all_requests}
@queue_router.get("/getPending")
async def get_pending_requests_in_queue():
"""Gets all requests waiting to be processed"""
all_requests = requests_queue.get_all_requests()
pending = [req for req in all_requests if req.status == "pending"]
return {"requests": pending}
@queue_router.delete("/clearAll")
async def clear_all_requests_in_queue():
"""Clear all the requests from the queue"""
requests_queue.clear_all_requests()
return {"status": "cleared"}

298
config.py
View File

@ -1,63 +1,25 @@
import os import os
import logging
import sys import sys
from typing import Optional from typing import Optional
from pydantic_settings import BaseSettings from pydantic_settings import BaseSettings
from pydantic import validator, ConfigDict from langfuse._client.client import Langfuse
from loguru import logger from pydantic import field_validator
from watchfiles import watch, Change from pydantic_settings import SettingsConfigDict
from threading import Thread
from langfuse import Langfuse
from langfuse.langchain import CallbackHandler
import yaml import yaml
_logger = logging.getLogger(__name__)
from pathlib import Path from pathlib import Path
class LangfuseConfig(BaseSettings): class LangfuseConfig(BaseSettings):
enabled: bool = True enabled: bool = False
public_key: Optional[str] = None
secret_key: Optional[str] = None secret_key: Optional[str] = None
public_key: Optional[str] = None
host: Optional[str] = None host: Optional[str] = None
@validator('host') model_config = SettingsConfigDict(
def validate_host(cls, v):
if v and not v.startswith(('http://', 'https://')):
raise ValueError("Langfuse host must start with http:// or https://")
return v
def __init__(self, **data):
try:
logger.info("Initializing LangfuseConfig with data: {}", data)
logger.info("Environment variables:")
logger.info("LANGFUSE_PUBLIC_KEY: {}", os.getenv('LANGFUSE_PUBLIC_KEY'))
logger.info("LANGFUSE_SECRET_KEY: {}", os.getenv('LANGFUSE_SECRET_KEY'))
logger.info("LANGFUSE_HOST: {}", os.getenv('LANGFUSE_HOST'))
super().__init__(**data)
logger.info("LangfuseConfig initialized successfully")
logger.info("Public Key: {}", self.public_key)
logger.info("Secret Key: {}", self.secret_key)
logger.info("Host: {}", self.host)
except Exception as e:
logger.error("Failed to initialize LangfuseConfig: {}", e)
logger.error("Current environment variables:")
logger.error("LANGFUSE_PUBLIC_KEY: {}", os.getenv('LANGFUSE_PUBLIC_KEY'))
logger.error("LANGFUSE_SECRET_KEY: {}", os.getenv('LANGFUSE_SECRET_KEY'))
logger.error("LANGFUSE_HOST: {}", os.getenv('LANGFUSE_HOST'))
raise
model_config = ConfigDict(
env_prefix='LANGFUSE_', env_prefix='LANGFUSE_',
env_file='.env', env_file='.env',
env_file_encoding='utf-8', env_file_encoding='utf-8',
extra='ignore',
env_nested_delimiter='__',
case_sensitive=True
)
class LogConfig(BaseSettings):
level: str = 'INFO'
model_config = ConfigDict(
env_prefix='LOG_',
extra='ignore' extra='ignore'
) )
@ -73,167 +35,135 @@ class LLMConfig(BaseSettings):
ollama_base_url: Optional[str] = None ollama_base_url: Optional[str] = None
ollama_model: Optional[str] = None ollama_model: Optional[str] = None
@validator('mode') # Gemini settings
gemini_api_key: Optional[str] = None
gemini_model: Optional[str] = None
gemini_api_base_url: Optional[str] = None # Add this for Gemini
@field_validator('mode')
def validate_mode(cls, v): def validate_mode(cls, v):
if v not in ['openai', 'ollama']: if v not in ['openai', 'ollama', 'gemini']: # Add 'gemini'
raise ValueError("LLM mode must be either 'openai' or 'ollama'") raise ValueError("LLM mode must be 'openai', 'ollama', or 'gemini'")
return v return v
model_config = ConfigDict( model_config = SettingsConfigDict(
env_prefix='LLM_', env_prefix='LLM_',
env_file='.env', env_file='.env',
env_file_encoding='utf-8', env_file_encoding='utf-8',
extra='ignore' extra='ignore'
) )
class ProcessorConfig(BaseSettings):
poll_interval_seconds: int = 10
max_retries: int = 5
initial_retry_delay_seconds: int = 60
model_config = SettingsConfigDict(
env_prefix='PROCESSOR_',
env_file='.env',
env_file_encoding='utf-8',
extra='ignore'
)
class Settings: class Settings:
def __init__(self): def __init__(self):
try: try:
logger.info("Loading configuration from application.yml and environment variables") # Load settings from YAML file as a fallback
# Load configuration from YAML file
yaml_config = self._load_yaml_config() yaml_config = self._load_yaml_config()
logger.info("Loaded YAML config: {}", yaml_config) # Add this log line
# Initialize configurations, allowing environment variables to override YAML # Load settings from environment and .env file first
logger.info("Initializing LogConfig") self.llm = LLMConfig()
self.log = LogConfig(**yaml_config.get('log', {})) self.processor = ProcessorConfig()
logger.info("LogConfig initialized: {}", self.log.model_dump()) self.langfuse = LangfuseConfig()
logger.info("Initializing LLMConfig") # Apply YAML configuration for any values not set by the environment
self.llm = LLMConfig(**yaml_config.get('llm', {})) self._apply_yaml_fallback(yaml_config)
logger.info("LLMConfig initialized: {}", self.llm.model_dump())
logger.info("Initializing LangfuseConfig") # Initialize Langfuse client if enabled
self.langfuse = LangfuseConfig(**yaml_config.get('langfuse', {})) self.langfuse_client: Optional[Langfuse] = None
logger.info("LangfuseConfig initialized: {}", self.langfuse.model_dump())
logger.info("Validating configuration")
self._validate()
logger.info("Starting config watcher")
self._start_watcher()
logger.info("Initializing Langfuse")
self._init_langfuse()
logger.info("Configuration initialized successfully")
except Exception as e:
logger.error("Configuration initialization failed: {}", e)
logger.error("Current configuration state:")
logger.error("LogConfig: {}", self.log.model_dump() if hasattr(self, 'log') else 'Not initialized')
logger.error("LLMConfig: {}", self.llm.model_dump() if hasattr(self, 'llm') else 'Not initialized')
logger.error("LangfuseConfig: {}", self.langfuse.model_dump() if hasattr(self, 'langfuse') else 'Not initialized')
raise
def _load_yaml_config(self):
config_path = Path('/root/development/jira-webhook-llm/config/application.yml')
if not config_path.exists():
logger.warning("Configuration file not found at {}", config_path)
return {}
try:
with open(config_path, 'r') as f:
return yaml.safe_load(f) or {}
except Exception as e:
logger.error("Error loading configuration from {}: {}", config_path, e)
return {}
def _validate(self):
logger.info("LLM mode set to: '{}'", self.llm.mode)
if self.llm.mode == 'openai':
if not self.llm.openai_api_key:
raise ValueError("LLM mode is 'openai', but OPENAI_API_KEY is not set.")
if not self.llm.openai_api_base_url:
raise ValueError("LLM mode is 'openai', but OPENAI_API_BASE_URL is not set.")
if not self.llm.openai_model:
raise ValueError("LLM mode is 'openai', but OPENAI_MODEL is not set.")
elif self.llm.mode == 'ollama':
if not self.llm.ollama_base_url:
raise ValueError("LLM mode is 'ollama', but OLLAMA_BASE_URL is not set.")
if not self.llm.ollama_model:
raise ValueError("LLM mode is 'ollama', but OLLAMA_MODEL is not set.")
logger.info("Configuration validated successfully.")
def _init_langfuse(self):
if self.langfuse.enabled: if self.langfuse.enabled:
try: if self.langfuse.secret_key and self.langfuse.public_key and self.langfuse.host:
# Verify all required credentials are present
if not all([self.langfuse.public_key, self.langfuse.secret_key, self.langfuse.host]):
raise ValueError("Missing required Langfuse credentials")
logger.debug("Initializing Langfuse client with:")
logger.debug("Public Key: {}", self.langfuse.public_key)
logger.debug("Secret Key: {}", self.langfuse.secret_key)
logger.debug("Host: {}", self.langfuse.host)
# Initialize Langfuse client
self.langfuse_client = Langfuse( self.langfuse_client = Langfuse(
public_key=self.langfuse.public_key, public_key=self.langfuse.public_key,
secret_key=self.langfuse.secret_key, secret_key=self.langfuse.secret_key,
host=self.langfuse.host host=self.langfuse.host
) )
else:
_logger.warning("Langfuse is enabled but missing one or more of LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY, or LANGFUSE_HOST. Langfuse client will not be initialized.")
# Test Langfuse connection
try:
self.langfuse_client.auth_check()
logger.debug("Langfuse connection test successful")
except Exception as e:
logger.error("Langfuse connection test failed: {}", e)
raise
# Initialize CallbackHandler
try:
self.langfuse_handler = CallbackHandler(
public_key=self.langfuse.public_key,
secret_key=self.langfuse.secret_key,
host=self.langfuse.host
)
except TypeError:
try:
# Fallback for older versions of langfuse.langchain.CallbackHandler
self.langfuse_handler = CallbackHandler(
public_key=self.langfuse.public_key,
host=self.langfuse.host
)
logger.warning("Using fallback CallbackHandler initialization - secret_key parameter not supported")
except TypeError:
# Fallback for even older versions
self.langfuse_handler = CallbackHandler(
public_key=self.langfuse.public_key
)
logger.warning("Using minimal CallbackHandler initialization - only public_key parameter supported")
logger.info("Langfuse client and handler initialized successfully")
logger.info("Langfuse client and handler initialized successfully")
except ValueError as e:
logger.warning("Langfuse configuration error: {}. Disabling Langfuse.", e)
self.langfuse.enabled = False
except Exception as e:
logger.error("Failed to initialize Langfuse: {}", e)
self.langfuse.enabled = False
def _start_watcher(self):
def watch_config():
for changes in watch('config/application.yml'):
for change in changes:
if change[0] == Change.modified:
logger.info("Configuration file modified, reloading settings...")
try:
# Reload YAML config and re-initialize all settings
yaml_config = self._load_yaml_config()
self.log = LogConfig(**yaml_config.get('log', {}))
self.llm = LLMConfig(**yaml_config.get('llm', {}))
self.langfuse = LangfuseConfig(**yaml_config.get('langfuse', {}))
self._validate() self._validate()
self._init_langfuse() # Re-initialize Langfuse client if needed
logger.info("Configuration reloaded successfully")
except Exception as e: except Exception as e:
logger.error("Error reloading configuration: {}", e) print(f"Configuration initialization failed: {e}")
Thread(target=watch_config, daemon=True).start()
# Create a single, validated instance of the settings to be imported by other modules.
try:
settings = Settings()
except ValueError as e:
logger.error("FATAL: {}", e)
logger.error("Application shutting down due to configuration error.")
sys.exit(1) sys.exit(1)
def _apply_yaml_fallback(self, yaml_config: dict):
"""Applies YAML config values as a fallback to settings not set by environment."""
# --- LLM Configuration ---
llm_yaml_config = yaml_config.get('llm', {})
if llm_yaml_config:
# Flatten nested YAML structure to match LLMConfig fields
flat_llm_yaml = {
'mode': llm_yaml_config.get('mode'),
**{f'openai_{k}': v for k, v in (llm_yaml_config.get('openai') or {}).items()},
**{f'ollama_{k}': v for k, v in (llm_yaml_config.get('ollama') or {}).items()},
**{f'gemini_{k}': v for k, v in (llm_yaml_config.get('gemini') or {}).items()}
}
for field_name in self.llm.model_fields:
if field_name not in self.llm.model_fields_set:
yaml_value = flat_llm_yaml.get(field_name)
if yaml_value is not None:
setattr(self.llm, field_name, yaml_value)
# --- Processor Configuration ---
processor_yaml_config = yaml_config.get('processor', {})
if processor_yaml_config:
for field_name in self.processor.model_fields:
if field_name not in self.processor.model_fields_set:
yaml_value = processor_yaml_config.get(field_name)
if yaml_value is not None:
setattr(self.processor, field_name, yaml_value)
# --- Langfuse Configuration ---
langfuse_yaml_config = yaml_config.get('langfuse', {})
if langfuse_yaml_config:
for field_name in self.langfuse.model_fields:
if field_name not in self.langfuse.model_fields_set:
yaml_value = langfuse_yaml_config.get(field_name)
if yaml_value is not None:
setattr(self.langfuse, field_name, yaml_value)
def _load_yaml_config(self):
config_path = Path('config/application.yml')
if not config_path.exists():
return {}
try:
with open(config_path, 'r') as f:
return yaml.safe_load(f) or {}
except Exception as e:
return {}
def _validate(self):
if self.llm.mode == 'openai':
if not self.llm.openai_api_key:
raise ValueError("OPENAI_API_KEY is not set.")
if not self.llm.openai_api_base_url:
raise ValueError("OPENAI_API_BASE_URL is not set.")
if not self.llm.openai_model:
raise ValueError("OPENAI_MODEL is not set.")
elif self.llm.mode == 'ollama':
if not self.llm.ollama_base_url:
raise ValueError("OLLAMA_BASE_URL is not set.")
if not self.llm.ollama_model:
raise ValueError("OLLAMA_MODEL is not set.")
elif self.llm.mode == 'gemini': # New: Add validation for Gemini mode
if not self.llm.gemini_api_key:
raise ValueError("GEMINI_API_KEY is not set.")
if not self.llm.gemini_model:
raise ValueError("GEMINI_MODEL is not set.")
# Create settings instance
settings = Settings()

View File

@ -0,0 +1,77 @@
# Default application configuration
llm:
# The mode to run the application in.
# Can be 'openai' or 'ollama'.
# This can be overridden by the LLM_MODE environment variable.
mode: ollama # Change mode to gemini
# Settings for OpenAI-compatible APIs (like OpenRouter)
openai:
# It's HIGHLY recommended to set this via an environment variable
# instead of saving it in this file.
# Can be overridden by OPENAI_API_KEY
# api_key: "sk-or-v1-..."
# api_key: "your-openai-api-key" # Keep this commented out or set to a placeholder
# Can be overridden by OPENAI_API_BASE_URL
# api_base_url: "https://openrouter.ai/api/v1"
# api_base_url: "https://api.openai.com/v1" # Remove or comment out this line
# Can be overridden by OPENAI_MODEL
# model: "deepseek/deepseek-chat:free"
# model: "gpt-4o" # Keep this commented out or set to a placeholder
# Settings for Gemini
gemini:
# It's HIGHLY recommended to set this via an environment variable
# instead of saving it in this file.
# Can be overridden by GEMINI_API_KEY
api_key: ""
# Can be overridden by GEMINI_MODEL
# model: "gemini-2.5-flash"
model: "gemini-2.5-flash-lite-preview-06-17"
# Can be overridden by GEMINI_API_BASE_URL
api_base_url: "https://generativelanguage.googleapis.com/v1beta/" # Add for Gemini
# Settings for Ollama
ollama:
# Can be overridden by OLLAMA_BASE_URL
base_url: "http://ollama-jira:11434"
# base_url: "https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama"
# Can be overridden by OLLAMA_MODEL
# model: "phi4-mini:latest"
# model: "qwen3:1.7b"
# model: "smollm:360m"
# model: "qwen3:0.6b"
model: "qwen3:4b"
# Langfuse configuration for observability and analytics
langfuse:
# Enable or disable Langfuse integration
# Can be overridden by LANGFUSE_ENABLED environment variable
enabled: false
# Langfuse API credentials
# It's HIGHLY recommended to set these via environment variables
# instead of saving them in this file
public_key: "pk-lf-"
secret_key: "sk-lf-"
# host: "https://cloud.langfuse.com"
# host: "http://192.168.0.122:3000"
# Processor configuration
processor:
# Interval in seconds between polling for new Jira analysis requests
# Can be overridden by PROCESSOR_POLL_INTERVAL_SECONDS environment variable
poll_interval_seconds: 10
# Maximum number of retries for failed Jira analysis requests
# Can be overridden by PROCESSOR_MAX_RETRIES environment variable
max_retries: 0
# Initial delay in seconds before the first retry attempt (exponential backoff)
# Can be overridden by PROCESSOR_INITIAL_RETRY_DELAY_SECONDS environment variable
initial_retry_delay_seconds: 60

View File

@ -0,0 +1,77 @@
# Default application configuration
llm:
# The mode to run the application in.
# Can be 'openai' or 'ollama'.
# This can be overridden by the LLM_MODE environment variable.
mode: ollama # Change mode to gemini
# Settings for OpenAI-compatible APIs (like OpenRouter)
openai:
# It's HIGHLY recommended to set this via an environment variable
# instead of saving it in this file.
# Can be overridden by OPENAI_API_KEY
# api_key: "sk-or-v1-..."
# api_key: "your-openai-api-key" # Keep this commented out or set to a placeholder
# Can be overridden by OPENAI_API_BASE_URL
# api_base_url: "https://openrouter.ai/api/v1"
# api_base_url: "https://api.openai.com/v1" # Remove or comment out this line
# Can be overridden by OPENAI_MODEL
# model: "deepseek/deepseek-chat:free"
# model: "gpt-4o" # Keep this commented out or set to a placeholder
# Settings for Gemini
gemini:
# It's HIGHLY recommended to set this via an environment variable
# instead of saving it in this file.
# Can be overridden by GEMINI_API_KEY
api_key: ""
# Can be overridden by GEMINI_MODEL
# model: "gemini-2.5-flash"
model: "gemini-2.5-flash-lite-preview-06-17"
# Can be overridden by GEMINI_API_BASE_URL
api_base_url: "https://generativelanguage.googleapis.com/v1beta/" # Add for Gemini
# Settings for Ollama
ollama:
# Can be overridden by OLLAMA_BASE_URL
base_url: "http://192.168.0.122:11434"
# base_url: "https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama"
# Can be overridden by OLLAMA_MODEL
# model: "phi4-mini:latest"
# model: "qwen3:1.7b"
# model: "smollm:360m"
# model: "qwen3:0.6b"
model: "qwen3:4b"
# Langfuse configuration for observability and analytics
langfuse:
# Enable or disable Langfuse integration
# Can be overridden by LANGFUSE_ENABLED environment variable
enabled: true
# Langfuse API credentials
# It's HIGHLY recommended to set these via environment variables
# instead of saving them in this file
public_key: "pk-lf-38238bc3-ffa1-48d3-8c8f-2048ecf8cc54"
secret_key: "sk-lf-36f6bd37-b0e2-4656-9c63-fe98915e8872"
# host: "https://cloud.langfuse.com"
host: "http://192.168.0.122:3000"
# Processor configuration
processor:
# Interval in seconds between polling for new Jira analysis requests
# Can be overridden by PROCESSOR_POLL_INTERVAL_SECONDS environment variable
poll_interval_seconds: 10
# Maximum number of retries for failed Jira analysis requests
# Can be overridden by PROCESSOR_MAX_RETRIES environment variable
max_retries: 0
# Initial delay in seconds before the first retry attempt (exponential backoff)
# Can be overridden by PROCESSOR_INITIAL_RETRY_DELAY_SECONDS environment variable
initial_retry_delay_seconds: 60

View File

@ -3,42 +3,75 @@ llm:
# The mode to run the application in. # The mode to run the application in.
# Can be 'openai' or 'ollama'. # Can be 'openai' or 'ollama'.
# This can be overridden by the LLM_MODE environment variable. # This can be overridden by the LLM_MODE environment variable.
mode: ollama mode: ollama # Change mode to gemini
# Settings for OpenAI-compatible APIs (like OpenRouter) # Settings for OpenAI-compatible APIs (like OpenRouter)
openai: openai:
# It's HIGHLY recommended to set this via an environment variable # It's HIGHLY recommended to set this via an environment variable
# instead of saving it in this file. # instead of saving it in this file.
# Can be overridden by OPENAI_API_KEY # Can be overridden by OPENAI_API_KEY
api_key: "sk-or-v1-..." # api_key: "sk-or-v1-..."
# api_key: "your-openai-api-key" # Keep this commented out or set to a placeholder
# Can be overridden by OPENAI_API_BASE_URL # Can be overridden by OPENAI_API_BASE_URL
api_base_url: "https://openrouter.ai/api/v1" # api_base_url: "https://openrouter.ai/api/v1"
# api_base_url: "https://api.openai.com/v1" # Remove or comment out this line
# Can be overridden by OPENAI_MODEL # Can be overridden by OPENAI_MODEL
model: "deepseek/deepseek-chat:free" # model: "deepseek/deepseek-chat:free"
# model: "gpt-4o" # Keep this commented out or set to a placeholder
# Settings for Gemini
gemini:
# It's HIGHLY recommended to set this via an environment variable
# instead of saving it in this file.
# Can be overridden by GEMINI_API_KEY
api_key: ""
# Can be overridden by GEMINI_MODEL
# model: "gemini-2.5-flash"
model: "gemini-2.5-flash-lite-preview-06-17"
# Can be overridden by GEMINI_API_BASE_URL
api_base_url: "https://generativelanguage.googleapis.com/v1beta/" # Add for Gemini
# Settings for Ollama # Settings for Ollama
ollama: ollama:
# Can be overridden by OLLAMA_BASE_URL # Can be overridden by OLLAMA_BASE_URL
base_url: "http://192.168.0.140:11434" base_url: "http://ollama-jira:11434"
# base_url: "https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama" # base_url: "https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama"
# Can be overridden by OLLAMA_MODEL # Can be overridden by OLLAMA_MODEL
model: "phi4-mini:latest" # model: "phi4-mini:latest"
# model: "qwen3:1.7b" # model: "qwen3:1.7b"
# model: "smollm:360m" # model: "smollm:360m"
# model: "qwen3:0.6b" # model: "qwen3:0.6b"
model: "qwen3:4b"
# Langfuse configuration for observability and analytics # Langfuse configuration for observability and analytics
langfuse: langfuse:
# Enable or disable Langfuse integration # Enable or disable Langfuse integration
# Can be overridden by LANGFUSE_ENABLED environment variable # Can be overridden by LANGFUSE_ENABLED environment variable
enabled: true enabled: false
# Langfuse API credentials # Langfuse API credentials
# It's HIGHLY recommended to set these via environment variables # It's HIGHLY recommended to set these via environment variables
# instead of saving them in this file # instead of saving them in this file
public_key: "pk-lf-17dfde63-93e2-4983-8aa7-2673d3ecaab8" public_key: "pk-lf-"
secret_key: "sk-lf-ba41a266-6fe5-4c90-a483-bec8a7aaa321" secret_key: "sk-lf-"
host: "https://cloud.langfuse.com" # host: "https://cloud.langfuse.com"
# host: "http://192.168.0.122:3000"
# Processor configuration
processor:
# Interval in seconds between polling for new Jira analysis requests
# Can be overridden by PROCESSOR_POLL_INTERVAL_SECONDS environment variable
poll_interval_seconds: 10
# Maximum number of retries for failed Jira analysis requests
# Can be overridden by PROCESSOR_MAX_RETRIES environment variable
max_retries: 0
# Initial delay in seconds before the first retry attempt (exponential backoff)
# Can be overridden by PROCESSOR_INITIAL_RETRY_DELAY_SECONDS environment variable
initial_retry_delay_seconds: 60

89
data/DCR_mappings_ALL.md Normal file
View File

@ -0,0 +1,89 @@
The `/dcr` API serves as a canonical interface for initiating data change requests. The JSON payload sent to this endpoint contains a standardized representation of the desired changes. The MDM HUB is then responsible for validating this request and mapping its attributes to the distinct models of the target systems.
***
### **`/dcr` API to Target System Attribute Mapping**
The following sections break down the mapping from the attributes in the `/dcr` request payload to their corresponding fields in OneKey and Veeva.
---
#### **1. Request-Level Attributes**
These attributes are defined at the top level of each object within the `DCRRequests` array in the JSON payload.
| HUB /dcr Attribute | OneKey Mapping | Veeva OpenData (VOD) Mapping | Notes / Description |
| :--- | :--- | :--- | :--- |
| **`extDCRRequestId`** | Used to populate the `DCRID` in the Reltio DCR tracking entity. OneKey's `validation.clientRequestId` is typically a HUB-generated internal ID, but `extDCRRequestId` is the primary key for client-side tracking. | **`dcr_key`** (in all CSV files: `change_request.csv`, `change_request_hco.csv`, etc.) | **This is the primary external identifier for the entire DCR.** It is crucial for clients like PforceRx to track the request's status and is used as the main correlation ID across all systems and files. |
| **`extDCRComment`** | **`validation.requestComment`** | **`description`** (in `change_request.csv`) | A free-text field for the requester to provide context or justification for the DCR. For OneKey, it has a special function: due to API limitations, requests to **remove** an attribute are specified in this comment field (e.g., "Please remove attributes: [Address: ...]"). |
| **`country`** | **`isoCod2`** | **`primary_country__v`** (in `change_request_hcp.csv` and `change_request_hco.csv`) and **`country__v`** (in `change_request_address.csv`) | The mandatory two-letter ISO country code. This is a critical routing attribute that determines which validator instance to use and, for Veeva, which S3/SFTP directory the DCR files are placed in. |
| **`action`** (within HCP/HCO object) | This determines the logic. `update` and `insert` map to a `submitVR` call. An `update` requires a valid `individual.individualEid` or `workplace.workplaceEid`. `delete` is handled by updating the entity's end date in Reltio. | **`change_request_type`** (in `change_request.csv`). Mapped to **`ADD_REQUEST`** for an `insert` action, and **`CHANGE_REQUEST`** for an `update` action. | This defines the fundamental operation being performed on the entity. |
| **`refId`** (within HCP/HCO object) | Used to query Reltio to find the OneKey crosswalk (`individual.individualEid` or `workplace.workplaceEid`) which is mandatory for update operations. | Used to query Reltio to find the Veeva crosswalk (`vid__v`), which is mandatory for update operations. The Reltio URI from the `refId` is also used to populate the **`entity_key`** field in the VOD CSVs. | This object contains the necessary identifiers (`CrosswalkTargetObjectId`, `EntityURITargetObjectId`, etc.) to locate the target entity in Reltio for an update or delete operation. |
---
#### **2. Healthcare Professional (HCP) Attributes**
These attributes are provided within the `HCP` object of a DCR request.
| HUB /dcr Attribute | OneKey Mapping | Veeva OpenData (VOD) Mapping | Notes / Description |
| :--- | :--- | :--- | :--- |
| **`firstName`** | `individual.firstName` | `first_name__v` | HCP's first name. |
| **`lastName`** | `individual.lastName` | `last_name__v` | HCP's last name. Mandatory for creating a new HCP in OneKey. |
| **`middleName`** | `individual.middleName` | `middle_name__v` | HCP's middle name. |
| **`prefix`** | `individual.prefixNameCode` | `prefix__v` | Name prefix (e.g., Mr., Dr.). Requires a lookup from the canonical code (`HCPPrefix`) to the target system's specific code. |
| **`title`** | `individual.titleCode` | `professional_title__v` | Professional title. Requires a lookup from the canonical code (`HCPTitle` or `HCPProfessionalTitle`) to the target system's specific code. |
| **`gender`** | `individual.genderCode` | `gender__v` | HCP's gender. Requires a lookup to map the canonical value (e.g., M, F) to the target system's code. |
| **`subTypeCode`** | `individual.typeCode` | `hcp_type__v` | The professional subtype of the HCP (e.g., Physician, Nurse). Requires a lookup from the canonical code (`HCPSubTypeCode` or `HCPType`). |
| **`specialties`** (List) | `individual.speciality1`, `individual.speciality2`, `individual.speciality3` | `specialty_1__v` through `specialty_10__v` | The list of specialties is **flattened**. OneKey accepts up to 3 ranked specialties. Veeva accepts up to 10. A lookup is required to map the canonical `HCPSpecialty` code to the target system's value. |
| **`emails`** (List) | `individual.email` | `email_1__v`, `email_2__v` | The list of emails is flattened. OneKey typically takes the highest-ranked email. Veeva takes the top two. |
| **`phones`** (List) | `individual.mobilePhone` | `phone_1__v` to `phone_3__v` (for office), `fax_1__v` to `fax_2__v` (for fax) | The list is filtered by type and ranked. OneKey maps the highest-ranked to `mobilePhone`. Veeva separates numbers into distinct `phone` and `fax` columns based on the Reltio phone type. |
---
#### **3. Healthcare Organization (HCO) Attributes**
These attributes are provided within the `HCO` object of a DCR request.
| HUB /dcr Attribute | OneKey Mapping | Veeva OpenData (VOD) Mapping | Notes / Description |
| :--- | :--- | :--- | :--- |
| **`name`** | `workplace.usualName` / `workplace.officialName` | `corporate_name__v` | The primary, official name of the organization. |
| **`otherNames`** (List) | `workplace.usualName2` | `alternate_name_1__v` | The list of alternative names is flattened. Both systems typically take the first or highest-ranked alternative name. |
| **`subTypeCode`** | `workplace.typeCode` | `major_class_of_trade__v` | The HCO's subtype, often representing facility type. Requires a lookup from the canonical code (`COTFacilityType`). |
| **`typeCode`** | Not Mapped | `hco_type__v` | Maps to the HCO Type. Requires a lookup (`HCOType`). The OneKey mapping document indicates this is not used for their system. |
| **`websiteURL`** | `workplace.website` | `URL_1__v` | The official website of the organization. |
| **`specialties`** (List) | `workplace.speciality1` to `speciality3` | `specialty_1__v` to `specialty_10__v` | Similar to HCPs, the list of specialties is flattened and ranked. A lookup from the canonical `COTSpecialty` code is required. |
| **`emails`** (List) | `workplace.email` | `email_1__v`, `email_2__v` | List of emails is flattened, with the highest-ranked ones being used. |
| **`phones`** (List) | `workplace.telephone`, `workplace.fax` | `phone_1__v` to `phone_3__v`, `fax_1__v` to `fax_2__v` | Similar to HCPs, the list is filtered by type (`TEL.OFFICE` vs. `TEL.FAX`) and ranked before mapping. |
---
#### **4. Nested Object Mapping: Addresses**
Address information is provided as a list of objects within the `HCP` or `HCO` DCR payload.
| HUB /dcr `addresses` Attribute | OneKey Mapping | Veeva OpenData (VOD) Mapping | Notes / Description |
| :--- | :--- | :--- | :--- |
| **(Address Object)** | Mapped to a single `address` complex object in the JSON request. Only the primary address is sent. | Each address object is mapped to a **separate row** in the **`change_request_address.csv`** file. | OneKey's API takes a single address per DCR, while Veeva's file-based approach can handle multiple address changes in one DCR. |
| **`refId`** | Used to match and update an existing address. | **`address_key`** | This is the `PfizerAddressID`, a unique identifier for the address record in Reltio. |
| **`addressLine1`** | `address.longLabel` or `address.addressLine1` | `address_line_1__v` | The first line of the street address. |
| **`addressLine2`** | `address.longLabel2` or `address.addressLine2` | `address_line_2__v` | The second line of the street address. |
| **`city`** | `address.city` | `locality__v` | The city name. |
| **`stateProvince`** | `address.countyCode` | `administrative_area__v` | The state or province. Requires a lookup from the canonical value to the target system's code. |
| **`zip`** | `address.longPostalCode` or `address.Zip5` | `postal_code__v` | The postal or ZIP code. |
| **`addressType`** | Not explicitly mapped in `submitVR` request, but used in logic. | `address_type__v` | The type of address (e.g., Office, Home). Requires a lookup (`AddressType`). |
---
#### **5. Nested Object Mapping: Affiliations**
Affiliations are provided as a list of objects (`contactAffiliations` for HCP, `otherHCOAffiliations` for HCO) in the DCR payload.
| HUB /dcr `affiliations` Attribute | OneKey Mapping | Veeva OpenData (VOD) Mapping | Notes / Description |
| :--- | :--- | :--- | :--- |
| **(Affiliation Object)** | The `refId` of the target HCO is used to find its `workplace.workplaceEid`, which is then sent as part of the HCP or parent HCO record. | Each affiliation object is mapped to a **separate row** in the **`change_request_parenthco.csv`** file. | The mapping for affiliations is fundamentally different. OneKey handles it as an attribute of the child entity, while Veeva treats it as a distinct relationship record. |
| **Relation `refId`** | The `workplace.workplaceEid` of the target (parent) HCO is populated in the request. | **`parenthco_key`** (populated with the Reltio `relationUri`) | Veeva requires a unique key for the relationship itself. |
| **Start/Child Object** | The main body of the request contains the child's details (e.g., the HCP). | **`child_entity_key`** | This field is populated with the `entity_key` of the child entity (e.g., the HCP). |
| **End/Parent Object** | The `workplace.workplaceEid` of the parent HCO is sent. | **`parent_entity_key`** | This field is populated with the `entity_key` of the parent entity (e.g., the HCO). |
| **`type`** | `activity.role` (requires lookup, e.g., `TIH.W.*`) | **`relationship_type__v`** (requires lookup from `RelationType`) | The type or role of the affiliation (e.g., Employed, Manages). |
| **`primary`** | Not explicitly mapped. | **`is_primary_relationship__v`** | A boolean flag (`Y`/`N`) indicating if this is the primary affiliation. |

View File

@ -0,0 +1,191 @@
### **1. Overall DCR Process Overview**
* **Purpose of DCR Process:**
The Data Change Request (DCR) process is designed to improve and validate the quality of existing data in source systems. It provides a formal mechanism for proposing, validating, and applying data changes.
* **General DCR Process Flow:**
1. A source system creates a proposal for a data change, known as a DCR or Validation Request (VR).
2. The MDM HUB routes this request to the appropriate validation channel.
3. Validation is performed either by internal Data Stewards (DS) within Reltio or by external, third-party validator services like OneKey or Veeva OpenData.
4. A response is sent back, which includes metadata about the DCR's status (e.g., accepted, rejected) and the actual data profile update (payload) resulting from the processed DCR.
* **High-Level Solution Architecture:**
The architecture involves source systems initiating DCRs through the **MDM HUB**. The HUB acts as a central router, directing requests to either **Reltio** for internal data steward review or to **Third-Party Validators** (OneKey, Veeva). The HUB is also responsible for receiving responses and facilitating the update of data in Reltio, often through ETL processes that handle payload delivery (e.g., via S3).
***
### **2. System-Specific DCR Implementations**
Here are the details for each system involved in the DCR process.
---
#### **OneKey (OK)**
* **Role in DCR Process:** Functions as a third-party validator for DCRs. It receives validation requests, processes them, and returns the results.
* **Key Components:** DCR Service 2, OK DCR Service, OneKey Adapter, Publisher, Hub Store (Mongo DB), Manager (Reltio Adapter).
* **Actors Involved:** PforceRX, Data Stewards (in Reltio), Reltio, HUB, OneKey.
* **Core Process Details:**
* **PforceRX-initiated:** DCRs are created via the HUB's API. The HUB integrates with OneKey's API to submit requests (`/vr/submit`) and periodically checks for status updates (`/vr/trace`).
* **Reltio-initiated:** Data Stewards can suggest changes in Reltio and use the "Send to Third Party Validation" feature, which triggers a flow to submit a validation request to OneKey. Singleton entities created in Reltio can also trigger an automatic validation request to OneKey.
* **Integration Methods:**
* **API:** Real-time integration with OneKey via REST APIs (`/vr/submit`, `/vr/trace`).
* **File Transfer:** Data profile updates (payload) are delivered back to Reltio via CSV files on an S3 bucket, which are then processed by an ETL job.
* **DCR Types Handled:** Create, update, delete operations for HCP/HCO profiles; validation of newly created singleton entities; changes suggested by Data Stewards.
---
#### **Veeva OpenData (VOD)**
* **Role in DCR Process:** Functions as a third-party validator, primarily handling DCRs initiated by Data Stewards from within Reltio.
* **Key Components:** DCR Service 2, Veeva DCR Service, Veeva Adapter, GMTF (Global Master Template & Foundation) jobs.
* **Actors Involved:** Data Stewards (in Reltio), HUB, Veeva OpenData.
* **Core Process Details:**
1. Data Stewards in Reltio create DCRs using the "Suggest / Send to 3rd Party Validation" functionality.
2. The HUB stores these requests in a Mongo collection (`DCRRegistryVeeva`).
3. A scheduled job gathers these DCRs, packages them into ZIP files containing multiple CSVs, and places them on an S3 bucket.
4. Files are synchronized from S3 to Veeva's SFTP server in batches (typically every 24 hours).
5. Veeva processes the files and returns response files to an inbound S3 directory, which the HUB traces to update DCR statuses.
* **Integration Methods:**
* **File Transfer:** Asynchronous, batch-based communication via ZIP/CSV files exchanged through S3 and SFTP.
* **DCR Types Handled:** Primarily handles changes suggested by Data Stewards for existing profiles that need external validation from Veeva.
---
#### **IQVIA Highlander (HL) - *Decommissioned April 2025***
* **Role in DCR Process:** Acted as a wrapper to translate DCRs from a Veeva format into a format that could be loaded into Reltio for Data Steward review.
* **Key Components:** DCR Service (first version), IQVIA DCR Wrapper.
* **Actors Involved:** Veeva (on behalf of PforceRX), Reltio, HUB, IQVIA wrapper, Data Stewards.
* **Core Process Details:**
1. Veeva uploaded DCR requests as CSV files to an FTP location.
2. The HUB translated the Veeva CSV format into the IQVIA wrapper's CSV format.
3. The IQVIA wrapper processed this file and created DCRs directly in Reltio.
4. Data Stewards would then review, approve, or reject these DCRs within Reltio.
* **Integration Methods:**
* **File Transfer:** Communication was entirely file-based via S3 and SFTP.
* **DCR Types Handled:** Aggregated 21 specific use cases into six generic types: `NEW_HCP_GENERIC`, `UPDATE_HCP_GENERIC`, `DELETE_HCP_GENERIC`, `NEW_HCO_GENERIC`, `UPDATE_HCO_GENERIC`, `DELETE_HCO_GENERIC`.
***
### **3. Key DCR Operations and Workflows**
---
#### **Create DCR**
* **Description:** This is the main entry point for clients like PforceRx to create DCRs. The process validates the request, routes it to the correct target system (Reltio, OneKey, or Veeva), and creates the DCR.
* **Triggers:** An API call to `POST /dcr`.
* **Detailed Steps:**
1. The DCR service receives and validates the request (e.g., checks for duplicate IDs, existence of referenced objects for updates).
2. It uses a decision table to determine the target system based on attributes like country, source, and operation type.
3. It calls the appropriate internal method to create the DCR in the target system (Reltio, OneKey, or Veeva).
4. A corresponding DCR tracking entity is created in Reltio, and the state is saved in the Mongo DCR Registry.
5. For Reltio-targeted DCRs, a workflow is initiated for Data Steward review.
6. Pre-close logic may be applied to auto-accept or auto-reject the DCR based on the country.
* **Decision Logic/Rules:** A configurable decision table routes DCRs based on `userName`, `sourceName`, `country`, `operationType`, `affectedAttributes`, and `affectedObjects`.
---
#### **Submit Validation Request**
* **Description:** This process submits validation requests for newly created "singleton" entities in Reltio to the OneKey service.
* **Triggers:** Reltio events (e.g., `HCP_CREATED`, `HCO_CREATED`) are aggregated in a time window (e.g., 4 hours).
* **Detailed Steps:**
1. After an event aggregation window closes, the process performs several checks (e.g., entity is active, no existing OneKey crosswalk, no potential matches found via Reltio's `getMatches` API).
2. If all checks pass, the entity data is mapped to a OneKey `submitVR` request.
3. The request is sent to OneKey via `POST /vr/submit`.
4. A DCR entity is created in Reltio to track the status, and the request is logged in Mongo.
---
#### **Trace Validation Request**
* **Description:** This scheduled process checks the status of pending validation requests that have been sent to an external validator like OneKey or Veeva.
* **Triggers:** A timed scheduler (cron job) that runs every N hours.
* **Detailed Steps (OneKey Example):**
1. The process queries the Mongo DCR cache for requests with a `SENT` status.
2. For each request, it calls the OneKey `POST /vr/trace` API.
3. It evaluates the `processStatus` and `responseStatus` from the OneKey response.
4. If the request is resolved (`VAS_FOUND`, `VAS_NOT_FOUND`, etc.), the DCR status in Reltio and Mongo is updated to `ACCEPTED` or `REJECTED`.
5. If the response indicates a match was found but an OK crosswalk doesn't yet exist in Reltio, a new workflow is triggered for Data Steward manual review (`DS_ACTION_REQUIRED`).
---
#### **Data Steward Response**
* **Description:** This process handles the final outcome of a DCR that was reviewed internally by a Data Steward in Reltio.
* **Triggers:** Reltio change request events (`CHANGE_REQUEST_CHANGED`, `CHANGE_REQUEST_REMOVED`) that **do not** have the `ThirdPartyValidation` flag.
* **Detailed Steps:**
1. The process consumes the event from Reltio.
2. It checks the `state` of the change request.
3. If the state is `APPLIED` or `REJECTED`, the corresponding DCR entity in Reltio and the record in Mongo are updated to a final status of `ACCEPTED` or `REJECTED`.
---
#### **Data Steward OK Validation Request**
* **Description:** This process handles DCRs created by a Data Steward in Reltio using the "Suggest" and "Send to Third Party Validation" features, routing them to an external validator like OneKey.
* **Triggers:** Reltio change request events that **do** have the `ThirdPartyValidation` flag set to `true`.
* **Detailed Steps:**
1. The HUB retrieves the "preview" state of the entity from Reltio to see the suggested changes.
2. It compares the current entity with the preview to calculate the delta.
3. It maps these changes to a OneKey `submitVR` request. Attribute removals are sent as a comment due to API limitations.
4. The request is sent to OneKey.
5. Upon successful submission, the original change request in Reltio is programmatically rejected (since the validation is now happening externally), and a new DCR entity is created for tracking the OneKey validation.
***
### **4. Data Comparison and Mapping Details**
* **OneKey Comparator:**
When a Data Steward suggests changes, the HUB compares the current Reltio entity with the "preview" state to send to OneKey.
* **Simple Attributes** (e.g., `FirstName`): Values are compared for equality. The suggested value is taken if different.
* **Complex Attributes** (e.g., `Addresses`, `Specialties`): Nested attributes are matched using their Reltio URI. New nested objects are added, and changes to existing ones are applied.
* **Mandatory Fields:** For HCP, `LastName` and `Country` are mandatory. For HCO, `Country` and `Addresses` are mandatory.
* **Attribute Removal:** Due to API limitations, removing an attribute is not done directly but by generating a comment in the request, e.g., "Please remove attributes: [Address: ...]".
* **Veeva Mapping:**
The process of mapping Reltio canonical codes to Veeva's source-specific codes is multi-layered.
1. **Veeva Defaults:** The system first checks custom CSV mapping files stored in configuration (`mdm-veeva-dcr-service/defaults`). These files define direct mappings for a specific country and canonical code (e.g., `IN;SP.PD;PD`).
2. **RDM Lookups:** If no default is found, it queries RDM (via a Mongo `LookupValues` collection) for the canonical code and looks for a `sourceMapping` where the source is "VOD".
3. **Veeva Fallback:** If no mapping is found, it consults fallback CSV files (`mdm-veeva-dcr-service/fallback`) for certain attributes (e.g., `hco-specialty.csv`). A regular expression is often used to extract the correct code. If all else fails, a question mark (`?`) is used as the default fallback.
***
### **5. Status Management and Error Handling**
* **DCR Statuses:**
The system uses a combination of statuses to track the DCR lifecycle.
| RequestStatus | DCRStatus | Internal Cache Status | Description |
| :--- | :--- | :--- | :--- |
| REQUEST\_ACCEPTED | CREATED | SENT\_TO\_OK | DCR sent to OneKey, pending DS review. |
| REQUEST\_ACCEPTED | CREATED | SENT\_TO\_VEEVA | DCR sent to Veeva, pending DS review. |
| REQUEST\_ACCEPTED | CREATED | DS\_ACTION\_REQUIRED | DCR pending internal DS validation in Reltio. |
| REQUEST\_ACCEPTED | ACCEPTED | ACCEPTED | DS accepted the DCR; changes were applied. |
| REQUEST\_ACCEPTED | ACCEPTED | PRE\_ACCEPTED | Pre-close logic automatically accepted the DCR. |
| REQUEST\_REJECTED | REJECTED | REJECTED | DS rejected the DCR. |
| REQUEST\_REJECTED | REJECTED | PRE\_REJECTED | Pre-close logic automatically rejected the DCR. |
| REQUEST\_FAILED | - | FAILED | DCR failed due to a validation or system error. |
* **Error Codes:**
| Error Code | Description | HTTP Code |
| :--- | :--- | :--- |
| `DUPLICATE_REQUEST` | The `extDCRRequestId` has already been registered. | 403 |
| `NO_CHANGES_DETECTED` | The request contained no changes compared to the existing entity. | 400 |
| `VALIDATION_ERROR` | A referenced object (HCP/HCO) or attribute could not be found. | 404 / 400 |
* **DCR State Changes:**
A DCR begins in an `OPEN` state. It is then sent to a target system, moving to states like `SENT_TO_OK`, `SENT_TO_VEEVA`, or `DS_ACTION_REQUIRED`. Pre-close logic can immediately move it to `PRE_ACCEPTED` or `PRE_REJECTED`. Following data steward review (either internal or external), the DCR reaches a terminal state of `ACCEPTED` or `REJECTED`. If an error occurs, it moves to `FAILED`.
***
### **6. Technical Artifacts and Infrastructure**
* **Mongo Collections:**
* **`DCRRegistryONEKEY` / `DCRRegistryVeeva`:** System-specific collections to store and track the state of DCRs sent to OneKey and Veeva, respectively. They hold the mapped request data and trace the response.
* **`DCRRequest` / `DCRRegistry`:** General-purpose collections for tracking DCRs, their metadata, and overall status within the HUB.
* **`DCRRegistryVeeva`:** Specifically stores Veeva-bound DCRs, including the raw CSV line content, before they are submitted in a batch.
* **File Specifications:**
* **Veeva Integration:** Uses `ZIP` files containing multiple `CSV` files (`change_request.csv`, `change_request_hcp.csv`, `change_request_address.csv`, etc.). Response files follow a similar pattern and are named with the convention `<country>_DCR_Response_<Date>.zip`.
* **Highlander Integration:** Used `CSV` files for requests.
* **Event Models:**
The system uses internal Kafka events to communicate DCR status changes between components.
* **`OneKeyDCREvent`:** Published by the trace process after receiving a response from OneKey. It contains the DCR ID and the `OneKeyChangeRequest` details (`vrStatus`, `vrStatusDetail`, comments, validated IDs).
* **`VeevaDCREvent`:** Published by the Veeva trace process. It contains the DCR ID and `VeevaChangeRequestDetails` (`vrStatus`, `vrStatusDetail`, comments, new Veeva IDs).

View File

@ -1,14 +1,14 @@
name: jira-webhook-stack name: jira-webhook-stack
services: services:
ollama-jira: ollama-jira:
image: artifactory.pfizer.com/mdmhub-docker-dev/mdmtools/ollama/ollama-preloaded:0.0.1 image: artifactory.pfizer.com/mdmhub-docker-dev/mdmtools/ollama/ollama-preloaded:0.0.2
ports: ports:
- "11434:11434" - "11434:11434"
restart: unless-stopped restart: unless-stopped
# Service for your FastAPI application # Service for your FastAPI application
jira-webhook-llm: jira-webhook-llm:
image: artifactory.pfizer.com/mdmhub-docker-dev/mdmtools/ollama/jira-webhook-llm:0.1.8 image: artifactory.pfizer.com/mdmhub-docker-dev/mdmtools/ollama/jira-webhook-llm:0.2.5
ports: ports:
- "8000:8000" - "8000:8000"
environment: environment:
@ -17,10 +17,12 @@ services:
# Point to the Ollama service within the Docker Compose network # Point to the Ollama service within the Docker Compose network
# 'ollama' is the service name, which acts as a hostname within the network # 'ollama' is the service name, which acts as a hostname within the network
OLLAMA_BASE_URL: "https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama" # OLLAMA_BASE_URL: "https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama"
OLLAMA_BASE_URL: "http://ollama-jira:11434"
# Specify the model to use # Specify the model to use
OLLAMA_MODEL: phi4-mini:latest # OLLAMA_MODEL: phi4-mini:latest
OLLAMA_MODEL: qwen3:4b
# Ensure the Ollama service starts and is healthy before starting the app # Ensure the Ollama service starts and is healthy before starting the app
depends_on: depends_on:
@ -30,4 +32,4 @@ services:
# Command to run your FastAPI application using Uvicorn # Command to run your FastAPI application using Uvicorn
# --host 0.0.0.0 is crucial for the app to be accessible from outside the container # --host 0.0.0.0 is crucial for the app to be accessible from outside the container
# --reload is good for development; remove for production # --reload is good for development; remove for production
command: uvicorn jira-webhook-llm:app --host 0.0.0.0 --port 8000 command: uvicorn main:app --host 0.0.0.0 --port 8000

View File

@ -1,144 +0,0 @@
import os
from dotenv import load_dotenv
load_dotenv()
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
from fastapi.responses import JSONResponse
from loguru import logger
import uuid
import sys
from typing import Optional
from datetime import datetime
import asyncio
from functools import wraps
from config import settings
from webhooks.handlers import JiraWebhookHandler
from llm.models import JiraWebhookPayload
from logging_config import configure_logging
# Initialize logging first
configure_logging(log_level="DEBUG")
import signal
try:
app = FastAPI()
logger.info("FastAPI application initialized")
@app.on_event("shutdown")
async def shutdown_event():
"""Handle application shutdown"""
logger.info("Shutting down application...")
try:
# Cleanup Langfuse client if exists
if hasattr(settings, 'langfuse_handler') and hasattr(settings.langfuse_handler, 'close'):
try:
await settings.langfuse_handler.close()
except Exception as e:
logger.warning(f"Error closing handler: {str(e)}")
logger.info("Cleanup completed successfully")
except Exception as e:
logger.error(f"Error during shutdown: {str(e)}")
raise
def handle_shutdown_signal(signum, frame):
"""Handle OS signals for graceful shutdown"""
logger.info(f"Received signal {signum}, initiating shutdown...")
# Exit immediately after cleanup is complete
os._exit(0)
# Register signal handlers
signal.signal(signal.SIGTERM, handle_shutdown_signal)
signal.signal(signal.SIGINT, handle_shutdown_signal)
except Exception as e:
logger.critical(f"Failed to initialize FastAPI: {str(e)}")
logger.warning("Application cannot continue without FastAPI initialization")
sys.exit(1)
def retry(max_retries: int = 3, delay: float = 1.0):
"""Decorator for retrying failed operations"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
last_error = None
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except Exception as e:
last_error = e
logger.warning(f"Attempt {attempt + 1} failed: {str(e)}")
if attempt < max_retries - 1:
await asyncio.sleep(delay * (attempt + 1))
raise last_error
return wrapper
return decorator
class ErrorResponse(BaseModel):
error_id: str
timestamp: str
status_code: int
message: str
details: Optional[str] = None
@app.middleware("http")
async def error_handling_middleware(request: Request, call_next):
request_id = str(uuid.uuid4())
logger.bind(request_id=request_id).info(f"Request started: {request.method} {request.url}")
try:
response = await call_next(request)
return response
except HTTPException as e:
logger.error(f"HTTP Error: {e.status_code} - {e.detail}")
error_response = ErrorResponse(
error_id=request_id,
timestamp=datetime.utcnow().isoformat(),
status_code=e.status_code,
message=e.detail,
details=str(e)
)
return JSONResponse(status_code=e.status_code, content=error_response.model_dump())
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
error_response = ErrorResponse(
error_id=request_id,
timestamp=datetime.utcnow().isoformat(),
status_code=500,
message="Internal Server Error",
details=str(e)
)
return JSONResponse(status_code=500, content=error_response.model_dump())
webhook_handler = JiraWebhookHandler()
@app.post("/jira-webhook")
async def jira_webhook_handler(payload: JiraWebhookPayload):
logger.info(f"Received webhook payload: {payload.model_dump()}")
try:
response = await webhook_handler.handle_webhook(payload)
logger.info(f"Webhook processed successfully")
return response
except Exception as e:
logger.error(f"Error processing webhook: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/test-llm")
async def test_llm():
"""Test endpoint for LLM integration"""
test_payload = JiraWebhookPayload(
issueKey="TEST-123",
summary="Test issue",
description="This is a test issue for LLM integration",
comment="Testing OpenAI integration with Langfuse",
labels=["test"],
status="Open",
assignee="Tester",
updated="2025-07-04T21:40:00Z"
)
return await webhook_handler.handle_webhook(test_payload)
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@ -1,13 +1,24 @@
from typing import Union import json
import sys
from typing import Union, Any # Import Any
from pydantic import SecretStr # Re-import SecretStr
import re # Import re for regex operations
from langchain_core.prompts import (
ChatPromptTemplate,
PromptTemplate,
SystemMessagePromptTemplate,
HumanMessagePromptTemplate,
)
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnablePassthrough, Runnable
from langchain_ollama import OllamaLLM from langchain_ollama import OllamaLLM
from langchain_openai import ChatOpenAI from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate from langchain_google_genai import ChatGoogleGenerativeAI # New import for Gemini
from langchain_core.output_parsers import JsonOutputParser
from loguru import logger from loguru import logger
import sys
from llm.models import AnalysisFlags
from config import settings from config import settings
from .models import AnalysisFlags
class LLMInitializationError(Exception): class LLMInitializationError(Exception):
"""Custom exception for LLM initialization errors""" """Custom exception for LLM initialization errors"""
@ -16,15 +27,14 @@ class LLMInitializationError(Exception):
self.details = details self.details = details
# Initialize LLM # Initialize LLM
llm = None llm: Union[ChatOpenAI, OllamaLLM, ChatGoogleGenerativeAI, None] = None # Add ChatGoogleGenerativeAI
if settings.llm.mode == 'openai': if settings.llm.mode == 'openai':
logger.info(f"Initializing ChatOpenAI with model: {settings.openai_model}") logger.info(f"Initializing ChatOpenAI with model: {settings.llm.openai_model}")
llm = ChatOpenAI( llm = ChatOpenAI(
model=settings.openai_model, model=settings.llm.openai_model if settings.llm.openai_model else "", # Ensure model is str
temperature=0.7, temperature=0.7,
max_tokens=2000, api_key=settings.llm.openai_api_key, # type: ignore # Suppress Pylance error due to SecretStr type mismatch
api_key=settings.openai_api_key, base_url=settings.llm.openai_api_base_url
base_url=settings.openai_api_base_url
) )
elif settings.llm.mode == 'ollama': elif settings.llm.mode == 'ollama':
logger.info(f"Initializing OllamaLLM with model: {settings.llm.ollama_model} at {settings.llm.ollama_base_url}") logger.info(f"Initializing OllamaLLM with model: {settings.llm.ollama_model} at {settings.llm.ollama_base_url}")
@ -43,18 +53,18 @@ elif settings.llm.mode == 'ollama':
llm = OllamaLLM( llm = OllamaLLM(
model=settings.llm.ollama_model, model=settings.llm.ollama_model,
base_url=base_url, base_url=base_url,
streaming=False, num_ctx=32000
timeout=30, # 30 second timeout # Removed streaming, timeout, max_retries as they are not valid parameters for OllamaLLM
max_retries=3
# , # Retry up to 3 times
# temperature=0.1,
# top_p=0.2
) )
# Test connection # Test connection only if not in a test environment
import os
if os.getenv("IS_TEST_ENV") != "true":
logger.debug("Testing Ollama connection...") logger.debug("Testing Ollama connection...")
# llm.invoke("test") # Simple test request llm.invoke("test /no_think ") # Simple test request
logger.info("Ollama connection established successfully") logger.info("Ollama connection established successfully")
else:
logger.info("Skipping Ollama connection test in test environment.")
except Exception as e: except Exception as e:
error_msg = f"Failed to initialize Ollama: {str(e)}" error_msg = f"Failed to initialize Ollama: {str(e)}"
@ -72,24 +82,71 @@ elif settings.llm.mode == 'ollama':
"\n3. The model is available", "\n3. The model is available",
details=details details=details
) from e ) from e
elif settings.llm.mode == 'gemini': # New: Add Gemini initialization
logger.info(f"Initializing ChatGoogleGenerativeAI with model: {settings.llm.gemini_model}")
try:
if not settings.llm.gemini_api_key:
raise ValueError("Gemini API key is not configured")
if not settings.llm.gemini_model:
raise ValueError("Gemini model is not specified")
llm = ChatGoogleGenerativeAI(
model=settings.llm.gemini_model,
temperature=0.7,
google_api_key=settings.llm.gemini_api_key
)
# Test connection only if not in a test environment
import os
if os.getenv("IS_TEST_ENV") != "true":
logger.debug("Testing Gemini connection...")
llm.invoke("test /no_think ") # Simple test request
logger.info("Gemini connection established successfully")
else:
logger.info("Skipping Gemini connection test in test environment.")
except Exception as e:
error_msg = f"Failed to initialize Gemini: {str(e)}"
details = {
'model': settings.llm.gemini_model,
'error_type': type(e).__name__
}
logger.error(error_msg)
logger.debug(f"Connection details: {details}")
raise LLMInitializationError(
"Failed to connect to Gemini service. Please check:"
"\n1. GEMINI_API_KEY is correct"
"\n2. GEMINI_MODEL is correct and accessible"
"\n3. Network connectivity to Gemini API",
details=details
) from e
if llm is None: if llm is None:
logger.error("LLM could not be initialized. Exiting.") logger.error("LLM could not be initialized. Exiting.")
print("\nERROR: Unable to initialize LLM. Check logs for details.", file=sys.stderr) print("\nERROR: Unable to initialize LLM. Check logs for details.", file=sys.stderr)
sys.exit(1) sys.exit(1)
# Ensure llm is treated as a Runnable for chaining
# Cast llm to Any to bypass static type checking for chaining if it's causing issues
llm_runnable: Runnable = llm # type: ignore
# Set up Output Parser for structured JSON # Set up Output Parser for structured JSON
parser = JsonOutputParser(pydantic_object=AnalysisFlags) parser = JsonOutputParser()
# Load prompt template from file # Load prompt template from file
def load_prompt_template(version="v1.1.0"): def load_prompt_template(version="v1.3.0"):
try: try:
with open(f"llm/prompts/jira_analysis_{version}.txt", "r") as f: with open(f"llm/jira_analysis_{version}.txt", "r") as f:
template_content = f.read() template_content = f.read()
# Split system and user parts # Split system and user parts
if "\n\nUSER:\n" in template_content:
system_template, user_template = template_content.split("\n\nUSER:\n") system_template, user_template = template_content.split("\n\nUSER:\n")
system_template = system_template.replace("SYSTEM:\n", "").strip() system_template = system_template.replace("SYSTEM:\n", "").strip()
else:
# Handle legacy format
system_template = template_content
user_template = "Analyze this Jira ticket: {issueKey}"
return ChatPromptTemplate.from_messages([ return ChatPromptTemplate.from_messages([
SystemMessagePromptTemplate.from_template(system_template), SystemMessagePromptTemplate.from_template(system_template),
@ -102,13 +159,35 @@ def load_prompt_template(version="v1.1.0"):
# Fallback prompt template # Fallback prompt template
FALLBACK_PROMPT = ChatPromptTemplate.from_messages([ FALLBACK_PROMPT = ChatPromptTemplate.from_messages([
SystemMessagePromptTemplate.from_template( SystemMessagePromptTemplate.from_template(
"Analyze Jira tickets and output JSON with hasMultipleEscalations, customerSentiment" "Analyze Jira tickets and output JSON with hasMultipleEscalations"
), ),
HumanMessagePromptTemplate.from_template( HumanMessagePromptTemplate.from_template(
"Issue Key: {issueKey}\nSummary: {summary}" "Issue Key: {issueKey}\nSummary: {summary}"
) )
]) ])
# Helper function to extract JSON from a string that might contain other text
def extract_json(text: str) -> str:
"""
Extracts the first complete JSON object from a string.
Assumes the JSON object is enclosed in curly braces {}.
"""
# Find the first occurrence of '{'
start_index = text.find('{')
if start_index == -1:
logger.warning(f"No opening curly brace found in LLM response: {text}")
return text # Return original text if no JSON start is found
# Find the last occurrence of '}'
end_index = text.rfind('}')
if end_index == -1 or end_index < start_index:
logger.warning(f"No closing curly brace found or invalid JSON structure in LLM response: {text}")
return text # Return original text if no JSON end is found or it's before the start
json_string = text[start_index : end_index + 1]
logger.debug(f"Extracted JSON string: {json_string}")
return json_string
# Create chain with fallback mechanism # Create chain with fallback mechanism
def create_analysis_chain(): def create_analysis_chain():
try: try:
@ -126,66 +205,48 @@ def create_analysis_chain():
"format_instructions": lambda _: parser.get_format_instructions() "format_instructions": lambda _: parser.get_format_instructions()
} }
| prompt_template | prompt_template
| llm | llm_runnable # Use the explicitly typed runnable
| extract_json # Add the new extraction step
| parser | parser
) )
# Add langfuse handler if enabled
if settings.langfuse.enabled:
chain = chain.with_config(
callbacks=[settings.langfuse_handler]
)
return chain return chain
except Exception as e: except Exception as e:
logger.warning(f"Using fallback prompt due to error: {str(e)}") logger.warning(f"Using fallback prompt due to error: {str(e)}")
chain = FALLBACK_PROMPT | llm | parser chain = FALLBACK_PROMPT | llm_runnable # Use the explicitly typed runnable
if settings.langfuse.enabled:
chain = chain.with_config(
callbacks=[settings.langfuse_handler]
)
return chain return chain
# Initialize analysis chain # Initialize analysis chain
analysis_chain = create_analysis_chain() analysis_chain = create_analysis_chain()
# Enhanced response validation function # Enhanced response validation function
def validate_response(response: Union[dict, str]) -> bool: def validate_response(response: Union[dict, str], issue_key: str = "N/A") -> bool:
"""Validate the JSON response structure and content""" """Validate the JSON response structure and content"""
try: try:
# If response is a string, attempt to parse it as JSON # If response is a string, attempt to parse it as JSON
if isinstance(response, str): if isinstance(response, str):
logger.debug(f"[{issue_key}] Raw LLM response (string): {response}")
try: try:
response = json.loads(response) response = json.loads(response)
except json.JSONDecodeError: except json.JSONDecodeError as e:
logger.error(f"Invalid JSON response: {response}") logger.error(f"[{issue_key}] JSONDecodeError: {e}. Raw response: {response}")
raise ValueError("Invalid JSON response format") return False
# Ensure response is a dictionary # Ensure response is a dictionary
if not isinstance(response, dict): if not isinstance(response, dict):
logger.error(f"[{issue_key}] Response is not a dictionary: {type(response)}")
return False return False
# Check required fields logger.debug(f"[{issue_key}] Parsed LLM response (JSON): {json.dumps(response)}")
required_fields = ["hasMultipleEscalations", "customerSentiment"]
if not all(field in response for field in required_fields):
return False
# Validate field types
if not isinstance(response["hasMultipleEscalations"], bool):
return False
if response["customerSentiment"] is not None:
if not isinstance(response["customerSentiment"], str):
return False
# Validate against schema using AnalysisFlags model # Validate against schema using AnalysisFlags model
try: try:
AnalysisFlags.model_validate(response) AnalysisFlags.model_validate(response)
return True return True
except Exception: except Exception as e:
return False logger.warning(f"[{issue_key}] Pydantic validation failed: {e}. Continuing with raw response: {response}")
return True # Allow processing even if validation fails
except Exception: except Exception as e:
logger.error(f"[{issue_key}] Unexpected error during response validation: {e}. Response: {response}")
return False return False

View File

@ -0,0 +1,61 @@
SYSTEM:
You are an expert AI assistant that analyzes Jira tickets and outputs a concise summary in a valid JSON format.
Your output MUST be a single JSON object and nothing else.
## JSON Output Schema
{format_instructions}
## Field-by-Field Instructions
1. **`issueCategory` (string)**
* Classify the core problem. Choose ONE: "technical_issue", "data_request", "access_problem", "general_question", "other".
2. **`area` (string)**
* Classify the technical domain. Choose the BEST fit from the following options:
* `"Direct Channel"`
* `"Streaming Channel"`
* `"Java Batch Channel"`
* `"ETL Batch Channel"`
* `"DCR Service"`
* `"API Gateway"`
* `"Callback Service"`
* `"Publisher"`
* `"Reconciliation"`
* `"Snowflake"`
* `"Authentication"`
* `"Other"`
3. **`isEscalated` (boolean)**
* Set to `true` if the user explicitly states they are escalating, mentions previous unanswered requests, or if the tone is highly frustrated and urgent.
* Set to `false` otherwise.
5. **`oneSentenceSummary` (string)**
* A single paragraph in concise English that summarizes the discussion.
## Example
### Input:
- Summary: "DCR Rejected by OneKey"
- Description: "Our DCR for PforceRx was rejected by OneKey. Can the MDM HUB team investigate? This is blocking our weekly report."
- Comment: ""
### Output:
```json
{{
"issueCategory": "technical_issue",
"area": "Application Service",
"isEscalated": false,
"oneSentenceSummary": "A DCR for PforceRx was rejected by OneKey, which is blocking a weekly report."
}}
USER:
Analyze the following Jira ticket:
Issue Key: {issueKey}
Summary: {summary}
Description: {description}
Status: {status}
Existing Labels: {labels}
Assignee: {assignee}
Last Updated: {updated}
Latest Comment (if applicable): {comment}

View File

@ -0,0 +1,92 @@
SYSTEM:
You are an expert AI assistant that analyzes Jira tickets and outputs a concise summary in a valid JSON format.
Your output MUST be a single JSON object and nothing else.
## JSON Output Schema
{format_instructions}
## Field-by-Field Instructions
1. **`issueCategory` (string)**
* Classify the core problem. Choose ONE: "technical_issue", "data_request", "access_problem", "general_question", "other".
2. **`area` (string)**
* Classify the technical domain. Choose the BEST fit from the following options:
* `"Direct Channel"`
* `"Streaming Channel"`
* `"Java Batch Channel"`
* `"ETL Batch Channel"`
* `"DCR Service"`
* `"API Gateway"`
* `"Callback Service"`
* `"Publisher"`
* `"Reconciliation"`
* `"Snowflake"`
* `"Authentication"`
* `"Other"`
3. **`urgencyScore` (float)**
* Estimate the client's urgency and frustration level as a float between 0.0 and 1.0.
* **0.0 - 0.2 (Low Urgency)**: Calm, routine inquiries. No explicit or implicit signs of frustration or urgency.
* **0.3 - 0.5 (Moderate Urgency)**: Client is seeking information, but without strong emotional indicators. May mention minor inconveniences or follow-ups without demanding immediate action.
* **0.6 - 0.8 (High Urgency)**: Client expresses clear frustration, uses words like "urgent," "high priority," "ASAP," "blocked," "critical," or implies delays are causing significant issues. May mention previous unanswered requests or multiple attempts to get attention.
* **0.9 - 1.0 (Critical Urgency)**: Client is highly frustrated, demanding immediate resolution, or explicitly stating an escalation. This indicates a severe impact or a breakdown in communication.
* Infer urgency from keywords, tone, repeated inquiries, and the impact described (e.g., "blocking our weekly report").
5. ### `description` (string)
- Provide a concise summary of the **current status of the problem**, specifically detailing any **actions or verifications that have already been completed or confirmed.** Focus on what is known or has been done up to this point regarding the described issue.
6. ### `actions` (string)
- Generate a multi-line, formatted text block that summarizes all actionable items, open topics, considerations, and next steps. Each point MUST start with a bullet (`- `) and be on a new line.
- For each point, use the following structure based on its `type`:
- **Action Item:** `- [STATUS] [Assignee Name (Team)] Detailed description of the action.`
- **STATUS:** Use `DONE`, `IN PROGRESS`, `OPEN`.
- **Assignee Name:** The name of the person responsible (e.g., "Grzegorz", "Ireneusz").
- **Team:** The team responsible: `MDM Team`, `MDM HUB Team`, `External Team`, `Other Team`. If unknown, use `Unknown Team`.
- **Open Topic:** `- Open Topic: Detailed description of the unresolved issue/question.`
- **Consideration:** `- Consideration: Detailed description of a factor or implication to be aware of.`
- **Next Step:** `- Next Step: [Assignee Name (Team)] Detailed description of the next action to take.`
- Ensure the description for each point is clear and verbose enough.
7. ### `timeToResolutionDays` (number)
- Calculate the total time in **days** from the **initial request or first mention of the problem** in the ticket to the point where the **problem was confirmed to be resolved or a fix was delivered/implemented**.
- **Steps for calculation:**
1. Identify the earliest date of communication (email or comment) describing the problem.
2. Identify the latest date of communication (email or comment) explicitly stating the problem's resolution, fix delivery, or successful verification.
3. Calculate the difference in whole days between these two dates.
- If either date cannot be identified, set this value to `null`.
## Example (Using your provided User section for calculation)
### Input:
- Summary: "RE: Failure in Global MDM Ex-US interface"
- Description: "++MDM and MDM Hub team Hi MDM and MDM HUB team, Can you please look into this and revert as soon as possible ? Regards Saraswati *From:* Choudhary, Anita *Sent:* Monday, June 16, 2025 5:27 PM *To:* Moses, Donie *Cc:* Subramanya, Suraj K *Subject:* RE: Failure in Global MDM Ex-US interface Hi Donie, There has not been any activity from support end which I have understood from the team which have impacted this. Upon comparing the P_HCP structure in PROD and QA it looks like there are 116 columns in QA and 128 in PROD. We could observe 13 fields are less in QA.PFB: [@Das, Deepanjan] Could you please check at your end # If any ongoing activity has impacted this # Was there any change in DDL of P_HCP in QA # When was the last DDL changed and by whom if possible. Thanks & Regards, Anita *From:* Moses, Donie <[Donie.Moses@pfizer.com]> *Sent:* Monday, June 16, 2025 3:58 PM *To:* Sahu, Saraswati <[Saraswati.Sahu@pfizer.com]> *Cc:* Subramanya, Suraj K <[Suraj.KSubramanya@pfizer.com]> *Subject:* RE: Failure in Global MDM Ex-US interface [@Choudhary, Anita] Could you get this checked please ? Thanks, Donie * *Donie Moses** * *Manager Data Integration Support** * *Pfizer Digital Global Support and Operations** Office: +44-7587 442339 Mobile: +44-7483 255494 [donie.moses@pfizer.com] Address: Pfizer Ltd., Dorking Road, Walton Oaks, Tadworth, Surrey. KT20 7NS *From:* Sahu, Saraswati <[Saraswati.Sahu@pfizer.com]> *Sent:* Monday, 16 June 2025 10:03 *To:* B, Pavithra <[Pavithra.B@pfizer.com]> *Cc:* Choudhary, Anita <[Anita.Choudhary@pfizer.com]> *Subject:* Failure in Global MDM Ex-US interface Hi Pavithra, Yasaswini, While testing the Global MDM Ex-US Interface in lower environment ,we have encountered one failure because IS_SPEKAER field has been removed from P_HCP table. This field is available in Production environment. Are you aware of this change ? Can you check with MDM team why this field has been removed in lower environment ? Regards Saraswati"
- Status: "Resolved"
- Latest Comment (excerpt for context): "==COMMENT_NOTE_13 COMMENT_NOTE_13_AUTHOR: Bachanowicz, Mieczysław (Irek) COMMENT_NOTE_13_DATE: 2025-06-25T10:23:16.8-0400 COMMENT_NOTE_13_BODY: Hi [ @Subramanya, Suraj K] First, I apologize for the lengthy delay—this situation was not straightforward, and standard SOPs did not address the problem. The issue with the missing column has been resolved. Please verify on your side to ensure everything is working correctly. The root cause was the recent deletion of US data from EMEA cluster. IS_SPEAKER column was populated only with US-related records. Our procedures for updating the data model determined that if there were no data remaining in the column, the column itself was unnecessary—hence, it was removed. At this point, we have implemented a temporary fix. Tomorrow, we will work on developing a permanent solution to prevent similar issues in the future. Regards, Mieczysław (Irek) Bachanowicz *MDM HUB team* ---- *Od:* Subramanya, Suraj K *Wysłane:* środa, 25 czerwca 2025 14:16 *Do:* Das, Deepanjan *DW:* Vedula, Anvesh C. *Temat:* RE: Failure in Global MDM Ex-US interface Hi Team, Any update on the issue ? Our testing is put on hold due to this issue. Can we please look into this on priority ? Regards, Suraj ===COMMENT_NOTE_14 COMMENT_NOTE_14_AUTHOR: Suraj.KSubramanya@pfizer.com COMMENT_NOTE_14_DATE: 2025-06-26T00:52:20.7-0400 COMMENT_NOTE_14_BODY: Thank you, Irek and team. We can see the column now. Regards, Suraj ---- ="
### Output:
```json
{{
"urgencyScore": 0.7,
"issueCategory": "technical_issue",
"area": "Streaming Channel",
"description": "The Global MDM Ex-US Interface failed in a lower environment due to the removal of the IS_SPEAKER field from the P_HCP table, which is present in Production. MDM and MDM HUB teams were asked to investigate immediately. Initial internal comparison shows 13 fewer columns in QA vs PROD for P_HCP structure. This includes missing schema details and potential DDL changes. Manual refresh process was initiated and completed, confirming column availability on QA/STG. The root cause was identified as the recent deletion of US data from the EMEA cluster.",
"actions": "- [DONE] Ireneusz (MDM Team) Confirmed DCR ID in internal system for correctness. (Note: This action is from a previous example and might not fit the current ticket. The LLM should generate actions relevant to the provided input.)\\n- [DONE] Karol (Unknown Team) Manually reran refresh process.\\n- [DONE] Grzegorz (MDM HUB Team) Confirmed columns are available on QA/STG in P_HCP.\\n- [DONE] Mieczysław Bachanowicz (MDM HUB Team) Confirmed issue with missing column has been resolved.\\n- [OPEN] Mieczysław Bachanowicz (MDM HUB Team) Develop a permanent solution to prevent similar issues in the future.\\n- Open Topic: Confirm if the IS_SPEAKER field is now permanently available in QA/STG and if it will persist.\\n- Next Step: Suraj (External Team) Verify the fix on their side to ensure everything is working correctly for testing.",
"timeToResolutionDays": 9
}}
```
USER:
Analyze the following Jira ticket:
Issue Key: {issueKey}
Summary: {summary}
Description: {description}
Status: {status}
Existing Labels: {labels}
Assignee: {assignee}
Last Updated: {updated}
Latest Comment (if applicable): {comment}

View File

@ -1,7 +1,36 @@
from typing import Optional, List, Union from typing import Optional, List, Union
from enum import Enum
from loguru import logger
from pydantic import BaseModel, ConfigDict, field_validator, Field from pydantic import BaseModel, ConfigDict, field_validator, Field
from datetime import datetime
from config import settings from config import settings
class LLMResponse(BaseModel):
status: str
message: str
class IssueCategory(str, Enum):
TECHNICAL_ISSUE = "technical_issue"
DATA_REQUEST = "data_request"
ACCESS_PROBLEM = "access_problem"
GENERAL_QUESTION = "general_question"
OTHER = "other"
# New: Add an Enum for technical areas based on Confluence doc
class Area(str, Enum):
DIRECT_CHANNEL = "Direct Channel"
STREAMING_CHANNEL = "Streaming Channel"
JAVA_BATCH_CHANNEL = "Java Batch Channel"
ETL_BATCH_CHANNEL = "ETL Batch Channel"
DCR_SERVICE = "DCR Service"
API_GATEWAY = "API Gateway"
CALLBACK_SERVICE = "Callback Service"
PUBLISHER = "Publisher"
RECONCILIATION = "Reconciliation"
SNOWFLAKE = "Snowflake"
AUTHENTICATION = "Authentication"
OTHER = "Other"
class JiraWebhookPayload(BaseModel): class JiraWebhookPayload(BaseModel):
model_config = ConfigDict(alias_generator=lambda x: ''.join(word.capitalize() if i > 0 else word for i, word in enumerate(x.split('_'))), populate_by_name=True) model_config = ConfigDict(alias_generator=lambda x: ''.join(word.capitalize() if i > 0 else word for i, word in enumerate(x.split('_'))), populate_by_name=True)
@ -23,29 +52,25 @@ class JiraWebhookPayload(BaseModel):
updated: Optional[str] = None updated: Optional[str] = None
class AnalysisFlags(BaseModel): class AnalysisFlags(BaseModel):
hasMultipleEscalations: bool = Field(description="Is there evidence of multiple escalation attempts?") model_config = ConfigDict(alias_generator=lambda x: ''.join(word.capitalize() if i > 0 else word for i, word in enumerate(x.split('_'))), populate_by_name=True)
customerSentiment: Optional[str] = Field(description="Overall customer sentiment (e.g., 'neutral', 'frustrated', 'calm').")
def __init__(self, **data): issueCategory: IssueCategory = Field(..., description="The primary category of the Jira ticket.")
super().__init__(**data) area: Area = Field(..., description="The technical area of the MDM HUB related to the issue.")
urgencyScore: float = Field(..., ge=0.0, le=1.0, description="A float between 0.0 and 1.0 indicating the estimated urgency and frustration of the client. Higher values indicate more urgency/frustration, inferred from words like 'urgent', 'high priority', 'ASAP', 'blocked', or mentions of previous unanswered requests.")
description: str = Field(..., description="A single paragraph in concise English that summarizes the discussion.")
actions: str = Field(..., description="A summary of the actions taken or recommended.")
timeToResolutionDays: int = Field(..., description="The estimated time to resolution in days.")
# Track model usage if Langfuse is enabled and client is available
if settings.langfuse.enabled and hasattr(settings, 'langfuse_client'):
try:
if settings.langfuse_client is None:
logger.warning("Langfuse client is None despite being enabled")
return
settings.langfuse_client.trace( class JiraAnalysisResponse(BaseModel):
name="LLM Model Usage", model_config = ConfigDict(from_attributes=True)
input=data, id: int
metadata={ issue_key: str
"model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model, status: str
"analysis_flags": { issue_summary: str
"hasMultipleEscalations": self.hasMultipleEscalations, request_payload: dict
"customerSentiment": self.customerSentiment analysis_result: Optional[dict] = None
} created_at: datetime
} updated_at: datetime
) error_message: Optional[str] = None
except Exception as e: raw_response: Optional[dict] = None
logger.error(f"Failed to track model usage: {e}")

View File

@ -1,29 +0,0 @@
import unittest
from llm.chains import load_prompt_template, validate_response
from llm.models import AnalysisFlags
class PromptTests(unittest.TestCase):
def test_prompt_loading(self):
"""Test that prompt template loads correctly"""
try:
template = load_prompt_template()
self.assertIsNotNone(template)
self.assertIn("issueKey", template.input_variables)
except Exception as e:
self.fail(f"Prompt loading failed: {str(e)}")
def test_response_validation(self):
"""Test response validation logic"""
valid_response = {
"hasMultipleEscalations": False,
"customerSentiment": "neutral"
}
invalid_response = {
"customerSentiment": "neutral"
}
self.assertTrue(validate_response(valid_response))
self.assertFalse(validate_response(invalid_response))
if __name__ == "__main__":
unittest.main()

View File

@ -1,23 +0,0 @@
You are an AI assistant designed to analyze Jira ticket details containe email correspondence and extract key flags and sentiment and extracting information into a strict JSON format.
Analyze the following Jira ticket information and provide your analysis in a JSON format.
Ensure the JSON strictly adheres to the specified schema.
Consider the overall context of the ticket and specifically the latest comment if provided.
Issue Key: {issueKey}
Summary: {summary}
Description: {description}
Status: {status}
Existing Labels: {labels}
Assignee: {assignee}
Last Updated: {updated}
Latest Comment (if applicable): {comment}
**Analysis Request:**
- Determine if there are signs of multiple escalation attempts in the descriptions or comments with regards to HUB team. Escalation to other teams are not considered.
-- Usually multiple requests one after another are being called by the same user in span of hours or days asking for immediate help of HUB team. Normall discussion, responses back and forth, are not considered as a escalation.
- Assess if the issue requires urgent attention based on language or context from the summary, description, or latest comment.
-- Usually means that Customer is asking for help due to upcoming deadlines, other high priority issues which are blocked due to our stall.
- Summarize the overall customer sentiment evident in the issue. Analyse tone of responses, happiness, gratefullnes, iritation, etc.
{format_instructions}

View File

@ -1,27 +0,0 @@
SYSTEM:
You are an AI assistant designed to analyze Jira ticket details containing email correspondence and extract key flags and sentiment, outputting information in a strict JSON format.
Your output MUST be ONLY a valid JSON object. Do NOT include any conversational text, explanations, or markdown outside the JSON.
The JSON structure MUST follow this exact schema. If a field cannot be determined, use `null` for strings/numbers or empty list `[]` for arrays.
Consider the overall context of the ticket and specifically the latest comment if provided.
**Analysis Request:**
- Determine if there are signs of multiple escalation attempts in the descriptions or comments with regards to HUB team. Escalation to other teams are not considered.
-- Usually multiple requests one after another are being called by the same user in span of hours or days asking for immediate help of HUB team. Normal discussion, responses back and forth, are not considered as an escalation.
- Assess if the issue requires urgent attention based on language or context from the summary, description, or latest comment.
-- Usually means that Customer is asking for help due to upcoming deadlines, other high priority issues which are blocked due to our stall.
- Summarize the overall customer sentiment evident in the issue. Analyze tone of responses, happiness, gratefulness, irritation, etc.
{format_instructions}
USER:
Issue Key: {issueKey}
Summary: {summary}
Description: {description}
Status: {status}
Existing Labels: {labels}
Assignee: {assignee}
Last Updated: {updated}
Latest Comment (if applicable): {comment}

View File

@ -1,102 +0,0 @@
import sys
import os
from pathlib import Path
from datetime import datetime
from typing import Optional
from loguru import logger
# Basic fallback logging configuration
logger.remove()
logger.add(sys.stderr, level="WARNING", format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")
def configure_logging(log_level: str = "INFO", log_dir: Optional[str] = None):
"""Configure structured logging for the application with fallback handling"""
try:
# Log that we're attempting to configure logging
logger.warning("Attempting to configure logging...")
# Default log directory
if not log_dir:
log_dir = os.getenv("LOG_DIR", "logs")
# Create log directory if it doesn't exist
Path(log_dir).mkdir(parents=True, exist_ok=True)
# Log file path with timestamp
log_file = Path(log_dir) / f"jira-webhook-llm_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
# Remove any existing loggers
logger.remove()
# Add console logger
logger.add(
sys.stdout,
level=log_level,
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {extra[request_id]} | {message}",
colorize=True,
backtrace=True,
diagnose=True
)
# Add file logger
logger.add(
str(log_file),
level=log_level,
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {extra[request_id]} | {message}",
rotation="100 MB",
retention="30 days",
compression="zip",
backtrace=True,
diagnose=True
)
# Configure default extras
logger.configure(extra={"request_id": "N/A"})
logger.info("Logging configured successfully")
except Exception as e:
# Fallback to basic logging if configuration fails
logger.remove()
logger.add(sys.stderr, level="WARNING", format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")
logger.error(f"Failed to configure logging: {str(e)}. Using fallback logging configuration.")
"""Configure structured logging for the application"""
# Default log directory
if not log_dir:
log_dir = os.getenv("LOG_DIR", "logs")
# Create log directory if it doesn't exist
Path(log_dir).mkdir(parents=True, exist_ok=True)
# Log file path with timestamp
log_file = Path(log_dir) / f"jira-webhook-llm_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
# Remove any existing loggers
logger.remove()
# Add console logger
logger.add(
sys.stdout,
level=log_level,
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {extra[request_id]} | {message}",
colorize=True,
backtrace=True,
diagnose=True
)
# Add file logger
logger.add(
str(log_file),
level=log_level,
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {extra[request_id]} | {message}",
rotation="100 MB",
retention="30 days",
compression="zip",
backtrace=True,
diagnose=True
)
# Configure default extras
logger.configure(extra={"request_id": "N/A"})
logger.info("Logging configured successfully")

211
main.py Normal file
View File

@ -0,0 +1,211 @@
# Standard library imports
import json
import os
import sys
import time
import asyncio
import signal
import uuid
from datetime import datetime, timezone
from typing import Dict, Optional
from http import HTTPStatus
from functools import partial, wraps
from contextlib import asynccontextmanager
# Third-party imports
from dotenv import load_dotenv
load_dotenv()
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from loguru import logger
from langfuse import Langfuse # Import Langfuse
from langfuse.langchain import CallbackHandler # Import CallbackHandler
# Local application imports
from shared_store import RequestStatus, requests_queue, ProcessingRequest
from llm.models import JiraWebhookPayload
from llm.chains import analysis_chain, validate_response
from app.handlers import jira_router, queue_router # Import new routers
from config import settings
# Initialize Langfuse client globally
langfuse_client = None
if settings.langfuse.enabled:
langfuse_client = Langfuse(
public_key=settings.langfuse.public_key,
secret_key=settings.langfuse.secret_key,
host=settings.langfuse.host
)
logger.info("Langfuse client initialized.")
else:
logger.info("Langfuse integration is disabled.")
async def process_single_jira_request(request: ProcessingRequest):
"""Processes a single Jira webhook request using the LLM."""
payload = JiraWebhookPayload.model_validate(request.payload)
# Initialize Langfuse callback handler for this trace
langfuse_handler = None
if langfuse_client:
langfuse_handler = CallbackHandler() # No arguments needed for constructor
logger.bind(
issue_key=payload.issueKey,
request_id=request.id,
timestamp=datetime.now(timezone.utc).isoformat()
).info(f"[{payload.issueKey}] Processing webhook request.")
llm_input = {
"issueKey": payload.issueKey,
"summary": payload.summary,
"description": payload.description if payload.description else "No description provided.",
"status": payload.status if payload.status else "Unknown",
"labels": ", ".join(payload.labels) if payload.labels else "None",
"assignee": payload.assignee if payload.assignee else "Unassigned",
"updated": payload.updated if payload.updated else "Unknown",
"comment": payload.comment if payload.comment else "No new comment provided."
}
try:
# Pass the Langfuse callback handler to the ainvoke method
raw_llm_response = await analysis_chain.ainvoke(
llm_input,
config={
"callbacks": [langfuse_handler],
"callbacks_extra": {
"session_id": str(request.id),
"trace_name": f"Jira-Analysis-{payload.issueKey}"
}
} if langfuse_handler else {}
)
# Store the raw LLM response
request.response = raw_llm_response
if not validate_response(raw_llm_response, payload.issueKey): # Pass issueKey for logging
error_msg = f"Invalid LLM response structure: {raw_llm_response}"
logger.error(f"[{payload.issueKey}] {error_msg}")
raise ValueError(error_msg)
logger.debug(f"[{payload.issueKey}] LLM Analysis Result: {json.dumps(raw_llm_response, indent=2)}")
logger.info(f"[{payload.issueKey}] Successfully processed request {request.id}.")
except Exception as e:
logger.error(f"[{payload.issueKey}] LLM processing failed: {str(e)}")
request.status = RequestStatus.FAILED
request.error = str(e)
raise
finally:
if langfuse_handler:
langfuse_client.flush() # Ensure all traces are sent
logger.debug(f"[{payload.issueKey}] Langfuse client flushed.")
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Starts background processing loop with database integration"""
async def processing_loop():
while True:
request = None
try:
request = requests_queue.get_next_request()
if request:
try:
request.status = RequestStatus.PROCESSING
request.started_at = datetime.now(timezone.utc)
# Process request
await process_single_jira_request(request)
request.status = RequestStatus.COMPLETED
request.completed_at = datetime.now(timezone.utc)
except Exception as e:
request.status = RequestStatus.FAILED
request.error = str(e)
request.completed_at = datetime.now(timezone.utc)
request.retry_count += 1
if request.retry_count < settings.processor.max_retries:
retry_delay = min(
settings.processor.initial_retry_delay_seconds * (2 ** request.retry_count),
3600
)
logger.warning(f"Request {request.id} failed, will retry in {retry_delay}s")
else:
logger.error(f"Request {request.id} failed after {request.retry_count} attempts")
finally:
if request:
requests_queue.task_done()
except Exception as e:
logger.error(f"Processing loop error: {str(e)}")
await asyncio.sleep(settings.processor.poll_interval_seconds)
task = asyncio.create_task(processing_loop())
try:
logger.info("Application initialized with processing loop started")
yield
finally:
# Ensure all tasks are done before cancelling the processing loop
logger.info("Waiting for pending queue tasks to complete...")
# requests_queue.join()
task.cancel()
await task # Await the task to ensure it's fully cancelled and cleaned up
logger.info("Processing loop terminated")
def create_app():
"""Factory function to create FastAPI app instance"""
_app = FastAPI(lifespan=lifespan)
# Include routers
_app.include_router(jira_router)
_app.include_router(queue_router)
# Add health check endpoint
@_app.get("/health")
async def health_check():
return {"status": "healthy"}
# Add error handling middleware
@_app.middleware("http")
async def error_handling_middleware(request: Request, call_next):
request_id = str(uuid.uuid4())
logger.bind(request_id=request_id).info(f"Request started: {request.method} {request.url}")
try:
response = await call_next(request)
return response
except HTTPException as e:
logger.error(f"HTTP Error: {e.status_code} - {e.detail}")
error_response = ErrorResponse(
error_id=request_id,
timestamp=datetime.now(timezone.utc).isoformat(),
status_code=e.status_code,
message=e.detail,
details=str(e)
)
return JSONResponse(status_code=e.status_code, content=error_response.model_dump())
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
error_response = ErrorResponse(
error_id=request_id,
timestamp=datetime.now(timezone.utc).isoformat(),
status_code=500,
message="Internal Server Error",
details=str(e)
)
return JSONResponse(status_code=500, content=error_response.model_dump())
return _app
class ErrorResponse(BaseModel):
error_id: str
timestamp: str
status_code: int
message: str
details: Optional[str] = None
app = create_app()
app = create_app()

View File

@ -1,19 +1,21 @@
fastapi==0.111.0 fastapi==0.111.0
pydantic==2.9.0 # Changed from 2.7.4 to meet ollama's requirement pydantic==2.7.4
pydantic-settings==2.0.0 pydantic-settings>=2.0.0
langchain==0.3.26 langchain>=0.2.0
langchain-ollama==0.3.3 langchain-ollama>=0.1.0
langchain-openai==0.3.27 langchain-openai>=0.1.0
langchain-core==0.3.68 langchain-google-genai==2.1.8
langfuse==3.1.3 langchain-core>=0.3.68,<0.4.0 # Pin to the range required by langchain-google-genai
langfuse==3.2.1
uvicorn==0.30.1 uvicorn==0.30.1
python-multipart==0.0.9 # Good to include for FastAPI forms python-multipart==0.0.9 # Good to include for FastAPI forms
loguru==0.7.3 loguru==0.7.3
# Testing dependencies # Testing dependencies
unittest2>=1.1.0 # unittest2>=1.1.0 # Removed as it's an older backport
# Testing dependencies
pytest==8.2.0 pytest==8.2.0
pytest-asyncio==0.23.5 pytest-asyncio==0.23.5
pytest-cov==4.1.0 pytest-cov==4.1.0
httpx==0.27.0 httpx==0.27.0
PyYAML PyYAML==6.0.2
SQLAlchemy==2.0.30
alembic==1.13.1

120
shared_store.py Normal file
View File

@ -0,0 +1,120 @@
from typing import List, Dict, Optional, Any # Import Any
import threading
from datetime import datetime, timezone
from enum import Enum
class RequestStatus(str, Enum):
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
TIMEOUT = "timeout"
# Thread-safe storage for requests and responses
from queue import Queue
from dataclasses import dataclass, field
@dataclass
class ProcessingRequest:
id: int
payload: Dict
status: RequestStatus = RequestStatus.PENDING
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
error: Optional[str] = None
retry_count: int = 0
response: Optional[Any] = None # Add response field
class RequestQueue:
def __init__(self):
self._queue: Queue[ProcessingRequest] = Queue()
self._requests: List[ProcessingRequest] = [] # To store all requests for retrieval
self._processing_lock = threading.Lock()
self._id_lock = threading.Lock()
self._current_id = 0
def _get_next_id(self) -> int:
"""Generate and return the next available request ID"""
with self._id_lock:
self._current_id += 1
return self._current_id
def add_request(self, payload: Dict) -> int:
"""Adds a new request to the queue and returns its ID"""
request_id = self._get_next_id()
request = ProcessingRequest(id=request_id, payload=payload)
self._queue.put(request)
with self._processing_lock: # Protect access to _requests list
self._requests.append(request)
return request_id
def get_next_request(self) -> Optional[ProcessingRequest]:
"""Fetches the next available request from the queue"""
with self._processing_lock:
if not self._queue.empty():
return self._queue.get()
return None
def get_all_requests(self) -> List[ProcessingRequest]:
"""Returns a list of all requests currently in the store."""
with self._processing_lock:
return list(self._requests) # Return a copy to prevent external modification
def get_request_by_id(self, request_id: int) -> Optional[ProcessingRequest]:
"""Retrieves a specific request by its ID."""
with self._processing_lock:
return next((req for req in self._requests if req.id == request_id), None)
def get_latest_completed_by_issue_key(self, issue_key: str) -> Optional[ProcessingRequest]:
"""
Retrieves the latest successfully completed request for a given issue key.
Skips pending or failed requests.
"""
with self._processing_lock:
completed_requests = [
req for req in self._requests
if req.payload.get("issueKey") == issue_key and req.status == RequestStatus.COMPLETED
]
# Sort by completed_at to get the latest, if available
completed_requests.sort(key=lambda req: req.completed_at if req.completed_at else datetime.min, reverse=True)
return completed_requests[0] if completed_requests else None
def are_there_any_open_requests_for_issue_key(self, issue_key: str) -> bool:
"""
Checks if there are any pending or processing requests for a given issue key.
"""
with self._processing_lock:
for req in self._requests:
if req.payload.get("issueKey") == issue_key and \
(req.status == RequestStatus.PENDING or req.status == RequestStatus.PROCESSING):
return True
return False
def delete_request_by_id(self, request_id: int) -> bool:
"""Deletes a specific request by its ID."""
with self._processing_lock:
initial_length = len(self._requests)
self._requests = [req for req in self._requests if req.id != request_id]
return len(self._requests) < initial_length
def clear_all_requests(self):
"""Clears all requests from the store."""
with self._processing_lock:
self._requests.clear()
# Clear the queue as well, though it's generally processed
while not self._queue.empty():
try:
self._queue.get_nowait()
except Exception:
continue
def task_done(self):
"""Indicates that a formerly enqueued task is complete."""
self._queue.task_done()
def join(self):
"""Blocks until all items in the queue have been gotten and processed."""
self._queue.join()
requests_queue = RequestQueue()

View File

@ -1 +0,0 @@
# Initialize tests package

View File

@ -1,20 +0,0 @@
import pytest
from fastapi.testclient import TestClient
from jira_webhook_llm import app
@pytest.fixture
def test_client():
return TestClient(app)
@pytest.fixture
def mock_jira_payload():
return {
"issueKey": "TEST-123",
"summary": "Test Issue",
"description": "Test Description",
"comment": "Test Comment",
"labels": ["test"],
"status": "Open",
"assignee": "Tester",
"updated": "2025-07-13T12:00:00Z"
}

View File

@ -1,38 +0,0 @@
import pytest
from fastapi import HTTPException
from jira_webhook_llm import app
from llm.models import JiraWebhookPayload
def test_error_handling_middleware(test_client, mock_jira_payload):
# Test 404 error handling
response = test_client.post("/nonexistent-endpoint", json={})
assert response.status_code == 404
assert "error_id" in response.json()
# Test validation error handling
invalid_payload = mock_jira_payload.copy()
invalid_payload.pop("issueKey")
response = test_client.post("/jira-webhook", json=invalid_payload)
assert response.status_code == 422
assert "details" in response.json()
def test_webhook_handler(test_client, mock_jira_payload):
# Test successful webhook handling
response = test_client.post("/jira-webhook", json=mock_jira_payload)
assert response.status_code == 200
assert "response" in response.json()
def test_llm_test_endpoint(test_client):
# Test LLM test endpoint
response = test_client.post("/test-llm")
assert response.status_code == 200
assert "response" in response.json()
def test_retry_decorator():
# Test retry decorator functionality
@app.retry(max_retries=3)
async def failing_function():
raise Exception("Test error")
with pytest.raises(Exception):
failing_function()

View File

@ -1,38 +0,0 @@
import pytest
from llm.chains import validate_response
def test_validate_response_valid():
"""Test validation with valid response"""
response = {
"hasMultipleEscalations": False,
"customerSentiment": "neutral"
}
assert validate_response(response) is True
def test_validate_response_missing_field():
"""Test validation with missing required field"""
response = {
"hasMultipleEscalations": False
}
assert validate_response(response) is False
def test_validate_response_invalid_type():
"""Test validation with invalid field type"""
response = {
"hasMultipleEscalations": "not a boolean",
"customerSentiment": "neutral"
}
assert validate_response(response) is False
def test_validate_response_null_sentiment():
"""Test validation with null sentiment"""
response = {
"hasMultipleEscalations": True,
"customerSentiment": None
}
assert validate_response(response) is True
def test_validate_response_invalid_structure():
"""Test validation with invalid JSON structure"""
response = "not a dictionary"
assert validate_response(response) is False

View File

@ -1,121 +0,0 @@
from fastapi import HTTPException
from loguru import logger
import json
from typing import Optional, List, Union
from pydantic import BaseModel, ConfigDict, field_validator
from datetime import datetime
from config import settings
from langfuse import Langfuse
from llm.models import JiraWebhookPayload, AnalysisFlags
from llm.chains import analysis_chain, validate_response
class BadRequestError(HTTPException):
def __init__(self, detail: str):
super().__init__(status_code=400, detail=detail)
class RateLimitError(HTTPException):
def __init__(self, detail: str):
super().__init__(status_code=429, detail=detail)
class ValidationError(HTTPException):
def __init__(self, detail: str):
super().__init__(status_code=422, detail=detail)
class JiraWebhookHandler:
def __init__(self):
self.analysis_chain = analysis_chain
async def handle_webhook(self, payload: JiraWebhookPayload):
try:
if not payload.issueKey:
raise BadRequestError("Missing required field: issueKey")
if not payload.summary:
raise BadRequestError("Missing required field: summary")
logger.bind(
issue_key=payload.issueKey,
timestamp=datetime.utcnow().isoformat()
).info("Received webhook")
# Create Langfuse trace if enabled
trace = None
if settings.langfuse.enabled:
trace = settings.langfuse_client.start_span(
name="Jira Webhook",
input=payload.dict(),
metadata={
"trace_id": f"webhook-{payload.issueKey}",
"issue_key": payload.issueKey,
"timestamp": datetime.utcnow().isoformat()
}
)
llm_input = {
"issueKey": payload.issueKey,
"summary": payload.summary,
"description": payload.description if payload.description else "No description provided.",
"status": payload.status if payload.status else "Unknown",
"labels": ", ".join(payload.labels) if payload.labels else "None",
"assignee": payload.assignee if payload.assignee else "Unassigned",
"updated": payload.updated if payload.updated else "Unknown",
"comment": payload.comment if payload.comment else "No new comment provided."
}
# Create Langfuse span for LLM processing if enabled
llm_span = None
if settings.langfuse.enabled and trace:
llm_span = trace.start_span(
name="LLM Processing",
input=llm_input,
metadata={
"model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model
}
)
try:
analysis_result = await self.analysis_chain.ainvoke(llm_input)
# Update Langfuse span with output if enabled
if settings.langfuse.enabled and llm_span:
llm_span.update(output=analysis_result)
llm_span.end()
# Validate LLM response
if not validate_response(analysis_result):
logger.warning(f"Invalid LLM response format for {payload.issueKey}")
analysis_result = {
"hasMultipleEscalations": False,
"customerSentiment": "neutral"
}
logger.debug(f"LLM Analysis Result for {payload.issueKey}: {json.dumps(analysis_result, indent=2)}")
return {"status": "success", "analysis_flags": analysis_result}
except Exception as e:
logger.error(f"LLM processing failed for {payload.issueKey}: {str(e)}")
# Log error to Langfuse if enabled
if settings.langfuse.enabled and llm_span:
llm_span.error(e)
llm_span.end()
return {
"status": "error",
"analysis_flags": {
"hasMultipleEscalations": False,
"customerSentiment": "neutral"
},
"error": str(e)
}
except Exception as e:
logger.error(f"Error processing webhook: {str(e)}")
import traceback
logger.error(f"Stack trace: {traceback.format_exc()}")
# Log error to Langfuse if enabled
if settings.langfuse.enabled and trace:
trace.end(error=e)
raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")