from typing import List, Dict, Optional, Any # Import Any import threading from datetime import datetime, timezone from enum import Enum class RequestStatus(str, Enum): PENDING = "pending" PROCESSING = "processing" COMPLETED = "completed" FAILED = "failed" TIMEOUT = "timeout" # Thread-safe storage for requests and responses from queue import Queue from dataclasses import dataclass, field @dataclass class ProcessingRequest: id: int payload: Dict status: RequestStatus = RequestStatus.PENDING created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) started_at: Optional[datetime] = None completed_at: Optional[datetime] = None error: Optional[str] = None retry_count: int = 0 response: Optional[Any] = None # Add response field class RequestQueue: def __init__(self): self._queue: Queue[ProcessingRequest] = Queue() self._requests: List[ProcessingRequest] = [] # To store all requests for retrieval self._processing_lock = threading.Lock() self._id_lock = threading.Lock() self._current_id = 0 def _get_next_id(self) -> int: """Generate and return the next available request ID""" with self._id_lock: self._current_id += 1 return self._current_id def add_request(self, payload: Dict) -> int: """Adds a new request to the queue and returns its ID""" request_id = self._get_next_id() request = ProcessingRequest(id=request_id, payload=payload) self._queue.put(request) with self._processing_lock: # Protect access to _requests list self._requests.append(request) return request_id def get_next_request(self) -> Optional[ProcessingRequest]: """Fetches the next available request from the queue""" with self._processing_lock: if not self._queue.empty(): return self._queue.get() return None def get_all_requests(self) -> List[ProcessingRequest]: """Returns a list of all requests currently in the store.""" with self._processing_lock: return list(self._requests) # Return a copy to prevent external modification def get_request_by_id(self, request_id: int) -> Optional[ProcessingRequest]: """Retrieves a specific request by its ID.""" with self._processing_lock: return next((req for req in self._requests if req.id == request_id), None) def get_latest_completed_by_issue_key(self, issue_key: str) -> Optional[ProcessingRequest]: """ Retrieves the latest successfully completed request for a given issue key. Skips pending or failed requests. """ with self._processing_lock: completed_requests = [ req for req in self._requests if req.payload.get("issueKey") == issue_key and req.status == RequestStatus.COMPLETED ] # Sort by completed_at to get the latest, if available completed_requests.sort(key=lambda req: req.completed_at if req.completed_at else datetime.min, reverse=True) return completed_requests[0] if completed_requests else None def are_there_any_open_requests_for_issue_key(self, issue_key: str) -> bool: """ Checks if there are any pending or processing requests for a given issue key. """ with self._processing_lock: for req in self._requests: if req.payload.get("issueKey") == issue_key and \ (req.status == RequestStatus.PENDING or req.status == RequestStatus.PROCESSING): return True return False def delete_request_by_id(self, request_id: int) -> bool: """Deletes a specific request by its ID.""" with self._processing_lock: initial_length = len(self._requests) self._requests = [req for req in self._requests if req.id != request_id] return len(self._requests) < initial_length def clear_all_requests(self): """Clears all requests from the store.""" with self._processing_lock: self._requests.clear() # Clear the queue as well, though it's generally processed while not self._queue.empty(): try: self._queue.get_nowait() except Exception: continue def task_done(self): """Indicates that a formerly enqueued task is complete.""" self._queue.task_done() def join(self): """Blocks until all items in the queue have been gotten and processed.""" self._queue.join() requests_queue = RequestQueue()