From de4758a26f3548ea21a0807b0c0db17b04807a06 Mon Sep 17 00:00:00 2001 From: Ireneusz Bachanowicz Date: Wed, 16 Jul 2025 00:27:45 +0200 Subject: [PATCH] feat: Improve graceful shutdown handling and enhance application initialization logging --- jira-webhook-llm.py | 118 +++++++++++++++++++++++++++++++++----------- requirements.txt | 6 +-- 2 files changed, 92 insertions(+), 32 deletions(-) diff --git a/jira-webhook-llm.py b/jira-webhook-llm.py index e7ad818..903aa4f 100644 --- a/jira-webhook-llm.py +++ b/jira-webhook-llm.py @@ -11,7 +11,7 @@ import sys from typing import Optional from datetime import datetime import asyncio -from functools import wraps +from functools import wraps, partial from config import settings from webhooks.handlers import JiraWebhookHandler @@ -23,40 +23,104 @@ configure_logging(log_level="DEBUG") import signal -try: - app = FastAPI() - logger.info("FastAPI application initialized") +from contextlib import asynccontextmanager - @app.on_event("shutdown") - async def shutdown_event(): - """Handle application shutdown""" + + +# Setup async-compatible signal handling +def handle_shutdown_signal(signum, loop): + """Graceful shutdown signal handler""" + logger.info(f"Received signal {signum}, initiating shutdown...") + # Set shutdown flag and remove signal handlers to prevent reentrancy + if not hasattr(loop, '_shutdown'): + loop._shutdown = True + + # Prevent further signal handling + for sig in (signal.SIGTERM, signal.SIGINT): + loop.remove_signal_handler(sig) +@asynccontextmanager +async def lifespan(app: FastAPI): + """Handle startup and shutdown events""" + # Startup + try: + logger.info("Initializing application...") + + # Initialize event loop + loop = asyncio.get_running_loop() + logger.debug("Event loop initialized") + + # Setup signal handlers + for sig in (signal.SIGTERM, signal.SIGINT): + loop.add_signal_handler(sig, partial(handle_shutdown_signal, sig, loop)) + logger.info("Signal handlers configured successfully") + + # Verify critical components + if not hasattr(settings, 'langfuse_handler'): + logger.error("Langfuse handler not found in settings") + raise RuntimeError("Langfuse handler not initialized") + + logger.info("Application initialized successfully") + yield + + # Check shutdown flag before cleanup + loop = asyncio.get_running_loop() + if hasattr(loop, '_shutdown'): + logger.info("Shutdown initiated, starting cleanup...") + except Exception as e: + logger.critical(f"Application initialization failed: {str(e)}") + raise + finally: + # Shutdown logger.info("Shutting down application...") try: - # Cleanup Langfuse client if exists + # Cleanup sequence with async safety + cleanup_tasks = [] + shutdown_success = True + + # Close langfuse with retry if hasattr(settings, 'langfuse_handler') and hasattr(settings.langfuse_handler, 'close'): + async def close_langfuse(): + try: + await asyncio.wait_for(settings.langfuse_handler.close(), timeout=5.0) + logger.info("Langfuse client closed successfully") + except asyncio.TimeoutError: + logger.warning("Timeout while closing Langfuse client") + except Exception as e: + logger.error(f"Error closing Langfuse client: {str(e)}") + cleanup_tasks.append(close_langfuse()) + + # Remove confirm_shutdown entirely + # Execute all cleanup tasks with timeout + try: + await asyncio.wait_for(asyncio.gather(*cleanup_tasks), timeout=10.0) + except asyncio.TimeoutError: + logger.warning("Timeout during cleanup sequence") + loop.stop() # Explicit loop stop after cleanup + # Cancel all pending tasks + async def cancel_pending_tasks(): try: - await settings.langfuse_handler.close() + pending = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + for task in pending: + task.cancel() + await asyncio.gather(*pending, return_exceptions=True) + logger.info("Pending tasks cancelled successfully") except Exception as e: - logger.warning(f"Error closing handler: {str(e)}") - logger.info("Cleanup completed successfully") + logger.error(f"Error cancelling pending tasks: {str(e)}") + cleanup_tasks.append(cancel_pending_tasks()) + + # Execute all cleanup tasks with timeout + try: + await asyncio.wait_for(asyncio.gather(*cleanup_tasks), timeout=10.0) + except asyncio.TimeoutError: + logger.warning("Timeout during cleanup sequence") + loop.stop() # Add explicit loop stop after cleanup + except Exception as e: logger.error(f"Error during shutdown: {str(e)}") raise - def handle_shutdown_signal(signum, frame): - """Handle OS signals for graceful shutdown""" - logger.info(f"Received signal {signum}, initiating shutdown...") - # Exit immediately after cleanup is complete - os._exit(0) - - # Register signal handlers - signal.signal(signal.SIGTERM, handle_shutdown_signal) - signal.signal(signal.SIGINT, handle_shutdown_signal) - -except Exception as e: - logger.critical(f"Failed to initialize FastAPI: {str(e)}") - logger.warning("Application cannot continue without FastAPI initialization") - sys.exit(1) +# Initialize FastAPI app after lifespan definition +app = FastAPI(lifespan=lifespan) def retry(max_retries: int = 3, delay: float = 1.0): """Decorator for retrying failed operations""" @@ -138,7 +202,3 @@ async def test_llm(): updated="2025-07-04T21:40:00Z" ) return await webhook_handler.handle_webhook(test_payload) - -# if __name__ == "__main__": -# import uvicorn -# uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 1fb9c85..d4771f7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ fastapi==0.111.0 -pydantic==2.9.0 # Changed from 2.7.4 to meet ollama's requirement -pydantic-settings==2.0.0 +pydantic>=2.9.0 +pydantic-settings>=2.0.0 langchain==0.3.26 langchain-ollama==0.3.3 langchain-openai==0.3.27 @@ -16,4 +16,4 @@ pytest==8.2.0 pytest-asyncio==0.23.5 pytest-cov==4.1.0 httpx==0.27.0 -PyYAML \ No newline at end of file +PyYAML>=6.0.2 \ No newline at end of file