120 lines
4.6 KiB
Python
120 lines
4.6 KiB
Python
from typing import List, Dict, Optional, Any # Import Any
|
|
import threading
|
|
from datetime import datetime, timezone
|
|
from enum import Enum
|
|
|
|
class RequestStatus(str, Enum):
|
|
PENDING = "pending"
|
|
PROCESSING = "processing"
|
|
COMPLETED = "completed"
|
|
FAILED = "failed"
|
|
TIMEOUT = "timeout"
|
|
|
|
# Thread-safe storage for requests and responses
|
|
from queue import Queue
|
|
from dataclasses import dataclass, field
|
|
|
|
@dataclass
|
|
class ProcessingRequest:
|
|
id: int
|
|
payload: Dict
|
|
status: RequestStatus = RequestStatus.PENDING
|
|
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
|
|
started_at: Optional[datetime] = None
|
|
completed_at: Optional[datetime] = None
|
|
error: Optional[str] = None
|
|
retry_count: int = 0
|
|
response: Optional[Any] = None # Add response field
|
|
|
|
class RequestQueue:
|
|
def __init__(self):
|
|
self._queue: Queue[ProcessingRequest] = Queue()
|
|
self._requests: List[ProcessingRequest] = [] # To store all requests for retrieval
|
|
self._processing_lock = threading.Lock()
|
|
self._id_lock = threading.Lock()
|
|
self._current_id = 0
|
|
|
|
def _get_next_id(self) -> int:
|
|
"""Generate and return the next available request ID"""
|
|
with self._id_lock:
|
|
self._current_id += 1
|
|
return self._current_id
|
|
|
|
def add_request(self, payload: Dict) -> int:
|
|
"""Adds a new request to the queue and returns its ID"""
|
|
request_id = self._get_next_id()
|
|
request = ProcessingRequest(id=request_id, payload=payload)
|
|
self._queue.put(request)
|
|
with self._processing_lock: # Protect access to _requests list
|
|
self._requests.append(request)
|
|
return request_id
|
|
|
|
def get_next_request(self) -> Optional[ProcessingRequest]:
|
|
"""Fetches the next available request from the queue"""
|
|
with self._processing_lock:
|
|
if not self._queue.empty():
|
|
return self._queue.get()
|
|
return None
|
|
|
|
def get_all_requests(self) -> List[ProcessingRequest]:
|
|
"""Returns a list of all requests currently in the store."""
|
|
with self._processing_lock:
|
|
return list(self._requests) # Return a copy to prevent external modification
|
|
|
|
def get_request_by_id(self, request_id: int) -> Optional[ProcessingRequest]:
|
|
"""Retrieves a specific request by its ID."""
|
|
with self._processing_lock:
|
|
return next((req for req in self._requests if req.id == request_id), None)
|
|
|
|
def get_latest_completed_by_issue_key(self, issue_key: str) -> Optional[ProcessingRequest]:
|
|
"""
|
|
Retrieves the latest successfully completed request for a given issue key.
|
|
Skips pending or failed requests.
|
|
"""
|
|
with self._processing_lock:
|
|
completed_requests = [
|
|
req for req in self._requests
|
|
if req.payload.get("issueKey") == issue_key and req.status == RequestStatus.COMPLETED
|
|
]
|
|
# Sort by completed_at to get the latest, if available
|
|
completed_requests.sort(key=lambda req: req.completed_at if req.completed_at else datetime.min, reverse=True)
|
|
return completed_requests[0] if completed_requests else None
|
|
|
|
def are_there_any_open_requests_for_issue_key(self, issue_key: str) -> bool:
|
|
"""
|
|
Checks if there are any pending or processing requests for a given issue key.
|
|
"""
|
|
with self._processing_lock:
|
|
for req in self._requests:
|
|
if req.payload.get("issueKey") == issue_key and \
|
|
(req.status == RequestStatus.PENDING or req.status == RequestStatus.PROCESSING):
|
|
return True
|
|
return False
|
|
|
|
def delete_request_by_id(self, request_id: int) -> bool:
|
|
"""Deletes a specific request by its ID."""
|
|
with self._processing_lock:
|
|
initial_length = len(self._requests)
|
|
self._requests = [req for req in self._requests if req.id != request_id]
|
|
return len(self._requests) < initial_length
|
|
|
|
def clear_all_requests(self):
|
|
"""Clears all requests from the store."""
|
|
with self._processing_lock:
|
|
self._requests.clear()
|
|
# Clear the queue as well, though it's generally processed
|
|
while not self._queue.empty():
|
|
try:
|
|
self._queue.get_nowait()
|
|
except Exception:
|
|
continue
|
|
|
|
def task_done(self):
|
|
"""Indicates that a formerly enqueued task is complete."""
|
|
self._queue.task_done()
|
|
|
|
def join(self):
|
|
"""Blocks until all items in the queue have been gotten and processed."""
|
|
self._queue.join()
|
|
|
|
requests_queue = RequestQueue() |