jira-webhook-llm/shared_store.py
Ireneusz Bachanowicz d1fa9385e7
Some checks failed
CI/CD Pipeline / test (push) Has been cancelled
Release for qwen3:4b model
2025-08-01 18:15:57 +02:00

120 lines
4.6 KiB
Python

import threading
from datetime import datetime, timezone
from enum import Enum
from queue import Empty
from typing import List, Dict, Optional, Any # Import Any
class RequestStatus(str, Enum):
    """Status values a processing request can carry.

    The ``str`` mix-in makes each member an actual string, so a member
    compares equal to its plain value (``RequestStatus.PENDING == "pending"``).
    """

    # Default status of a newly created ProcessingRequest.
    PENDING = "pending"
    # Together with PENDING, counted as "open" by
    # RequestQueue.are_there_any_open_requests_for_issue_key.
    PROCESSING = "processing"
    # The only status considered by
    # RequestQueue.get_latest_completed_by_issue_key.
    COMPLETED = "completed"
    # NOTE(review): FAILED and TIMEOUT are not assigned anywhere in this
    # module; presumably set by the worker that processes requests -- confirm.
    FAILED = "failed"
    TIMEOUT = "timeout"
# Thread-safe storage for requests and responses
from queue import Queue
from dataclasses import dataclass, field
@dataclass
class ProcessingRequest:
    """A single unit of work tracked by the request store.

    Only ``id`` and ``payload`` are required; every other field has a default
    so a fresh request starts out pending with no timing or result data.
    """

    # Identifier of the request; RequestQueue.add_request fills it from its
    # own monotonically increasing counter.
    id: int
    # Request data. The store reads its "issueKey" entry when filtering by
    # issue; the rest of the schema is not visible in this module.
    payload: Dict
    status: RequestStatus = RequestStatus.PENDING
    # Timezone-aware (UTC) creation time, evaluated per instance.
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    # NOTE(review): the fields below are never written in this module;
    # presumably the worker fills them in as processing progresses -- confirm.
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    error: Optional[str] = None
    retry_count: int = 0
    response: Optional[Any] = None
class RequestQueue:
    """In-memory store of processing requests backed by a FIFO work queue.

    Two structures are kept in step:

    * ``_queue`` -- a ``queue.Queue`` holding the requests that have not yet
      been handed out by :meth:`get_next_request`.
    * ``_requests`` -- a list of every request that was added and has not
      been deleted, used for lookups by ID or by the payload's ``"issueKey"``.

    ``_processing_lock`` guards the list and every put/get on the queue;
    ``_id_lock`` guards the ID counter.
    """

    def __init__(self):
        self._queue: Queue[ProcessingRequest] = Queue()
        self._requests: List[ProcessingRequest] = []  # all requests, for retrieval
        self._processing_lock = threading.Lock()
        self._id_lock = threading.Lock()
        self._current_id = 0

    def _get_next_id(self) -> int:
        """Return the next request ID; IDs start at 1 and grow by one per call."""
        with self._id_lock:
            self._current_id += 1
            return self._current_id

    @staticmethod
    def _completion_sort_key(request):
        """Build an ordering key that ranks requests by ``completed_at``.

        Requests without a timestamp sort below every timestamped one. The
        leading boolean decides that case on its own, so the ``datetime.min``
        placeholder is only ever compared with other placeholders and never
        with a (possibly timezone-aware) real timestamp.
        """
        finished_at = request.completed_at
        if finished_at is None:
            return (False, datetime.min)
        return (True, finished_at)

    def add_request(self, payload: Dict) -> int:
        """Create a pending request for ``payload``, store and enqueue it.

        Returns:
            The ID assigned to the new request.
        """
        request_id = self._get_next_id()
        request = ProcessingRequest(id=request_id, payload=payload)
        # Register and enqueue under one lock so a consumer can never obtain
        # a request that is not yet visible to the lookup methods, and so
        # clear_all_requests cannot run between the two steps. The queue is
        # unbounded, hence put() does not block while the lock is held.
        with self._processing_lock:
            self._requests.append(request)
            self._queue.put(request)
        return request_id

    def get_next_request(self) -> Optional["ProcessingRequest"]:
        """Remove and return the oldest queued request, or ``None`` if none is queued.

        Never blocks waiting for work.
        """
        with self._processing_lock:
            try:
                # get_nowait() rather than get(): must not wait while holding the lock.
                return self._queue.get_nowait()
            except Empty:
                return None

    def get_all_requests(self) -> List["ProcessingRequest"]:
        """Return a new list with all stored requests, in insertion order.

        The list is a shallow copy: changing it does not affect the store,
        but the request objects themselves are shared.
        """
        with self._processing_lock:
            return list(self._requests)

    def get_request_by_id(self, request_id: int) -> Optional["ProcessingRequest"]:
        """Return the stored request whose ``id`` equals ``request_id``, or ``None``."""
        with self._processing_lock:
            for request in self._requests:
                if request.id == request_id:
                    return request
            return None

    def get_latest_completed_by_issue_key(self, issue_key: str) -> Optional["ProcessingRequest"]:
        """Return the most recently completed request for ``issue_key``.

        Only requests whose status is ``RequestStatus.COMPLETED`` and whose
        payload has ``"issueKey" == issue_key`` are considered. Among those,
        the one with the greatest ``completed_at`` wins; requests lacking a
        ``completed_at`` rank last. On a tie the earliest-added request is
        returned. Returns ``None`` when nothing matches.
        """
        with self._processing_lock:
            completed = [
                request
                for request in self._requests
                if request.payload.get("issueKey") == issue_key
                and request.status == RequestStatus.COMPLETED
            ]
            if not completed:
                return None
            return max(completed, key=self._completion_sort_key)

    def are_there_any_open_requests_for_issue_key(self, issue_key: str) -> bool:
        """Return ``True`` if any stored request for ``issue_key`` is pending or processing."""
        open_statuses = (RequestStatus.PENDING, RequestStatus.PROCESSING)
        with self._processing_lock:
            return any(
                request.payload.get("issueKey") == issue_key
                and request.status in open_statuses
                for request in self._requests
            )

    def delete_request_by_id(self, request_id: int) -> bool:
        """Remove the request with ``request_id`` from the lookup list.

        Returns:
            ``True`` if a request was removed, ``False`` if the ID was unknown.

        NOTE: only the lookup list is touched. A request that is still
        waiting in the work queue stays there and will still be returned by
        :meth:`get_next_request`.
        """
        with self._processing_lock:
            initial_length = len(self._requests)
            self._requests = [
                request for request in self._requests if request.id != request_id
            ]
            return len(self._requests) < initial_length

    def clear_all_requests(self) -> None:
        """Drop every stored request and discard everything still queued."""
        with self._processing_lock:
            self._requests.clear()
            while True:
                try:
                    self._queue.get_nowait()
                except Empty:
                    break
                # Each discarded item was counted by put(); mark it done so
                # that join() does not wait forever for work nobody will do.
                self._queue.task_done()

    def task_done(self) -> None:
        """Mark one request previously obtained from the queue as finished.

        Forwards to ``queue.Queue.task_done``.
        """
        self._queue.task_done()

    def join(self) -> None:
        """Block until every enqueued request has been marked done.

        Forwards to ``queue.Queue.join``.
        """
        self._queue.join()
# Module-level RequestQueue instance, created once when this module is imported.
# NOTE(review): presumably the single store shared by every importer of this
# module (request producers and workers) -- confirm against callers.
requests_queue = RequestQueue()