feat: Integrate Langfuse for observability and analytics in LLM processing and webhook handling
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run

This commit is contained in:
Ireneusz Bachanowicz 2025-07-13 20:14:09 +02:00
parent 0038605b57
commit f3c70b9b0f
7 changed files with 263 additions and 14 deletions

View File

@ -0,0 +1,53 @@
# Jira Webhook LLM
## Langfuse Integration
### Overview
The application integrates with Langfuse for observability and analytics of LLM usage and webhook events. This integration provides detailed tracking of:
- Webhook events
- LLM model usage
- Error tracking
- Performance metrics
### Configuration
Langfuse configuration is managed through both `application.yml` and environment variables.
#### application.yml
```yaml
langfuse:
enabled: true
public_key: "pk-lf-..."
secret_key: "sk-lf-..."
host: "https://cloud.langfuse.com"
```
#### Environment Variables
```bash
LANGFUSE_ENABLED=true
LANGFUSE_PUBLIC_KEY="pk-lf-..."
LANGFUSE_SECRET_KEY="sk-lf-..."
LANGFUSE_HOST="https://cloud.langfuse.com"
```
### Enable/Disable
To disable Langfuse integration:
1. Set `langfuse.enabled: false` in `application.yml`
2. Or set `LANGFUSE_ENABLED=false` in your environment
### Tracking Details
The following events are tracked:
1. Webhook events
- Input payload
- Timestamps
- Issue metadata
2. LLM processing
- Model used
- Input/output
- Processing time
3. Errors
- Webhook processing errors
- LLM processing errors
- Validation errors
### Viewing Data
Visit your Langfuse dashboard to view the collected metrics and traces.

View File

@ -6,6 +6,50 @@ from pydantic import validator, ConfigDict
from loguru import logger
from watchfiles import watch, Change
from threading import Thread
from langfuse import Langfuse
from langfuse.langchain import CallbackHandler
class LangfuseConfig(BaseSettings):
enabled: bool = True
public_key: Optional[str] = None
secret_key: Optional[str] = None
host: Optional[str] = None
@validator('host')
def validate_host(cls, v):
if v and not v.startswith(('http://', 'https://')):
raise ValueError("Langfuse host must start with http:// or https://")
return v
def __init__(self, **data):
try:
logger.info("Initializing LangfuseConfig with data: %s", data)
logger.info("Environment variables:")
logger.info("LANGFUSE_PUBLIC_KEY: %s", os.getenv('LANGFUSE_PUBLIC_KEY'))
logger.info("LANGFUSE_SECRET_KEY: %s", os.getenv('LANGFUSE_SECRET_KEY'))
logger.info("LANGFUSE_HOST: %s", os.getenv('LANGFUSE_HOST'))
super().__init__(**data)
logger.info("LangfuseConfig initialized successfully")
logger.info("Public Key: %s", self.public_key)
logger.info("Secret Key: %s", self.secret_key)
logger.info("Host: %s", self.host)
except Exception as e:
logger.error("Failed to initialize LangfuseConfig: %s", e)
logger.error("Current environment variables:")
logger.error("LANGFUSE_PUBLIC_KEY: %s", os.getenv('LANGFUSE_PUBLIC_KEY'))
logger.error("LANGFUSE_SECRET_KEY: %s", os.getenv('LANGFUSE_SECRET_KEY'))
logger.error("LANGFUSE_HOST: %s", os.getenv('LANGFUSE_HOST'))
raise
model_config = ConfigDict(
env_prefix='LANGFUSE_',
env_file='.env',
env_file_encoding='utf-8',
extra='ignore',
env_nested_delimiter='__',
case_sensitive=True
)
class LogConfig(BaseSettings):
level: str = 'INFO'
@ -42,10 +86,33 @@ class LLMConfig(BaseSettings):
class Settings:
def __init__(self):
try:
logger.info("Initializing LogConfig")
self.log = LogConfig()
logger.info("LogConfig initialized: %s", self.log.model_dump())
logger.info("Initializing LLMConfig")
self.llm = LLMConfig()
logger.info("LLMConfig initialized: %s", self.llm.model_dump())
logger.info("Initializing LangfuseConfig")
self.langfuse = LangfuseConfig()
logger.info("LangfuseConfig initialized: %s", self.langfuse.model_dump())
logger.info("Validating configuration")
self._validate()
logger.info("Starting config watcher")
self._start_watcher()
logger.info("Initializing Langfuse")
self._init_langfuse()
logger.info("Configuration initialized successfully")
except Exception as e:
logger.error(f"Configuration initialization failed: {e}")
logger.error("Current configuration state:")
logger.error("LogConfig: %s", self.log.model_dump() if hasattr(self, 'log') else 'Not initialized')
logger.error("LLMConfig: %s", self.llm.model_dump() if hasattr(self, 'llm') else 'Not initialized')
logger.error("LangfuseConfig: %s", self.langfuse.model_dump() if hasattr(self, 'langfuse') else 'Not initialized')
raise
def _validate(self):
logger.info(f"LLM mode set to: '{self.llm.mode}'")
@ -66,6 +133,35 @@ class Settings:
logger.info("Configuration validated successfully.")
def _init_langfuse(self):
if self.langfuse.enabled:
try:
# Verify all required credentials are present
if not all([self.langfuse.public_key, self.langfuse.secret_key, self.langfuse.host]):
raise ValueError("Missing required Langfuse credentials")
# Initialize Langfuse client
self.langfuse_client = Langfuse(
public_key=self.langfuse.public_key,
secret_key=self.langfuse.secret_key,
host=self.langfuse.host
)
# Initialize CallbackHandler
self.langfuse_handler = CallbackHandler(
public_key=self.langfuse.public_key,
secret_key=self.langfuse.secret_key,
host=self.langfuse.host
)
logger.info("Langfuse client and handler initialized successfully")
except ValueError as e:
logger.warning(f"Langfuse configuration error: {e}. Disabling Langfuse.")
self.langfuse.enabled = False
except Exception as e:
logger.error(f"Failed to initialize Langfuse: {e}")
self.langfuse.enabled = False
def _start_watcher(self):
def watch_config():
for changes in watch('config/application.yml'):

View File

@ -30,3 +30,15 @@ llm:
# model: "qwen3:1.7b"
# model: "smollm:360m"
# model: "qwen3:0.6b"
# Langfuse configuration for observability and analytics
langfuse:
# Enable or disable Langfuse integration
# Can be overridden by LANGFUSE_ENABLED environment variable
enabled: true
# Langfuse API credentials
# It's HIGHLY recommended to set these via environment variables
# instead of saving them in this file
public_key: "pk-lf-17dfde63-93e2-4983-8aa7-2673d3ecaab8"
secret_key: "sk-lf-ba41a266-6fe5-4c90-a483-bec8a7aaa321"
host: "https://cloud.langfuse.com"

View File

@ -98,5 +98,6 @@ async def test_llm():
)
return await webhook_handler.handle_webhook(test_payload)
# To run this:
# 1. Start FastAPI: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@ -1,3 +1,4 @@
from typing import Union
from langchain_ollama import OllamaLLM
from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
@ -106,18 +107,44 @@ FALLBACK_PROMPT = PromptTemplate(
def create_analysis_chain():
try:
prompt_template = load_prompt_template()
return prompt_template | llm | parser
chain = prompt_template | llm | parser
# Add langfuse handler if enabled
if settings.langfuse.enabled:
chain = chain.with_config(
callbacks=[settings.langfuse_handler]
)
return chain
except Exception as e:
logger.warning(f"Using fallback prompt due to error: {str(e)}")
return FALLBACK_PROMPT | llm | parser
chain = FALLBACK_PROMPT | llm | parser
if settings.langfuse.enabled:
chain = chain.with_config(
callbacks=[settings.langfuse_handler]
)
return chain
# Initialize analysis chain
analysis_chain = create_analysis_chain()
# Enhanced response validation function
def validate_response(response: dict) -> bool:
def validate_response(response: Union[dict, str]) -> bool:
"""Validate the JSON response structure and content"""
try:
# If response is a string, attempt to parse it as JSON
if isinstance(response, str):
try:
response = json.loads(response)
except json.JSONDecodeError:
return False
# Ensure response is a dictionary
if not isinstance(response, dict):
return False
# Check required fields
required_fields = ["hasMultipleEscalations", "customerSentiment"]
if not all(field in response for field in required_fields):
@ -140,4 +167,3 @@ def validate_response(response: dict) -> bool:
except Exception:
return False
return all(field in response for field in required_fields)

View File

@ -1,5 +1,6 @@
from typing import Optional, List, Union
from pydantic import BaseModel, ConfigDict, field_validator, Field
from config import settings
class JiraWebhookPayload(BaseModel):
model_config = ConfigDict(alias_generator=lambda x: ''.join(word.capitalize() if i > 0 else word for i, word in enumerate(x.split('_'))), populate_by_name=True)
@ -24,3 +25,23 @@ class JiraWebhookPayload(BaseModel):
class AnalysisFlags(BaseModel):
hasMultipleEscalations: bool = Field(description="Is there evidence of multiple escalation attempts?")
customerSentiment: Optional[str] = Field(description="Overall customer sentiment (e.g., 'neutral', 'frustrated', 'calm').")
def __init__(self, **data):
super().__init__(**data)
# Track model usage if Langfuse is enabled
if settings.langfuse.enabled:
try:
settings.langfuse_client.trace(
name="LLM Model Usage",
input=data,
metadata={
"model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model,
"analysis_flags": {
"hasMultipleEscalations": self.hasMultipleEscalations,
"customerSentiment": self.customerSentiment
}
}
)
except Exception as e:
logger.error(f"Failed to track model usage: {e}")

View File

@ -6,8 +6,9 @@ from pydantic import BaseModel, ConfigDict, field_validator
from datetime import datetime
from config import settings
from langfuse import Langfuse
from llm.models import JiraWebhookPayload, AnalysisFlags
from llm.chains import analysis_chain
from llm.chains import analysis_chain, validate_response
class BadRequestError(HTTPException):
def __init__(self, detail: str):
@ -38,6 +39,21 @@ class JiraWebhookHandler:
timestamp=datetime.utcnow().isoformat()
).info("Received webhook")
# Create Langfuse trace if enabled
trace = None
if settings.langfuse.enabled:
trace = settings.langfuse_client.trace(
Langfuse().trace(
id=f"webhook-{payload.issueKey}",
name="Jira Webhook",
input=payload.dict(),
metadata={
"issue_key": payload.issueKey,
"timestamp": datetime.utcnow().isoformat()
}
)
)
llm_input = {
"issueKey": payload.issueKey,
"summary": payload.summary,
@ -49,9 +65,24 @@ class JiraWebhookHandler:
"comment": payload.comment if payload.comment else "No new comment provided."
}
# Create Langfuse span for LLM processing if enabled
llm_span = None
if settings.langfuse.enabled and trace:
llm_span = trace.span(
name="LLM Processing",
input=llm_input,
metadata={
"model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model
}
)
try:
analysis_result = await self.analysis_chain.ainvoke(llm_input)
# Update Langfuse span with output if enabled
if settings.langfuse.enabled and llm_span:
llm_span.end(output=analysis_result)
# Validate LLM response
if not validate_response(analysis_result):
logger.warning(f"Invalid LLM response format for {payload.issueKey}")
@ -65,6 +96,10 @@ class JiraWebhookHandler:
except Exception as e:
logger.error(f"LLM processing failed for {payload.issueKey}: {str(e)}")
# Log error to Langfuse if enabled
if settings.langfuse.enabled and llm_span:
llm_span.end(error=e)
return {
"status": "error",
"analysis_flags": {
@ -78,4 +113,9 @@ class JiraWebhookHandler:
logger.error(f"Error processing webhook: {str(e)}")
import traceback
logger.error(f"Stack trace: {traceback.format_exc()}")
# Log error to Langfuse if enabled
if settings.langfuse.enabled and trace:
trace.end(error=e)
raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")