feat: Refactor Jira webhook handling; introduce new routes for request processing and response retrieval; update configuration settings and dependencies
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
Some checks are pending
CI/CD Pipeline / test (push) Waiting to run
This commit is contained in:
parent
cbe1a430ad
commit
79bf65265d
6
.env
6
.env
@ -1,5 +1,7 @@
|
|||||||
# Ollama configuration
|
# Ollama configuration
|
||||||
LLM_OLLAMA_BASE_URL=http://192.168.0.140:11434
|
# LLM_OLLAMA_BASE_URL=http://192.168.0.140:11434
|
||||||
|
# LLM_OLLAMA_BASE_URL=http://192.168.0.122:11434
|
||||||
|
LLM_OLLAMA_BASE_URL="https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama"
|
||||||
LLM_OLLAMA_MODEL=phi4-mini:latest
|
LLM_OLLAMA_MODEL=phi4-mini:latest
|
||||||
# LLM_OLLAMA_MODEL=smollm:360m
|
# LLM_OLLAMA_MODEL=smollm:360m
|
||||||
# LLM_OLLAMA_MODEL=qwen3:0.6b
|
# LLM_OLLAMA_MODEL=qwen3:0.6b
|
||||||
@ -8,7 +10,7 @@ LLM_OLLAMA_MODEL=phi4-mini:latest
|
|||||||
LOG_LEVEL=DEBUG
|
LOG_LEVEL=DEBUG
|
||||||
# Ollama API Key (required when using Ollama mode)
|
# Ollama API Key (required when using Ollama mode)
|
||||||
# Langfuse configuration
|
# Langfuse configuration
|
||||||
LANGFUSE_ENABLED=true
|
LANGFUSE_ENABLED=false
|
||||||
LANGFUSE_PUBLIC_KEY="pk-lf-17dfde63-93e2-4983-8aa7-2673d3ecaab8"
|
LANGFUSE_PUBLIC_KEY="pk-lf-17dfde63-93e2-4983-8aa7-2673d3ecaab8"
|
||||||
LANGFUSE_SECRET_KEY="sk-lf-ba41a266-6fe5-4c90-a483-bec8a7aaa321"
|
LANGFUSE_SECRET_KEY="sk-lf-ba41a266-6fe5-4c90-a483-bec8a7aaa321"
|
||||||
LANGFUSE_HOST="https://cloud.langfuse.com"
|
LANGFUSE_HOST="https://cloud.langfuse.com"
|
||||||
@ -6,49 +6,65 @@ from llm.models import JiraWebhookPayload
|
|||||||
from shared_store import requests_queue, ProcessingRequest
|
from shared_store import requests_queue, ProcessingRequest
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
router = APIRouter(
|
jira_router = APIRouter(
|
||||||
prefix="/api",
|
prefix="/jira",
|
||||||
tags=["API"]
|
tags=["Jira"]
|
||||||
)
|
)
|
||||||
|
|
||||||
webhook_router = APIRouter(
|
queue_router = APIRouter(
|
||||||
prefix="/webhooks",
|
prefix="/queue",
|
||||||
tags=["Webhooks"]
|
tags=["Queue"]
|
||||||
)
|
)
|
||||||
|
|
||||||
@router.post("/jira_webhook", status_code=201)
|
@jira_router.post("/sendRequest", status_code=201)
|
||||||
async def receive_jira_webhook(payload: JiraWebhookPayload):
|
async def send_jira_request(payload: JiraWebhookPayload):
|
||||||
"""Handle incoming Jira webhook and store request"""
|
"""Send requests to add to queue for further processing"""
|
||||||
request_id = requests_queue.add_request(payload.model_dump())
|
request_id = requests_queue.add_request(payload.model_dump())
|
||||||
return {"request_id": request_id}
|
return {"request_id": request_id}
|
||||||
|
|
||||||
@router.get("/pending_requests")
|
class GetResponseRequest(BaseModel):
|
||||||
async def get_pending_requests():
|
issueKey: str
|
||||||
"""Return all pending requests"""
|
|
||||||
|
@jira_router.post("/getResponse")
|
||||||
|
async def get_jira_response(request: GetResponseRequest):
|
||||||
|
"""Get response attribute provided by ollama for a given issueKey."""
|
||||||
|
matched_request = requests_queue.get_latest_completed_by_issue_key(request.issueKey)
|
||||||
|
if not matched_request:
|
||||||
|
raise HTTPException(status_code=404, detail=f"No completed request found for issueKey: {request.issueKey}")
|
||||||
|
return matched_request.response if matched_request.response else "No response yet"
|
||||||
|
|
||||||
|
# @queue_router.get("/{issueKey}")
|
||||||
|
# async def get_queue_element_by_issue_key(issueKey: str):
|
||||||
|
# """Get the element with specific issueKey. Return latest which was successfully processed by ollama. Skip pending or failed."""
|
||||||
|
# matched_request = requests_queue.get_latest_completed_by_issue_key(issueKey)
|
||||||
|
# if not matched_request:
|
||||||
|
# raise HTTPException(status_code=404, detail=f"No completed request found for issueKey: {issueKey}")
|
||||||
|
# return matched_request
|
||||||
|
|
||||||
|
@queue_router.get("/getAll")
|
||||||
|
async def get_all_requests_in_queue():
|
||||||
|
"""Gets all requests"""
|
||||||
|
all_requests = requests_queue.get_all_requests()
|
||||||
|
return {"requests": all_requests}
|
||||||
|
|
||||||
|
@queue_router.get("/getPending")
|
||||||
|
async def get_pending_requests_in_queue():
|
||||||
|
"""Gets all requests waiting to be processed"""
|
||||||
all_requests = requests_queue.get_all_requests()
|
all_requests = requests_queue.get_all_requests()
|
||||||
pending = [req for req in all_requests if req.status == "pending"]
|
pending = [req for req in all_requests if req.status == "pending"]
|
||||||
return {"requests": pending}
|
return {"requests": pending}
|
||||||
|
|
||||||
@router.delete("/requests/{request_id}")
|
@queue_router.delete("/clearAll")
|
||||||
async def delete_specific_request(request_id: int):
|
async def clear_all_requests_in_queue():
|
||||||
"""Delete specific request by ID"""
|
"""Clear all the requests from the queue"""
|
||||||
if requests_queue.delete_request_by_id(request_id):
|
|
||||||
return {"deleted": True}
|
|
||||||
raise HTTPException(status_code=404, detail="Request not found")
|
|
||||||
|
|
||||||
@router.delete("/requests")
|
|
||||||
async def delete_all_requests():
|
|
||||||
"""Clear all requests"""
|
|
||||||
requests_queue.clear_all_requests()
|
requests_queue.clear_all_requests()
|
||||||
return {"status": "cleared"}
|
return {"status": "cleared"}
|
||||||
|
|
||||||
@router.get("/requests/{request_id}/response")
|
# Original webhook_router remains unchanged for now, as it's not part of the /jira or /queue prefixes
|
||||||
async def get_request_response(request_id: int):
|
webhook_router = APIRouter(
|
||||||
"""Get response for specific request"""
|
prefix="/webhooks",
|
||||||
matched_request = requests_queue.get_request_by_id(request_id)
|
tags=["Webhooks"]
|
||||||
if not matched_request:
|
)
|
||||||
raise HTTPException(status_code=404, detail="Request not found")
|
|
||||||
return matched_request.response if matched_request.response else "No response yet"
|
|
||||||
|
|
||||||
@webhook_router.post("/jira")
|
@webhook_router.post("/jira")
|
||||||
async def handle_jira_webhook():
|
async def handle_jira_webhook():
|
||||||
|
|||||||
@ -21,8 +21,8 @@ llm:
|
|||||||
# Settings for Ollama
|
# Settings for Ollama
|
||||||
ollama:
|
ollama:
|
||||||
# Can be overridden by OLLAMA_BASE_URL
|
# Can be overridden by OLLAMA_BASE_URL
|
||||||
base_url: "http://192.168.0.140:11434"
|
#base_url: "http://192.168.0.140:11434"
|
||||||
# base_url: "https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama"
|
base_url: "https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama"
|
||||||
|
|
||||||
|
|
||||||
# Can be overridden by OLLAMA_MODEL
|
# Can be overridden by OLLAMA_MODEL
|
||||||
@ -47,7 +47,7 @@ langfuse:
|
|||||||
processor:
|
processor:
|
||||||
# Interval in seconds between polling for new Jira analysis requests
|
# Interval in seconds between polling for new Jira analysis requests
|
||||||
# Can be overridden by PROCESSOR_POLL_INTERVAL_SECONDS environment variable
|
# Can be overridden by PROCESSOR_POLL_INTERVAL_SECONDS environment variable
|
||||||
poll_interval_seconds: 30
|
poll_interval_seconds: 10
|
||||||
|
|
||||||
# Maximum number of retries for failed Jira analysis requests
|
# Maximum number of retries for failed Jira analysis requests
|
||||||
# Can be overridden by PROCESSOR_MAX_RETRIES environment variable
|
# Can be overridden by PROCESSOR_MAX_RETRIES environment variable
|
||||||
|
|||||||
@ -24,7 +24,7 @@ from loguru import logger
|
|||||||
from shared_store import RequestStatus, requests_queue, ProcessingRequest
|
from shared_store import RequestStatus, requests_queue, ProcessingRequest
|
||||||
from llm.models import JiraWebhookPayload
|
from llm.models import JiraWebhookPayload
|
||||||
from llm.chains import analysis_chain, validate_response
|
from llm.chains import analysis_chain, validate_response
|
||||||
from app.handlers import router, webhook_router # Import from unified handlers
|
from app.handlers import jira_router, queue_router, webhook_router # Import new routers
|
||||||
from config import settings
|
from config import settings
|
||||||
|
|
||||||
async def process_single_jira_request(request: ProcessingRequest):
|
async def process_single_jira_request(request: ProcessingRequest):
|
||||||
@ -124,7 +124,8 @@ def create_app():
|
|||||||
|
|
||||||
# Include routers
|
# Include routers
|
||||||
_app.include_router(webhook_router)
|
_app.include_router(webhook_router)
|
||||||
_app.include_router(router)
|
_app.include_router(jira_router)
|
||||||
|
_app.include_router(queue_router)
|
||||||
|
|
||||||
# Add health check endpoint
|
# Add health check endpoint
|
||||||
@_app.get("/health")
|
@_app.get("/health")
|
||||||
|
|||||||
@ -1,29 +0,0 @@
|
|||||||
import unittest
|
|
||||||
from llm.chains import load_prompt_template, validate_response
|
|
||||||
from llm.models import AnalysisFlags
|
|
||||||
|
|
||||||
class PromptTests(unittest.TestCase):
|
|
||||||
def test_prompt_loading(self):
|
|
||||||
"""Test that prompt template loads correctly"""
|
|
||||||
try:
|
|
||||||
template = load_prompt_template()
|
|
||||||
self.assertIsNotNone(template)
|
|
||||||
self.assertIn("issueKey", template.input_variables)
|
|
||||||
except Exception as e:
|
|
||||||
self.fail(f"Prompt loading failed: {str(e)}")
|
|
||||||
|
|
||||||
def test_response_validation(self):
|
|
||||||
"""Test response validation logic"""
|
|
||||||
valid_response = {
|
|
||||||
"hasMultipleEscalations": False,
|
|
||||||
"customerSentiment": "neutral"
|
|
||||||
}
|
|
||||||
invalid_response = {
|
|
||||||
"customerSentiment": "neutral"
|
|
||||||
}
|
|
||||||
|
|
||||||
self.assertTrue(validate_response(valid_response))
|
|
||||||
self.assertFalse(validate_response(invalid_response))
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
unittest.main()
|
|
||||||
@ -1,5 +1,5 @@
|
|||||||
fastapi==0.111.0
|
fastapi==0.111.0
|
||||||
pydantic==2.7.1
|
pydantic==2.7.4
|
||||||
pydantic-settings>=2.0.0
|
pydantic-settings>=2.0.0
|
||||||
langchain>=0.1.0
|
langchain>=0.1.0
|
||||||
langchain-ollama>=0.1.0
|
langchain-ollama>=0.1.0
|
||||||
|
|||||||
@ -66,6 +66,20 @@ class RequestQueue:
|
|||||||
with self._processing_lock:
|
with self._processing_lock:
|
||||||
return next((req for req in self._requests if req.id == request_id), None)
|
return next((req for req in self._requests if req.id == request_id), None)
|
||||||
|
|
||||||
|
def get_latest_completed_by_issue_key(self, issue_key: str) -> Optional[ProcessingRequest]:
|
||||||
|
"""
|
||||||
|
Retrieves the latest successfully completed request for a given issue key.
|
||||||
|
Skips pending or failed requests.
|
||||||
|
"""
|
||||||
|
with self._processing_lock:
|
||||||
|
completed_requests = [
|
||||||
|
req for req in self._requests
|
||||||
|
if req.payload.get("issueKey") == issue_key and req.status == RequestStatus.COMPLETED
|
||||||
|
]
|
||||||
|
# Sort by completed_at to get the latest, if available
|
||||||
|
completed_requests.sort(key=lambda req: req.completed_at if req.completed_at else datetime.min, reverse=True)
|
||||||
|
return completed_requests[0] if completed_requests else None
|
||||||
|
|
||||||
def delete_request_by_id(self, request_id: int) -> bool:
|
def delete_request_by_id(self, request_id: int) -> bool:
|
||||||
"""Deletes a specific request by its ID."""
|
"""Deletes a specific request by its ID."""
|
||||||
with self._processing_lock:
|
with self._processing_lock:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user