From f3c70b9b0f854cf849358770ed822f03a767735d Mon Sep 17 00:00:00 2001
From: Ireneusz Bachanowicz
Date: Sun, 13 Jul 2025 20:14:09 +0200
Subject: [PATCH] feat: Integrate Langfuse for observability and analytics in
 LLM processing and webhook handling

---
 README.md              |  67 ++++++++++++++++++++++++++++
 config.py              | 106 +++++++++++++++++++++++++++++++++++++++--
 config/application.yml |  14 +++++-
 jira-webhook-llm.py    |   6 +-
 llm/chains.py          |  37 ++++++++++++---
 llm/models.py          |  25 ++++++++++-
 webhooks/handlers.py   |  39 +++++++++++++++-
 7 files changed, 280 insertions(+), 14 deletions(-)

diff --git a/README.md b/README.md
index e69de29..1138ea7 100644
--- a/README.md
+++ b/README.md
@@ -0,0 +1,67 @@
+# Jira Webhook LLM
+
+## Langfuse Integration
+
+### Overview
+The application integrates with Langfuse for observability and analytics of LLM usage and webhook events. This integration provides detailed tracking of:
+- Webhook events
+- LLM model usage
+- Error tracking
+- Performance metrics
+
+### Configuration
+Langfuse configuration is managed through both `application.yml` and environment variables.
+
+#### application.yml
+```yaml
+langfuse:
+  enabled: true
+  public_key: "pk-lf-..."
+  secret_key: "sk-lf-..."
+  host: "https://cloud.langfuse.com"
+```
+
+#### Environment Variables
+```bash
+LANGFUSE_ENABLED=true
+LANGFUSE_PUBLIC_KEY="pk-lf-..."
+LANGFUSE_SECRET_KEY="sk-lf-..."
+LANGFUSE_HOST="https://cloud.langfuse.com"
+```
+
+### Enable/Disable
+To disable Langfuse integration:
+1. Set `langfuse.enabled: false` in `application.yml`
+2. Or set `LANGFUSE_ENABLED=false` in your environment
+
+### Tracking Details
+The following events are tracked:
+1. Webhook events
+   - Input payload
+   - Timestamps
+   - Issue metadata
+2. LLM processing
+   - Model used
+   - Input/output
+   - Processing time
+3. Errors
+   - Webhook processing errors
+   - LLM processing errors
+   - Validation errors
+
+### Viewing Data
+Visit your Langfuse dashboard to view the collected metrics and traces.
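+
+### Verifying the Setup
+The snippet below is a minimal sketch using the Langfuse SDK's `auth_check()` to confirm that your keys and host are accepted before enabling the integration; replace the placeholder values with your own credentials.
+
+```python
+from langfuse import Langfuse
+
+client = Langfuse(
+    public_key="pk-lf-...",
+    secret_key="sk-lf-...",
+    host="https://cloud.langfuse.com",
+)
+print(client.auth_check())  # True means the credentials were accepted
+```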
diff --git a/config.py b/config.py
index afa6b14..cb86628 100644
--- a/config.py
+++ b/config.py
@@ -6,6 +6,50 @@ from pydantic import validator, ConfigDict
 from loguru import logger
 from watchfiles import watch, Change
 from threading import Thread
+from langfuse import Langfuse
+from langfuse.callback import CallbackHandler  # v2 SDK path; matches the stateful client API used below
+
+class LangfuseConfig(BaseSettings):
+    enabled: bool = True
+    public_key: Optional[str] = None
+    secret_key: Optional[str] = None
+    host: Optional[str] = None
+
+    @validator('host')
+    def validate_host(cls, v):
+        if v and not v.startswith(('http://', 'https://')):
+            raise ValueError("Langfuse host must start with http:// or https://")
+        return v
+
+    def __init__(self, **data):
+        try:
+            logger.info("Initializing LangfuseConfig with data: {}", data)
+            logger.info("Environment variables:")
+            logger.info("LANGFUSE_PUBLIC_KEY: {}", os.getenv('LANGFUSE_PUBLIC_KEY'))
+            logger.info("LANGFUSE_SECRET_KEY set: {}", os.getenv('LANGFUSE_SECRET_KEY') is not None)
+            logger.info("LANGFUSE_HOST: {}", os.getenv('LANGFUSE_HOST'))
+
+            super().__init__(**data)
+            logger.info("LangfuseConfig initialized successfully")
+            logger.info("Public Key: {}", self.public_key)
+            logger.info("Secret Key set: {}", self.secret_key is not None)
+            logger.info("Host: {}", self.host)
+        except Exception as e:
+            logger.error("Failed to initialize LangfuseConfig: {}", e)
+            logger.error("Current environment variables:")
+            logger.error("LANGFUSE_PUBLIC_KEY: {}", os.getenv('LANGFUSE_PUBLIC_KEY'))
+            logger.error("LANGFUSE_SECRET_KEY set: {}", os.getenv('LANGFUSE_SECRET_KEY') is not None)
+            logger.error("LANGFUSE_HOST: {}", os.getenv('LANGFUSE_HOST'))
+            raise
+
+    model_config = ConfigDict(
+        env_prefix='LANGFUSE_',
+        env_file='.env',
+        env_file_encoding='utf-8',
+        extra='ignore',
+        env_nested_delimiter='__',
+        case_sensitive=False  # uppercase LANGFUSE_* vars must match lowercase field names
+    )
 
 class LogConfig(BaseSettings):
     level: str = 'INFO'
@@ -42,10 +86,33 @@ class LLMConfig(BaseSettings):
 class Settings:
     def __init__(self):
-        self.log = LogConfig()
-        self.llm = LLMConfig()
-        self._validate()
-        self._start_watcher()
+        try:
+            logger.info("Initializing LogConfig")
+            self.log = LogConfig()
+            logger.info("LogConfig initialized: {}", self.log.model_dump())
+
+            logger.info("Initializing LLMConfig")
+            self.llm = LLMConfig()
+            logger.info("LLMConfig initialized: {}", self.llm.model_dump())
+
+            logger.info("Initializing LangfuseConfig")
+            self.langfuse = LangfuseConfig()
+            logger.info("LangfuseConfig initialized: {}", self.langfuse.model_dump(exclude={'secret_key'}))
+
+            logger.info("Validating configuration")
+            self._validate()
+            logger.info("Starting config watcher")
+            self._start_watcher()
+            logger.info("Initializing Langfuse")
+            self._init_langfuse()
+            logger.info("Configuration initialized successfully")
+        except Exception as e:
+            logger.error(f"Configuration initialization failed: {e}")
+            logger.error("Current configuration state:")
+            logger.error("LogConfig: {}", self.log.model_dump() if hasattr(self, 'log') else 'Not initialized')
+            logger.error("LLMConfig: {}", self.llm.model_dump() if hasattr(self, 'llm') else 'Not initialized')
+            logger.error("LangfuseConfig: {}", self.langfuse.model_dump(exclude={'secret_key'}) if hasattr(self, 'langfuse') else 'Not initialized')
+            raise
 
     def _validate(self):
         logger.info(f"LLM mode set to: '{self.llm.mode}'")
@@ -65,6 +132,37 @@ class Settings:
             raise ValueError("LLM mode is 'ollama', but OLLAMA_MODEL is not set.")
 
         logger.info("Configuration validated successfully.")
+
+    def _init_langfuse(self):
+        if self.langfuse.enabled:
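+            # Initialization is intentionally non-fatal: any failure below is
+            # logged and Langfuse is disabled so the app can still start.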
+            try:
+                # Verify all required credentials are present
+                if not all([self.langfuse.public_key, self.langfuse.secret_key, self.langfuse.host]):
+                    raise ValueError("Missing required Langfuse credentials")
+
+                # Initialize Langfuse client
+                self.langfuse_client = Langfuse(
+                    public_key=self.langfuse.public_key,
+                    secret_key=self.langfuse.secret_key,
+                    host=self.langfuse.host
+                )
+
+                # Initialize CallbackHandler
+                self.langfuse_handler = CallbackHandler(
+                    public_key=self.langfuse.public_key,
+                    secret_key=self.langfuse.secret_key,
+                    host=self.langfuse.host
+                )
+
+                logger.info("Langfuse client and handler initialized successfully")
+            except ValueError as e:
+                logger.warning(f"Langfuse configuration error: {e}. Disabling Langfuse.")
+                self.langfuse.enabled = False
+            except Exception as e:
+                logger.error(f"Failed to initialize Langfuse: {e}")
+                self.langfuse.enabled = False
 
     def _start_watcher(self):
         def watch_config():
diff --git a/config/application.yml b/config/application.yml
index 9d917b2..4ad1ffb 100644
--- a/config/application.yml
+++ b/config/application.yml
@@ -29,4 +29,16 @@ llm:
   model: "phi4-mini:latest"
   # model: "qwen3:1.7b"
   # model: "smollm:360m"
-  # model: "qwen3:0.6b"
\ No newline at end of file
+  # model: "qwen3:0.6b"
+
+# Langfuse configuration for observability and analytics
+langfuse:
+  # Enable or disable Langfuse integration
+  # Can be overridden by LANGFUSE_ENABLED environment variable
+  enabled: true
+
+  # Langfuse API credentials
+  # It's HIGHLY recommended to set these via environment variables
+  # instead of saving them in this file
+  public_key: "pk-lf-..."
+  secret_key: "sk-lf-..."
+  host: "https://cloud.langfuse.com"
\ No newline at end of file
diff --git a/jira-webhook-llm.py b/jira-webhook-llm.py
index cce29af..8a0c421 100644
--- a/jira-webhook-llm.py
+++ b/jira-webhook-llm.py
@@ -98,5 +98,7 @@ async def test_llm():
     )
     return await webhook_handler.handle_webhook(test_payload)
 
-# To run this:
-# 1. Start FastAPI: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
\ No newline at end of file
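+# Allows running the app directly with `python jira-webhook-llm.py`.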
+if __name__ == "__main__":
+    import uvicorn
+    uvicorn.run(app, host="0.0.0.0", port=8000)
\ No newline at end of file
diff --git a/llm/chains.py b/llm/chains.py
index 72cbea4..b4b0cbf 100644
--- a/llm/chains.py
+++ b/llm/chains.py
@@ -1,3 +1,5 @@
+import json  # needed by validate_response below
+from typing import Union
 from langchain_ollama import OllamaLLM
 from langchain_openai import ChatOpenAI
 from langchain_core.prompts import PromptTemplate
@@ -106,18 +108,44 @@ FALLBACK_PROMPT = PromptTemplate(
 def create_analysis_chain():
     try:
         prompt_template = load_prompt_template()
-        return prompt_template | llm | parser
+        chain = prompt_template | llm | parser
+
+        # Add langfuse handler if enabled
+        if settings.langfuse.enabled:
+            chain = chain.with_config(
+                callbacks=[settings.langfuse_handler]
+            )
+
+        return chain
     except Exception as e:
         logger.warning(f"Using fallback prompt due to error: {str(e)}")
-        return FALLBACK_PROMPT | llm | parser
+        chain = FALLBACK_PROMPT | llm | parser
+
+        if settings.langfuse.enabled:
+            chain = chain.with_config(
+                callbacks=[settings.langfuse_handler]
+            )
+
+        return chain
 
 # Initialize analysis chain
 analysis_chain = create_analysis_chain()
 
 # Enhanced response validation function
-def validate_response(response: dict) -> bool:
+def validate_response(response: Union[dict, str]) -> bool:
     """Validate the JSON response structure and content"""
     try:
+        # If response is a string, attempt to parse it as JSON
+        if isinstance(response, str):
+            try:
+                response = json.loads(response)
+            except json.JSONDecodeError:
+                return False
+
+        # Ensure response is a dictionary
+        if not isinstance(response, dict):
+            return False
+
         # Check required fields
         required_fields = ["hasMultipleEscalations", "customerSentiment"]
         if not all(field in response for field in required_fields):
@@ -139,5 +167,4 @@ def validate_response(response: dict) -> bool:
         return False
 
     except Exception:
-        return False
-        return all(field in response for field in required_fields)
\ No newline at end of file
+        return False
\ No newline at end of file
diff --git a/llm/models.py b/llm/models.py
index ae57c4a..0693168 100644
--- a/llm/models.py
+++ b/llm/models.py
@@ -1,5 +1,7 @@
 from typing import Optional, List, Union
 from pydantic import BaseModel, ConfigDict, field_validator, Field
+from loguru import logger
+from config import settings
 
 class JiraWebhookPayload(BaseModel):
     model_config = ConfigDict(alias_generator=lambda x: ''.join(word.capitalize() if i > 0 else word for i, word in enumerate(x.split('_'))), populate_by_name=True)
@@ -23,4 +25,25 @@ class JiraWebhookPayload(BaseModel):
 
 class AnalysisFlags(BaseModel):
     hasMultipleEscalations: bool = Field(description="Is there evidence of multiple escalation attempts?")
-    customerSentiment: Optional[str] = Field(description="Overall customer sentiment (e.g., 'neutral', 'frustrated', 'calm').")
\ No newline at end of file
+    customerSentiment: Optional[str] = Field(description="Overall customer sentiment (e.g., 'neutral', 'frustrated', 'calm').")
+
+    def __init__(self, **data):
+        super().__init__(**data)
+
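+        # Best-effort tracking: any failure is logged below, never re-raised.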
+        # Track model usage if Langfuse is enabled
+        if settings.langfuse.enabled:
+            try:
+                settings.langfuse_client.trace(
+                    name="LLM Model Usage",
+                    input=data,
+                    metadata={
+                        "model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model,
+                        "analysis_flags": {
+                            "hasMultipleEscalations": self.hasMultipleEscalations,
+                            "customerSentiment": self.customerSentiment
+                        }
+                    }
+                )
+            except Exception as e:
+                logger.error(f"Failed to track model usage: {e}")
\ No newline at end of file
diff --git a/webhooks/handlers.py b/webhooks/handlers.py
index 7b576ab..f0d81c0 100644
--- a/webhooks/handlers.py
+++ b/webhooks/handlers.py
@@ -6,8 +6,8 @@ from pydantic import BaseModel, ConfigDict, field_validator
 from datetime import datetime
 
 from config import settings
 from llm.models import JiraWebhookPayload, AnalysisFlags
-from llm.chains import analysis_chain
+from llm.chains import analysis_chain, validate_response
 
 class BadRequestError(HTTPException):
     def __init__(self, detail: str):
@@ -37,6 +37,19 @@ class JiraWebhookHandler:
             issue_key=payload.issueKey,
             timestamp=datetime.utcnow().isoformat()
         ).info("Received webhook")
+
+        # Create Langfuse trace if enabled
+        trace = None
+        if settings.langfuse.enabled:
+            trace = settings.langfuse_client.trace(
+                id=f"webhook-{payload.issueKey}",
+                name="Jira Webhook",
+                input=payload.model_dump(),
+                metadata={
+                    "issue_key": payload.issueKey,
+                    "timestamp": datetime.utcnow().isoformat()
+                }
+            )
 
         llm_input = {
             "issueKey": payload.issueKey,
@@ -43,14 +56,30 @@ class JiraWebhookHandler:
             "summary": payload.summary,
             "description": payload.description,
             "labels": payload.labels,
             "status": payload.status,
             "assignee": payload.assignee if payload.assignee else "Unassigned",
             "updated": payload.updated if payload.updated else "Unknown",
             "comment": payload.comment if payload.comment else "No new comment provided."
         }
+
+        # Create Langfuse span for LLM processing if enabled
+        llm_span = None
+        if settings.langfuse.enabled and trace:
+            llm_span = trace.span(
+                name="LLM Processing",
+                input=llm_input,
+                metadata={
+                    "model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model
+                }
+            )
 
         try:
             analysis_result = await self.analysis_chain.ainvoke(llm_input)
+
+            # Update Langfuse span with output if enabled
+            if settings.langfuse.enabled and llm_span:
+                llm_span.end(output=analysis_result)
+
             # Validate LLM response
             if not validate_response(analysis_result):
                 logger.warning(f"Invalid LLM response format for {payload.issueKey}")
@@ -65,6 +94,10 @@ class JiraWebhookHandler:
 
         except Exception as e:
             logger.error(f"LLM processing failed for {payload.issueKey}: {str(e)}")
+
+            # Log error to Langfuse if enabled
+            if settings.langfuse.enabled and llm_span:
+                llm_span.end(level="ERROR", status_message=str(e))
             return {
                 "status": "error",
                 "analysis_flags": {
@@ -78,4 +111,8 @@ class JiraWebhookHandler:
         logger.error(f"Error processing webhook: {str(e)}")
         import traceback
         logger.error(f"Stack trace: {traceback.format_exc()}")
+
+        # Log error to Langfuse if enabled
+        if settings.langfuse.enabled and trace:
+            trace.update(metadata={"error": str(e)})
         raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")
\ No newline at end of file