From 9e698e40f96bd56765ec3392bce24543ac71bcc5 Mon Sep 17 00:00:00 2001 From: Ireneusz Bachanowicz Date: Tue, 22 Jul 2025 00:41:17 +0200 Subject: [PATCH] STABLE feat: Implement Gemini integration; update configuration for Gemini API and model; enhance Jira webhook processing; refactor application structure and dependencies --- .env | 12 ++++-- .gitignore | 1 + app/handlers.py | 30 --------------- config.py | 46 ++++++++++++++++------- config/application.yml | 25 +++++++++++-- llm/chains.py | 59 +++++++++++++++++++++--------- llm/jira_analysis_v1.2.0.txt | 67 ++++++++++++++++++++++++++++------ llm/models.py | 50 ++++++++++++------------- jira_webhook_llm.py => main.py | 43 ++++++++++++++++++++-- requirements.txt | 12 +++--- shared_store.py | 5 ++- 11 files changed, 234 insertions(+), 116 deletions(-) rename jira_webhook_llm.py => main.py (79%) diff --git a/.env b/.env index b7e101c..676b715 100644 --- a/.env +++ b/.env @@ -1,7 +1,7 @@ # Ollama configuration -# LLM_OLLAMA_BASE_URL=http://192.168.0.140:11434 +LLM_OLLAMA_BASE_URL=http://192.168.0.140:11434 # LLM_OLLAMA_BASE_URL=http://192.168.0.122:11434 -LLM_OLLAMA_BASE_URL="https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama" +# LLM_OLLAMA_BASE_URL="https://api-amer-sandbox-gbl-mdm-hub.pfizer.com/ollama" LLM_OLLAMA_MODEL=phi4-mini:latest # LLM_OLLAMA_MODEL=smollm:360m # LLM_OLLAMA_MODEL=qwen3:0.6b @@ -10,7 +10,11 @@ LLM_OLLAMA_MODEL=phi4-mini:latest LOG_LEVEL=DEBUG # Ollama API Key (required when using Ollama mode) # Langfuse configuration -LANGFUSE_ENABLED=false +LANGFUSE_ENABLED=true LANGFUSE_PUBLIC_KEY="pk-lf-17dfde63-93e2-4983-8aa7-2673d3ecaab8" LANGFUSE_SECRET_KEY="sk-lf-ba41a266-6fe5-4c90-a483-bec8a7aaa321" -LANGFUSE_HOST="https://cloud.langfuse.com" \ No newline at end of file +LANGFUSE_HOST="https://cloud.langfuse.com" +# Gemini configuration +LLM_GEMINI_API_KEY="AIzaSyDl12gxyTf2xCaTbT6OMJg0I-Rc82Ib77c" +LLM_GEMINI_MODEL="gemini-2.5-flash" +LLM_MODE=gemini diff --git a/.gitignore b/.gitignore index b369363..f1d38a5 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,7 @@ venv/ *.egg-info/ build/ dist/ +.roo/* # Editor files (e.g., Visual Studio Code, Sublime Text, Vim) .vscode/ diff --git a/app/handlers.py b/app/handlers.py index c26c6d5..7b9de3e 100644 --- a/app/handlers.py +++ b/app/handlers.py @@ -33,13 +33,6 @@ async def get_jira_response(request: GetResponseRequest): raise HTTPException(status_code=404, detail=f"No completed request found for issueKey: {request.issueKey}") return matched_request.response if matched_request.response else "No response yet" -# @queue_router.get("/{issueKey}") -# async def get_queue_element_by_issue_key(issueKey: str): -# """Get the element with specific issueKey. Return latest which was successfully processed by ollama. 
Skip pending or failed.""" -# matched_request = requests_queue.get_latest_completed_by_issue_key(issueKey) -# if not matched_request: -# raise HTTPException(status_code=404, detail=f"No completed request found for issueKey: {issueKey}") -# return matched_request @queue_router.get("/getAll") async def get_all_requests_in_queue(): @@ -59,26 +52,3 @@ async def clear_all_requests_in_queue(): """Clear all the requests from the queue""" requests_queue.clear_all_requests() return {"status": "cleared"} - -# Original webhook_router remains unchanged for now, as it's not part of the /jira or /queue prefixes -webhook_router = APIRouter( - prefix="/webhooks", - tags=["Webhooks"] -) - -@webhook_router.post("/jira") -async def handle_jira_webhook(): - return {"status": "webhook received"} - -@webhook_router.post("/ollama") -async def handle_ollama_webhook(request: Request): - """Handle incoming Ollama webhook and capture raw output""" - try: - raw_body = await request.body() - response_data = raw_body.decode('utf-8') - logger.info(f"Received raw Ollama webhook response: {response_data}") - # Here you would process the raw_body, e.g., store it or pass it to another component - return {"status": "ollama webhook received", "data": response_data} - except Exception as e: - logger.error(f"Error processing Ollama webhook: {e}") - raise HTTPException(status_code=500, detail=f"Error processing webhook: {e}") \ No newline at end of file diff --git a/config.py b/config.py index 32958ce..6cb94b5 100644 --- a/config.py +++ b/config.py @@ -1,4 +1,5 @@ import os +import logging import sys from typing import Optional from pydantic_settings import BaseSettings @@ -6,6 +7,7 @@ from langfuse._client.client import Langfuse from pydantic import field_validator from pydantic_settings import SettingsConfigDict import yaml +_logger = logging.getLogger(__name__) from pathlib import Path class LangfuseConfig(BaseSettings): @@ -33,10 +35,15 @@ class LLMConfig(BaseSettings): ollama_base_url: Optional[str] = None ollama_model: Optional[str] = None + # Gemini settings + gemini_api_key: Optional[str] = None + gemini_model: Optional[str] = None + gemini_api_base_url: Optional[str] = None # Add this for Gemini + @field_validator('mode') def validate_mode(cls, v): - if v not in ['openai', 'ollama']: - raise ValueError("LLM mode must be either 'openai' or 'ollama'") + if v not in ['openai', 'ollama', 'gemini']: # Add 'gemini' + raise ValueError("LLM mode must be 'openai', 'ollama', or 'gemini'") return v model_config = SettingsConfigDict( @@ -46,15 +53,6 @@ class LLMConfig(BaseSettings): extra='ignore' ) -class ApiConfig(BaseSettings): - api_key: Optional[str] = None - - model_config = SettingsConfigDict( - env_prefix='API_', - env_file='.env', - env_file_encoding='utf-8', - extra='ignore' - ) class ProcessorConfig(BaseSettings): poll_interval_seconds: int = 10 @@ -75,8 +73,23 @@ class Settings: yaml_config = self._load_yaml_config() # Initialize configurations - self.llm = LLMConfig(**yaml_config.get('llm', {})) - self.api = ApiConfig(**yaml_config.get('api', {})) + llm_config_data = yaml_config.get('llm', {}) + + # Extract and flatten nested LLM configurations + mode = llm_config_data.get('mode', 'ollama') + openai_settings = llm_config_data.get('openai') or {} + ollama_settings = llm_config_data.get('ollama') or {} + gemini_settings = llm_config_data.get('gemini') or {} # New: Get Gemini settings + + # Combine all LLM settings, prioritizing top-level 'mode' + combined_llm_settings = { + 'mode': mode, + **{f'openai_{k}': v for k, v in 
openai_settings.items()}, + **{f'ollama_{k}': v for k, v in ollama_settings.items()}, + **{f'gemini_{k}': v for k, v in gemini_settings.items()} # New: Add Gemini settings + } + + self.llm = LLMConfig(**combined_llm_settings) self.processor = ProcessorConfig(**yaml_config.get('processor', {})) self.langfuse = LangfuseConfig(**yaml_config.get('langfuse', {})) @@ -90,7 +103,7 @@ class Settings: host=self.langfuse.host ) else: - print("Langfuse is enabled but missing one or more of LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY, or LANGFUSE_HOST. Langfuse client will not be initialized.") + _logger.warning("Langfuse is enabled but missing one or more of LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY, or LANGFUSE_HOST. Langfuse client will not be initialized.") self._validate() @@ -121,6 +134,11 @@ class Settings: raise ValueError("OLLAMA_BASE_URL is not set.") if not self.llm.ollama_model: raise ValueError("OLLAMA_MODEL is not set.") + elif self.llm.mode == 'gemini': # New: Add validation for Gemini mode + if not self.llm.gemini_api_key: + raise ValueError("GEMINI_API_KEY is not set.") + if not self.llm.gemini_model: + raise ValueError("GEMINI_MODEL is not set.") # Create settings instance settings = Settings() \ No newline at end of file diff --git a/config/application.yml b/config/application.yml index dbbb4a3..3d56b38 100644 --- a/config/application.yml +++ b/config/application.yml @@ -3,20 +3,37 @@ llm: # The mode to run the application in. # Can be 'openai' or 'ollama'. # This can be overridden by the LLM_MODE environment variable. - mode: ollama + mode: gemini # Change mode to gemini # Settings for OpenAI-compatible APIs (like OpenRouter) openai: # It's HIGHLY recommended to set this via an environment variable # instead of saving it in this file. # Can be overridden by OPENAI_API_KEY - api_key: "sk-or-v1-..." + # api_key: "sk-or-v1-..." + # api_key: "your-openai-api-key" # Keep this commented out or set to a placeholder # Can be overridden by OPENAI_API_BASE_URL - api_base_url: "https://openrouter.ai/api/v1" + # api_base_url: "https://openrouter.ai/api/v1" + # api_base_url: "https://api.openai.com/v1" # Remove or comment out this line # Can be overridden by OPENAI_MODEL - model: "deepseek/deepseek-chat:free" + # model: "deepseek/deepseek-chat:free" + # model: "gpt-4o" # Keep this commented out or set to a placeholder + + # Settings for Gemini + gemini: + # It's HIGHLY recommended to set this via an environment variable + # instead of saving it in this file. 
+ # Can be overridden by GEMINI_API_KEY + api_key: "AIzaSyDl12gxyTf2xCaTbT6OMJg0I-Rc82Ib77c" # Move from openai + + # Can be overridden by GEMINI_MODEL + # model: "gemini-2.5-flash" + model: "gemini-2.5-flash-lite-preview-06-17" + + # Can be overridden by GEMINI_API_BASE_URL + api_base_url: "https://generativelanguage.googleapis.com/v1beta/" # Add for Gemini # Settings for Ollama ollama: diff --git a/llm/chains.py b/llm/chains.py index d179248..3eee15e 100644 --- a/llm/chains.py +++ b/llm/chains.py @@ -13,6 +13,7 @@ from langchain_core.output_parsers import JsonOutputParser from langchain_core.runnables import RunnablePassthrough, Runnable from langchain_ollama import OllamaLLM from langchain_openai import ChatOpenAI +from langchain_google_genai import ChatGoogleGenerativeAI # New import for Gemini from loguru import logger from llm.models import AnalysisFlags @@ -25,7 +26,7 @@ class LLMInitializationError(Exception): self.details = details # Initialize LLM -llm: Union[ChatOpenAI, OllamaLLM, None] = None +llm: Union[ChatOpenAI, OllamaLLM, ChatGoogleGenerativeAI, None] = None # Add ChatGoogleGenerativeAI if settings.llm.mode == 'openai': logger.info(f"Initializing ChatOpenAI with model: {settings.llm.openai_model}") llm = ChatOpenAI( @@ -80,6 +81,45 @@ elif settings.llm.mode == 'ollama': "\n3. The model is available", details=details ) from e +elif settings.llm.mode == 'gemini': # New: Add Gemini initialization + logger.info(f"Initializing ChatGoogleGenerativeAI with model: {settings.llm.gemini_model}") + try: + if not settings.llm.gemini_api_key: + raise ValueError("Gemini API key is not configured") + if not settings.llm.gemini_model: + raise ValueError("Gemini model is not specified") + + llm = ChatGoogleGenerativeAI( + model=settings.llm.gemini_model, + temperature=0.7, + max_tokens=2000, + google_api_key=settings.llm.gemini_api_key + ) + + # Test connection only if not in a test environment + import os + if os.getenv("IS_TEST_ENV") != "true": + logger.debug("Testing Gemini connection...") + llm.invoke("test") # Simple test request + logger.info("Gemini connection established successfully") + else: + logger.info("Skipping Gemini connection test in test environment.") + + except Exception as e: + error_msg = f"Failed to initialize Gemini: {str(e)}" + details = { + 'model': settings.llm.gemini_model, + 'error_type': type(e).__name__ + } + logger.error(error_msg) + logger.debug(f"Connection details: {details}") + raise LLMInitializationError( + "Failed to connect to Gemini service. Please check:" + "\n1. GEMINI_API_KEY is correct" + "\n2. GEMINI_MODEL is correct and accessible" + "\n3. Network connectivity to Gemini API", + details=details + ) from e if llm is None: logger.error("LLM could not be initialized. 
Exiting.") @@ -147,23 +187,10 @@ def create_analysis_chain(): | parser ) - # Add langfuse handler if enabled and available (assuming settings.langfuse_handler is set up elsewhere) - # if settings.langfuse.enabled and hasattr(settings, 'langfuse_handler'): - # chain = chain.with_config( - # callbacks=[settings.langfuse_handler] - # ) - return chain except Exception as e: logger.warning(f"Using fallback prompt due to error: {str(e)}") chain = FALLBACK_PROMPT | llm_runnable # Use the explicitly typed runnable - - # Add langfuse handler if enabled and available (assuming settings.langfuse_handler is set up elsewhere) - # if settings.langfuse.enabled and hasattr(settings, 'langfuse_handler'): - # chain = chain.with_config( - # callbacks=[settings.langfuse_handler] - # ) - return chain # Initialize analysis chain @@ -173,10 +200,6 @@ analysis_chain = create_analysis_chain() def validate_response(response: Union[dict, str], issue_key: str = "N/A") -> bool: """Validate the JSON response structure and content""" try: - # If LLM mode is Ollama, skip detailed validation and return raw output - if settings.llm.mode == 'ollama': - logger.info(f"[{issue_key}] Ollama mode detected. Skipping detailed response validation. Raw response: {response}") - return True # If response is a string, attempt to parse it as JSON if isinstance(response, str): diff --git a/llm/jira_analysis_v1.2.0.txt b/llm/jira_analysis_v1.2.0.txt index 018614a..7ea8e3b 100644 --- a/llm/jira_analysis_v1.2.0.txt +++ b/llm/jira_analysis_v1.2.0.txt @@ -1,17 +1,64 @@ SYSTEM: -You are an AI assistant designed to analyze Jira ticket details containing email correspondence and extract key flags and sentiment, -outputting information in a strict JSON format. +You are a precise AI assistant that analyzes Jira tickets and outputs a JSON object. +Your task is to analyze the provided Jira ticket data and generate a JSON object based on the rules below. +Your output MUST be ONLY the JSON object, with no additional text or explanations. -Your output MUST be ONLY a valid JSON object. Do NOT include any conversational text, explanations, or markdown outside the JSON. +## JSON Output Schema +{format_instructions} -The JSON structure MUST follow this exact schema. If a field cannot be determined, use `null` for strings/numbers or empty list `[]` for arrays. +## Field-by-Field Instructions -- Determine if there are signs of multiple questions attempts asking to respond, and provide information from MDM HUB team. Questions directed to other teams are not considered. --- Usually multiple requests one after another in span of days asking for immediate help of HUB team. Normal discussion, responses back and forth, are not considered as an escalation. -- Assess if the issue requires urgent attention based on language or context from the summary, description, or latest comment. --- Usually means that Customer is asking for help due to upcoming deadlines, other high priority issues which are blocked due to our stall. +### `hasMultipleEscalations` (boolean) +- Set to `true` ONLY if the user has made multiple requests for help from the "MDM HUB team" without getting a response. +- A normal back-and-forth conversation is NOT an escalation. + +### `customerSentiment` (string: "neutral", "frustrated", "calm") +- Set to `"frustrated"` if the user mentions blockers, deadlines, or uses urgent language (e.g., "urgent", "asap", "blocked"). +- Set to `"calm"` if the language is polite and patient. +- Set to `"neutral"` otherwise. 
+ +### `issueCategory` (string: "technical_issue", "data_request", "access_problem", "general_question", "other") +- `"technical_issue"`: Errors, bugs, system failures, API problems. +- `"data_request"`: Asking for data exports, reports, or information retrieval. +- `"access_problem"`: User cannot log in, has permission issues. +- `"general_question"`: "How do I..." or other general inquiries. +- `"other"`: If it doesn't fit any other category. + +### `area` (string) +- Classify the ticket into ONE of the following areas based on keywords: +- `"Direct Channel"`: "REST API", "API Gateway", "Create/Update HCP/HCO" +- `"Streaming Channel"`: "Kafka", "SQS", "Reltio events", "Snowflake" +- `"Java Batch Channel"`: "Batch", "File loader", "Airflow" +- `"ETL Batch Channel"`: "ETL", "Informatica" +- `"DCR Service"`: "DCR", "PforceRx", "OneKey", "Veeva" +- `"API Gateway"`: "Kong", "authentication", "routing" +- `"Callback Service"`: "Callback", "HCO names", "ranking" +- `"Publisher"`: "Publisher", "routing rules" +- `"Reconciliation"`: "Reconciliation", "sync" +- `"Snowflake"`: "Snowflake", "Data Mart", "SQL" +- `"Authentication"`: "PingFederate", "OAuth2", "Key-Auth" +- `"Other"`: If it doesn't fit any other category. + +## Example + +### Input: +- Summary: "DCR Rejected by OneKey" +- Description: "Our DCR for PforceRx was rejected by OneKey. Can the MDM HUB team investigate?" +- Comment: "" + +### Output: +```json +{{ + "Hasmultipleescalations": false, + "CustomerSentiment": "neutral", + "IssueCategory": "technical_issue", + "Area": "DCR Service" +}} +``` USER: +Analyze the following Jira ticket: + Issue Key: {issueKey} Summary: {summary} Description: {description} @@ -19,6 +66,4 @@ Status: {status} Existing Labels: {labels} Assignee: {assignee} Last Updated: {updated} -Latest Comment (if applicable): {comment} - -{format_instructions} \ No newline at end of file +Latest Comment (if applicable): {comment} \ No newline at end of file diff --git a/llm/models.py b/llm/models.py index 35f5123..089535d 100644 --- a/llm/models.py +++ b/llm/models.py @@ -13,7 +13,28 @@ class CustomerSentiment(str, Enum): NEUTRAL = "neutral" FRUSTRATED = "frustrated" CALM = "calm" - # Add other sentiments as needed + +class IssueCategory(str, Enum): + TECHNICAL_ISSUE = "technical_issue" + DATA_REQUEST = "data_request" + ACCESS_PROBLEM = "access_problem" + GENERAL_QUESTION = "general_question" + OTHER = "other" + +# New: Add an Enum for technical areas based on Confluence doc +class Area(str, Enum): + DIRECT_CHANNEL = "Direct Channel" + STREAMING_CHANNEL = "Streaming Channel" + JAVA_BATCH_CHANNEL = "Java Batch Channel" + ETL_BATCH_CHANNEL = "ETL Batch Channel" + DCR_SERVICE = "DCR Service" + API_GATEWAY = "API Gateway" + CALLBACK_SERVICE = "Callback Service" + PUBLISHER = "Publisher" + RECONCILIATION = "Reconciliation" + SNOWFLAKE = "Snowflake" + AUTHENTICATION = "Authentication" + OTHER = "Other" class JiraWebhookPayload(BaseModel): model_config = ConfigDict(alias_generator=lambda x: ''.join(word.capitalize() if i > 0 else word for i, word in enumerate(x.split('_'))), populate_by_name=True) @@ -38,31 +59,10 @@ class JiraWebhookPayload(BaseModel): class AnalysisFlags(BaseModel): hasMultipleEscalations: bool = Field(alias="Hasmultipleescalations", description="Is there evidence of multiple escalation attempts?") customerSentiment: Optional[CustomerSentiment] = Field(alias="CustomerSentiment", description="Overall customer sentiment (e.g., 'neutral', 'frustrated', 'calm').") - model: Optional[str] = Field(None, 
alias="Model", description="The LLM model used for analysis.")
+    # New: Add category and area fields
+    issueCategory: IssueCategory = Field(alias="IssueCategory", description="The primary category of the Jira ticket.")
+    area: Area = Field(alias="Area", description="The technical area of the MDM HUB related to the issue.")
 
-    def __init__(self, **data):
-        super().__init__(**data)
-
-        # Track model usage if Langfuse is enabled and client is available
-        if settings.langfuse.enabled and hasattr(settings, 'langfuse_client'):
-            try:
-                if settings.langfuse_client is None:
-                    logger.warning("Langfuse client is None despite being enabled")
-                    return
-
-                settings.langfuse_client.start_span( # Use start_span
-                    name="LLM Model Usage",
-                    input=data,
-                    metadata={
-                        "model": self.model, # Use the new model attribute
-                        "analysis_flags": {
-                            "hasMultipleEscalations": self.hasMultipleEscalations,
-                            "customerSentiment": self.customerSentiment.value if self.customerSentiment else None
-                        }
-                    }
-                ).end() # End the trace immediately as it's just for tracking model usage
-            except Exception as e:
-                logger.error(f"Failed to track model usage: {e}")
 
 class JiraAnalysisResponse(BaseModel):
     model_config = ConfigDict(from_attributes=True)
diff --git a/jira_webhook_llm.py b/main.py
similarity index 79%
rename from jira_webhook_llm.py
rename to main.py
index 6010a00..eb170c9 100644
--- a/jira_webhook_llm.py
+++ b/main.py
@@ -19,18 +19,37 @@ from fastapi import FastAPI, Request, HTTPException
 from fastapi.responses import JSONResponse
 from pydantic import BaseModel
 from loguru import logger
+from langfuse import Langfuse # Import Langfuse
+from langfuse.langchain import CallbackHandler # Import CallbackHandler
 
 # Local application imports
 from shared_store import RequestStatus, requests_queue, ProcessingRequest
 from llm.models import JiraWebhookPayload
 from llm.chains import analysis_chain, validate_response
-from app.handlers import jira_router, queue_router, webhook_router # Import new routers
+from app.handlers import jira_router, queue_router # Import new routers
 from config import settings
 
+# Initialize Langfuse client globally
+langfuse_client = None
+if settings.langfuse.enabled:
+    langfuse_client = Langfuse(
+        public_key=settings.langfuse.public_key,
+        secret_key=settings.langfuse.secret_key,
+        host=settings.langfuse.host
+    )
+    logger.info("Langfuse client initialized.")
+else:
+    logger.info("Langfuse integration is disabled.")
+
 async def process_single_jira_request(request: ProcessingRequest):
     """Processes a single Jira webhook request using the LLM."""
     payload = JiraWebhookPayload.model_validate(request.payload)
 
+    # Initialize Langfuse callback handler for this trace
+    langfuse_handler = None
+    if langfuse_client:
+        langfuse_handler = CallbackHandler() # No arguments needed for constructor
+
     logger.bind(
         issue_key=payload.issueKey,
         request_id=request.id,
@@ -49,7 +68,15 @@ async def process_single_jira_request(request: ProcessingRequest):
     }
 
     try:
-        raw_llm_response = await analysis_chain.ainvoke(llm_input)
+        # Pass the Langfuse handler; Langfuse reads the session id from metadata and uses run_name as the trace name
+        raw_llm_response = await analysis_chain.ainvoke(
+            llm_input,
+            config={
+                "callbacks": [langfuse_handler],
+                "run_name": f"Jira-Analysis-{payload.issueKey}",
+                "metadata": {"langfuse_session_id": str(request.id)}
+            } if langfuse_handler else {}
+        )
 
         # Store the raw LLM response
         request.response = raw_llm_response
@@ -68,6 +97,10 @@ async def process_single_jira_request(request: ProcessingRequest):
         request.status = RequestStatus.FAILED
         request.error = str(e)
         raise
+    finally:
+        if langfuse_handler:
+            langfuse_client.flush() # Ensure all traces are sent
+            logger.debug(f"[{payload.issueKey}] Langfuse client flushed.")
 
 @asynccontextmanager
 async def lifespan(app: FastAPI):
@@ -115,7 +148,14 @@ async def lifespan(app: FastAPI):
         logger.info("Application initialized with processing loop started")
         yield
     finally:
+        # Ensure all tasks are done before cancelling the processing loop
+        logger.info("Waiting for pending queue tasks to complete...")
+        requests_queue.join()
         task.cancel()
+        try:
+            await task # Await the task so cancellation is fully processed
+        except asyncio.CancelledError:
+            pass # Expected: the processing loop was cancelled deliberately
         logger.info("Processing loop terminated")
 
 def create_app():
@@ -123,7 +160,6 @@ def create_app():
     _app = FastAPI(lifespan=lifespan)
 
     # Include routers
-    _app.include_router(webhook_router)
     _app.include_router(jira_router)
     _app.include_router(queue_router)
 
diff --git a/requirements.txt b/requirements.txt
index b51f9e6..2885967 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,21 +1,21 @@
 fastapi==0.111.0
 pydantic==2.7.4
 pydantic-settings>=2.0.0
-langchain>=0.1.0
+langchain>=0.2.0
 langchain-ollama>=0.1.0
 langchain-openai>=0.1.0
-langchain-core>=0.1.0
-langfuse>=3.0.0
+langchain-google-genai==2.1.8
+langchain-core>=0.3.68,<0.4.0 # Pin to the range required by langchain-google-genai
+langfuse==3.2.1
 uvicorn==0.30.1
 python-multipart==0.0.9 # Good to include for FastAPI forms
 loguru==0.7.3
 
 # Testing dependencies
-unittest2>=1.1.0
-# Testing dependencies
+# unittest2>=1.1.0 # Removed as it's an older backport
 pytest==8.2.0
 pytest-asyncio==0.23.5
 pytest-cov==4.1.0
 httpx==0.27.0
-PyYAML>=6.0.2
+PyYAML==6.0.2
 SQLAlchemy==2.0.30
 alembic==1.13.1
\ No newline at end of file
diff --git a/shared_store.py b/shared_store.py
index 6059049..87e15f6 100644
--- a/shared_store.py
+++ b/shared_store.py
@@ -97,10 +97,13 @@ class RequestQueue:
                 self._queue.get_nowait()
             except Exception:
                 continue
-        self._queue.task_done() # Mark all tasks as done if clearing
 
     def task_done(self):
         """Indicates that a formerly enqueued task is complete."""
         self._queue.task_done()
 
+    def join(self):
+        """Blocks until all items in the queue have been gotten and processed."""
+        self._queue.join()
+
 requests_queue = RequestQueue()
\ No newline at end of file
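
A minimal smoke test for the new gemini mode, sketched outside the diff: it assumes the LLM_GEMINI_API_KEY and LLM_GEMINI_MODEL variables from .env above are exported, and it mirrors the ChatGoogleGenerativeAI initialization that llm/chains.py now performs at startup.

# smoke_test_gemini.py -- hypothetical helper, not included in this patch
import os

from langchain_google_genai import ChatGoogleGenerativeAI

# Same parameters the patch passes in llm/chains.py
llm = ChatGoogleGenerativeAI(
    model=os.environ.get("LLM_GEMINI_MODEL", "gemini-2.5-flash"),
    google_api_key=os.environ["LLM_GEMINI_API_KEY"],
    temperature=0.7,
    max_tokens=2000,
)

# chains.py issues the same one-word "test" probe to verify connectivity
print(llm.invoke("test").content)

If the key and model are valid this prints a short completion; otherwise it fails with the same kind of error that surfaces as LLMInitializationError during application startup.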