Compare commits


1 Commit

Author SHA1 Message Date
730a5e7c69 some shit 2025-03-14 00:59:09 +01:00
21 changed files with 658 additions and 1608 deletions

View File

@@ -1,112 +0,0 @@
# File Manager Enhancement Plan
This document outlines the plan to enhance the `my-app/utils/file_manager.py` script based on user feedback.
**Goals:**
1. Add support for loading configuration from a `config.yaml` file.
2. Implement a new action (`--move-cold`) to move inactive ("cold") files from fast storage back to slow storage based on modification time.
3. Add an `--interactive` flag to prompt for confirmation before moving files.
4. Implement a new action (`--generate-stats`) to create a JSON file containing storage statistics (file counts, sizes by age) for both source and target directories.
5. Calculate and log the total size of files being moved by the `--move-cold` action.
**Detailed Plan:**
1. **Configuration File (`config.yaml`):**
* **Goal:** Allow users to define common settings in a YAML file.
* **Implementation:**
* Define structure for `config.yaml` (e.g., `~/.config/file_manager/config.yaml` or specified via `--config`).
* Use `PyYAML` library (requires `pip install PyYAML`).
* Modify `parse_arguments` to load settings, allowing command-line overrides.
* Add `--config` argument.
2. **Move Cold Files Back (`--move-cold` action):**
* **Goal:** Move files from fast (target) to slow (source) storage if inactive.
* **Implementation:**
* Add action: `--move-cold`.
* Add argument: `--stale-days` (default 30, uses modification time `st_mtime`).
* New function `find_stale_files(directory, days)`: Scans `target_dir` based on `st_mtime`.
* New function `move_files_cold(relative_file_list, source_dir, target_dir, dry_run, interactive)`:
* Similar to `move_files`.
* Moves files from `target_dir` to `source_dir` using `rsync`.
* Handles paths relative to `target_dir`.
* Calculates and logs total size of files to be moved before `rsync`.
* Incorporates interactive confirmation.
3. **Interactive Confirmation (`--interactive` flag):**
* **Goal:** Add a safety check before moving files.
* **Implementation:**
* Add global flag: `--interactive`.
* Modify `move_files` and `move_files_cold`:
* If `--interactive` and not `--dry-run`:
* Log files/count.
* Use `input()` for user confirmation (`yes/no`).
* Proceed only on "yes".
4. **Enhanced Reporting/Stats File (`--generate-stats` action):**
* **Goal:** Create a persistent JSON file with storage statistics.
* **Implementation:**
* Add action: `--generate-stats`.
* Add argument: `--stats-file` (overrides config).
* New function `analyze_directory(directory)`:
* Walks directory, calculates total count/size, count/size by modification time brackets.
* Returns data as a dictionary (see the sketch after this list).
* Modify `main` or create orchestrator for `--generate-stats`:
* Call `analyze_directory` for source and target.
* Combine results with a timestamp.
* Write dictionary to `stats_file` using `json`.
* **(Optional):** Modify `--summarize-unused` to potentially use the stats file.
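To make item 4 concrete, here is a minimal sketch of `analyze_directory` and a stats writer. The age brackets and JSON field names are illustrative assumptions, not the final format:
```python
import json
import os
import time
from datetime import datetime, timezone

AGE_BRACKETS_DAYS = [1, 7, 30]  # assumed brackets, not final

def analyze_directory(directory):
    """Walk `directory`, tallying file count and size overall and by mtime age."""
    now = time.time()
    buckets = [str(d) for d in AGE_BRACKETS_DAYS] + ["older"]
    stats = {
        "total_files": 0,
        "total_bytes": 0,
        "by_age_days": {b: {"files": 0, "bytes": 0} for b in buckets},
    }
    for root, _, files in os.walk(directory):
        for name in files:
            path = os.path.join(root, name)
            try:
                st = os.stat(path)
            except OSError:
                continue  # file vanished or is unreadable; skip it
            age_days = (now - st.st_mtime) / 86400
            bucket = next(
                (str(d) for d in AGE_BRACKETS_DAYS if age_days <= d), "older"
            )
            stats["total_files"] += 1
            stats["total_bytes"] += st.st_size
            stats["by_age_days"][bucket]["files"] += 1
            stats["by_age_days"][bucket]["bytes"] += st.st_size
    return stats

def generate_stats(source_dir, target_dir, stats_file):
    """Combine per-directory stats with a timestamp and write them as JSON."""
    report = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "source": analyze_directory(source_dir),
        "target": analyze_directory(target_dir),
    }
    with open(stats_file, "w") as f:
        json.dump(report, f, indent=2)
```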
**Workflow Visualization (Mermaid):**
```mermaid
graph TD
Start --> ReadConfig{"Read config.yaml (Optional)"}
ReadConfig --> ParseArgs[Parse Command Line Args]
ParseArgs --> ValidateArgs{"Validate Args & Config"}
ValidateArgs --> ActionRouter{Route based on Action}
ActionRouter -->|"--generate-stats"| AnalyzeSrc[Analyze Source Dir]
AnalyzeSrc --> AnalyzeTgt[Analyze Target Dir]
AnalyzeTgt --> WriteStatsFile[Write stats.json]
WriteStatsFile --> End
ActionRouter -->|"--move"| FindRecent["Find Recent Files (Source)"]
FindRecent --> CheckInteractiveHot{Interactive?}
CheckInteractiveHot -- Yes --> ConfirmHot(Confirm Move Hot?)
CheckInteractiveHot -- No --> ExecuteMoveHot["Execute rsync Hot (Source->Target)"]
ConfirmHot -- Yes --> ExecuteMoveHot
ConfirmHot -- No --> AbortHot(Abort Hot Move)
AbortHot --> End
ExecuteMoveHot --> End
ActionRouter -->|"--move-cold"| FindStale["Find Stale Files (Target)"]
FindStale --> CalculateColdSize[Calculate Total Size of Cold Files]
CalculateColdSize --> CheckInteractiveCold{Interactive?}
CheckInteractiveCold -- Yes --> ConfirmCold(Confirm Move Cold?)
CheckInteractiveCold -- No --> ExecuteMoveCold["Execute rsync Cold (Target->Source)"]
ConfirmCold -- Yes --> ExecuteMoveCold
ConfirmCold -- No --> AbortCold(Abort Cold Move)
AbortCold --> End
ExecuteMoveCold --> End
ActionRouter -->|"--count"| FindRecentForCount["Find Recent Files (Source)"]
FindRecentForCount --> CountFiles[Log Count]
CountFiles --> End
ActionRouter -->|"--summarize-unused"| SummarizeUnused["Summarize Unused (Target)"]
SummarizeUnused --> LogSummary[Log Summary]
LogSummary --> End
ActionRouter -- No Action/Error --> ShowHelp[Show Help / Error]
ShowHelp --> End
```
**Summary of Changes:**
* New dependencies: `PyYAML`.
* New command-line arguments: `--move-cold`, `--stale-days`, `--interactive`, `--generate-stats`, `--stats-file`, `--config`.
* New functions: `find_stale_files`, `move_files_cold`, `analyze_directory`.
* Modifications to existing functions: `parse_arguments`, `move_files`, `main`.
* Introduction of `config.yaml` for settings.
* Introduction of a JSON stats file for persistent reporting.

View File

@@ -1,160 +0,0 @@
# OpenRouter Integration Refactoring Plan
## Goal
Refactor the code in `resume_analysis.py` to remove all dependencies on the OpenAI API and rely exclusively on the OpenRouter API, while improving the current OpenRouter connection implementation.
## Change Flow Diagram
```mermaid
graph TD
A[Current implementation] --> B[Phase 1: Remove OpenAI dependencies]
B --> C[Phase 2: Refactor the OpenRouter client]
C --> D[Phase 3: Optimize response handling]
D --> E[Phase 4: Tests and validation]
subgraph "Phase 1: Remove OpenAI dependencies"
B1[Remove OpenAI imports]
B2[Remove OpenAI configuration variables]
B3[Remove client-selection logic]
end
subgraph "Phase 2: Refactor the OpenRouter client"
C1[Create a dedicated OpenRouterClient class]
C2[Implement correct header configuration]
C3[Add support for different models]
end
subgraph "Phase 3: Optimize response handling"
D1[Unify the response format]
D2[Implement better error handling]
D3[Add response validation]
end
subgraph "Phase 4: Tests and validation"
E1[Unit tests]
E2[Integration tests]
E3[Document the changes]
end
```
## Detailed Implementation
### 1. Dedicated OpenRouterClient class
```python
from typing import Optional

import requests

class OpenRouterClient:
    def __init__(self, api_key: str, model_name: str):
        self.api_key = api_key
        self.model_name = model_name
        self.base_url = "https://openrouter.ai/api/v1"
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "HTTP-Referer": "https://github.com/OpenRouterTeam/openrouter-examples",
            "X-Title": "CV Analysis Tool"
        })

    def create_chat_completion(self, messages: list, max_tokens: Optional[int] = None):
        endpoint = f"{self.base_url}/chat/completions"
        payload = {
            "model": self.model_name,
            "messages": messages,
        }
        if max_tokens is not None:  # only send the cap when the caller sets one
            payload["max_tokens"] = max_tokens
        response = self.session.post(endpoint, json=payload)
        response.raise_for_status()
        return response.json()

    def get_available_models(self):
        endpoint = f"{self.base_url}/models"
        response = self.session.get(endpoint)
        response.raise_for_status()
        return response.json()
```
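For context, a minimal usage sketch of the client; the API key placeholder and model slug below are assumptions, not values from this plan:
```python
# Hypothetical usage; the key and model name are placeholders.
client = OpenRouterClient(api_key="sk-or-...", model_name="openai/gpt-4o-mini")
raw = client.create_chat_completion(
    messages=[{"role": "user", "content": "Summarize this CV section."}],
    max_tokens=512,
)
print(raw["choices"][0]["message"]["content"])
```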
### 2. Configuration and initialization
```python
def initialize_openrouter_client():
    if not OPENROUTER_API_KEY:
        raise ValueError("OPENROUTER_API_KEY is required")
    client = OpenRouterClient(
        api_key=OPENROUTER_API_KEY,
        model_name=OPENROUTER_MODEL_NAME
    )
    # Verify connection and model availability.
    # The /models endpoint wraps the model list in a "data" key.
    try:
        models = client.get_available_models().get("data", [])
        if not any(model["id"] == OPENROUTER_MODEL_NAME for model in models):
            raise ValueError(f"Model {OPENROUTER_MODEL_NAME} not available")
        logger.debug(f"Successfully connected to OpenRouter. {len(models)} models available.")
        return client
    except Exception as e:
        logger.error(f"Failed to initialize OpenRouter client: {e}")
        raise
```
### 3. Response handling
```python
class OpenRouterResponse:
def __init__(self, raw_response: dict):
self.raw_response = raw_response
self.choices = self._parse_choices()
self.usage = self._parse_usage()
self.model = raw_response.get("model")
def _parse_choices(self):
choices = self.raw_response.get("choices", [])
return [
{
"message": choice.get("message", {}),
"finish_reason": choice.get("finish_reason"),
"index": choice.get("index")
}
for choice in choices
]
def _parse_usage(self):
usage = self.raw_response.get("usage", {})
return {
"prompt_tokens": usage.get("prompt_tokens", 0),
"completion_tokens": usage.get("completion_tokens", 0),
"total_tokens": usage.get("total_tokens", 0)
}
```
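A hedged sketch of how `OpenRouterResponse` might wrap a raw client result; the glue code below is illustrative, not part of the final API:
```python
# Illustrative glue code, assuming a `client` from section 2 above.
raw = client.create_chat_completion(messages=[{"role": "user", "content": "Hello"}])
resp = OpenRouterResponse(raw)
print(resp.model, resp.usage["total_tokens"])
for choice in resp.choices:
    print(choice["message"].get("content", ""))
```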
### 4. Error handling
```python
from typing import Optional

class OpenRouterError(Exception):
    def __init__(self, message: str, status_code: Optional[int] = None, response: Optional[dict] = None):
        super().__init__(message)
        self.status_code = status_code
        self.response = response

def handle_openrouter_error(error: Exception) -> OpenRouterError:
    if isinstance(error, requests.exceptions.RequestException):
        if error.response is not None:
            try:
                error_data = error.response.json()
                message = error_data.get("error", {}).get("message", str(error))
                return OpenRouterError(
                    message=message,
                    status_code=error.response.status_code,
                    response=error_data
                )
            except ValueError:
                pass  # non-JSON error body; fall through to the generic wrapper
    return OpenRouterError(str(error))
```
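A short usage sketch for the error handler, assuming a `client` instance and a `messages` list as in the sections above:
```python
# Hedged example: converting a transport error into OpenRouterError.
try:
    raw = client.create_chat_completion(messages=messages)
except requests.exceptions.RequestException as exc:
    raise handle_openrouter_error(exc) from exc
```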
## Next Steps
1. Implement the classes and functions above
2. Remove all OpenAI dependencies
3. Update the existing code to use the new client
4. Add unit and integration tests (see the sketch below)
5. Update the documentation
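For step 4, a minimal unit-test sketch using `unittest.mock`; the test name and mocked payload are assumptions:
```python
import unittest
from unittest.mock import MagicMock, patch

class OpenRouterClientTest(unittest.TestCase):
    def test_create_chat_completion_posts_to_endpoint(self):
        client = OpenRouterClient(api_key="test-key", model_name="test-model")
        fake_response = MagicMock()
        fake_response.json.return_value = {"choices": []}
        fake_response.raise_for_status.return_value = None
        # Patch the session so no real HTTP request is made.
        with patch.object(client.session, "post", return_value=fake_response) as post:
            result = client.create_chat_completion(
                messages=[{"role": "user", "content": "hi"}]
            )
        post.assert_called_once()
        self.assertEqual(post.call_args.args[0],
                         "https://openrouter.ai/api/v1/chat/completions")
        self.assertEqual(result, {"choices": []})

if __name__ == "__main__":
    unittest.main()
```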

View File

@@ -7,7 +7,9 @@
     "build": "next build --no-lint",
     "start": "next start",
     "lint": "next lint",
-    "debug": "NODE_DEBUG=next node server.js"
+    "debug": "NODE_DEBUG=next node server.js",
+    "test": "pytest utils/tests/test_resume_analysis.py",
+    "count_documents": "mongosh mongodb://127.0.0.1:27017/cv_summary_db --eval 'db.cv_processing_collection.countDocuments()'"
   },
   "dependencies": {
     "@ai-sdk/google": "^1.1.17",

View File

@@ -1,5 +0,0 @@
source_dir: /mnt/archive_nfs
target_dir: /mnt/local_ssd
recent_days: 2
stale_days: 45
stats_file: /home/user/logs/file_manager_stats.json

View File

@@ -0,0 +1,87 @@
```json
{
"sections": {
"Summary": {
"score": 8,
"suggestions": [
"Consider adding specific achievements or metrics to highlight impact.",
"Simplify language for clearer understanding."
],
"summary": "The summary provides a clear overview of the candidate's experience and roles in business analysis and IT management but can be improved by adding specific achievements to quantify their contributions.",
"keywords": {
"analityk": 3,
"doświadczenie": 2,
"architekt": 1,
"manager": 1
}
},
"Work Experience": {
"score": 9,
"suggestions": [],
"summary": "The work experience section is detailed, presenting clear job roles, responsibilities, and contributions. It utilizes strong action verbs but could be enhanced with quantifiable results in some roles.",
"keywords": {
"analiz": 5,
"biznesowy": 4,
"systemowy": 4,
"projekt": 4,
"współpraca": 3,
"wymagania": 2
}
},
"Education": {
"score": 8,
"suggestions": [
"Specify the graduation status for higher education.",
"Consider listing any honors or relevant coursework."
],
"summary": "The education section is comprehensive, including degrees and specialized training, but it lacks mention of graduation status and could highlight additional relevant coursework.",
"keywords": {
"Politechnika": 2,
"CISCO": 1,
"Magisterskie": 1,
"Inżynierskie": 1
}
},
"Skills": {
"score": 7,
"suggestions": [
"Categorize skills into technical and soft skills for clarity.",
"Add more specific technologies or methodologies relevant to the roles applied for."
],
"summary": "The skills section is minimal and lacks depth. Categorizing skills can improve clarity and relevance, and including specific technologies or methodologies would strengthen the section.",
"keywords": {
"szkoleń": 4,
"certyfikaty": 2,
"prawo jazdy": 1
}
},
"Certifications": {
"score": 9,
"suggestions": [],
"summary": "The certifications section is strong, detailing relevant training and certifications that add credibility to the candidate's qualifications.",
"keywords": {
"certyfikat": 1,
"szkolenie": 9
}
},
"Projects": {
"score": 6,
"suggestions": [
"Create a separate section for key projects with descriptions and outcomes.",
"Highlight individual contributions to collaborative projects."
],
"summary": "The projects are mentioned informally within work experience; however, creating a dedicated section would better emphasize significant projects and achievements.",
"keywords": {
"projekt": 4,
"wymagania": 2
}
}
},
"openai_stats": {
"input_tokens": 2585,
"output_tokens": 677,
"total_tokens": 3262,
"cost": 0.01308
}
}
```

View File

@@ -0,0 +1,19 @@
{
"choices": [
{
"message": {
"content": "```json\n{\n \"sections\": {\n \"Summary\": {\n \"score\": 8,\n \"suggestions\": [\n \"Consider adding specific achievements or metrics to illustrate impact.\",\n \"Make the summary more concise by focusing on key strengths.\"\n ],\n \"summary\": \"The summary provides a brief overview of experience and roles but lacks specific accomplishments and is slightly verbose.\",\n \"keywords\": { \"analityk\": 3, \"doświadczenie\": 2, \"systemowy\": 2, \"technologicznych\": 1, \"menedżer\": 1 }\n },\n \"Work Experience\": {\n \"score\": 9,\n \"suggestions\": [],\n \"summary\": \"The work experience section is detailed and relevant, showcasing roles and responsibilities effectively, with clear job titles and dates.\",\n \"keywords\": { \"analityk\": 4, \"systemów\": 4, \"IT\": 6, \"projekty\": 4, \"współpraca\": 3 }\n },\n \"Education\": {\n \"score\": 7,\n \"suggestions\": [\n \"Provide dates for all educational entries for consistency.\",\n \"Consider adding any relevant coursework or projects to enhance completeness.\"\n ],\n \"summary\": \"The education section lists qualifications but lacks specific dates for every entry and does not include additional relevant details.\",\n \"keywords\": { \"studia\": 3, \"Politechnika\": 3, \"certyfikaty\": 1, \"sieci\": 1 }\n },\n \"Skills\": {\n \"score\": 8,\n \"suggestions\": [\n \"Group skills into categories (e.g., technical skills, soft skills) for clarity.\",\n \"Add specific software or tools to demonstrate technical expertise.\"\n ],\n \"summary\": \"The skills section summarizes capabilities but could benefit from organization and inclusion of specific skills relevant to jobs being applied for.\",\n \"keywords\": { \"techniczne\": 1, \"wiedza\": 1, \"umiejętności\": 1 }\n },\n \"Certifications\": {\n \"score\": 8,\n \"suggestions\": [\n \"Organize certifications in chronological order or by relevance.\",\n \"Include the dates of certifications for better context.\"\n ],\n \"summary\": \"The certifications are relevant but could be polished by adding organization and dates to enhance clarity.\",\n \"keywords\": { \"certyfikat\": 2, \"szkolenie\": 6, \"ITIL\": 2 }\n },\n \"Projects\": {\n \"score\": 6,\n \"suggestions\": [\n \"Provide more detail on individual projects, focusing on specific roles and outcomes.\",\n \"Include dates for project completion to establish a timeline.\"\n ],\n \"summary\": \"The projects section is present but lacks depth regarding specific responsibilities or results, making it less impactful.\",\n \"keywords\": { \"projekt\": 3, \"systemy\": 2, \"migrować\": 1 }\n }\n },\n \"openai_stats\": {\n \"input_tokens\": 1424,\n \"output_tokens\": 668,\n \"total_tokens\": 2092,\n \"cost\": 0.002092\n }\n}\n```",
"role": "assistant"
},
"finish_reason": "stop",
"index": 0
}
],
"usage": {
"prompt_tokens": 2668,
"completion_tokens": 679,
"total_tokens": 3347
},
"cost": 0,
"model": "gpt-4o-mini-2024-07-18"
}

View File

@@ -0,0 +1,19 @@
{
"choices": [
{
"message": {
"content": "```json\n{\n \"sections\": {\n \"Summary\": {\n \"score\": 8,\n \"suggestions\": [\n \"Consider elaborating on specific achievements or key projects to highlight impact.\",\n \"Include more quantifiable metrics to showcase successful outcomes.\"\n ],\n \"summary\": \"The summary provides a clear overview of the candidate's professional background and experience in business analysis and system architecture. It indicates substantial experience but lacks specific examples of accomplishments.\",\n \"keywords\": {\n \"Analityk\": 4,\n \"biznesowy\": 2,\n \"systemowy\": 2,\n \"doświadczenie\": 1,\n \"technologicznych\": 1\n }\n },\n \"Work Experience\": {\n \"score\": 9,\n \"suggestions\": [],\n \"summary\": \"The work experience section is comprehensive, detailing various roles and responsibilities across multiple companies. It demonstrates a strong background in the IT sector with clear responsibilities and contributions but could benefit from more quantifiable outcomes.\",\n \"keywords\": {\n \"analityk\": 6,\n \"systemów\": 5,\n \"projekt\": 4,\n \"współpraca\": 3,\n \"technologii\": 3,\n \"wymagań\": 2,\n \"usług\": 2\n }\n },\n \"Education\": {\n \"score\": 8,\n \"suggestions\": [\n \"Specify the dates for when the education was completed.\",\n \"Only include institutions that are directly relevant to the position being applied for.\"\n ],\n \"summary\": \"The education section lists relevant degrees and institutions, highlighting a solid academic background in technology and information systems. Adding completion dates could enhance clarity.\",\n \"keywords\": {\n \"studia\": 3,\n \"Politechnika\": 2,\n \"informatycznych\": 2,\n \"CISCO\": 1,\n \"specjalność\": 1\n }\n },\n \"Skills\": {\n \"score\": 7,\n \"suggestions\": [\n \"Add more technical skills that are specifically relevant to the industry.\",\n \"Provide a clearer structure, possibly categorizing hard and soft skills.\"\n ],\n \"summary\": \"The skills section is notably brief. It lists language proficiency but lacks a comprehensive enumeration of technical and soft skills essential for the role of a business analyst.\",\n \"keywords\": {\n \"angielski\": 1,\n \"niemiecki\": 1\n }\n },\n \"Certifications\": {\n \"score\": 9,\n \"suggestions\": [],\n \"summary\": \"The certifications are presented clearly, showing a variety of relevant courses and certifications. This indicates a commitment to professional development and continuous learning.\",\n \"keywords\": {\n \"certyfikat\": 2,\n \"szkolenie\": 8,\n \"ITIL\": 2,\n \"IBM\": 3\n }\n },\n \"Projects\": {\n \"score\": 6,\n \"suggestions\": [\n \"Include specific projects with concise descriptions and impacts.\",\n \"List projects in a structured format, summarizing outcomes and key learnings.\"\n ],\n \"summary\": \"The projects section is not explicitly defined and lacks specifics. While detailed experience is found in work experience, this section would benefit from a clear presentation of significant projects and their outcomes.\",\n \"keywords\": {}\n }\n },\n \"openai_stats\": {\n \"input_tokens\": 1244,\n \"output_tokens\": 646,\n \"total_tokens\": 1890,\n \"cost\": 0.002\n }\n}\n```",
"role": "assistant"
},
"finish_reason": "stop",
"index": 0
}
],
"usage": {
"prompt_tokens": 2668,
"completion_tokens": 760,
"total_tokens": 3428
},
"cost": 0.0,
"model": "gpt-4o-mini-2024-07-18"
}

View File

@@ -0,0 +1,19 @@
{
"choices": [
{
"message": {
"content": "```json\n{\n \"sections\": {\n \"Summary\": {\n \"score\": 8,\n \"suggestions\": [\n \"Consider adding specific achievements or metrics to quantify your impact.\",\n \"Refine language to be more concise and impactful.\"\n ],\n \"summary\": \"The summary provides a clear professional profile highlighting experience in business analysis and technology. However, it lacks specific achievements.\",\n \"keywords\": {\n \"Analityk\": 3,\n \"biznesowy\": 3,\n \"systemowy\": 3,\n \"doświadczenie\": 2,\n \"technologicznych\": 1\n }\n },\n \"Work Experience\": {\n \"score\": 9,\n \"suggestions\": [],\n \"summary\": \"The work experience section is comprehensive, detailing roles and responsibilities with an emphasis on contributions to projects. The use of bullet points enhances readability.\",\n \"keywords\": {\n \"analityk\": 4,\n \"programów\": 3,\n \"systemów\": 4,\n \"projektów\": 4,\n \"współpraca\": 3\n }\n },\n \"Education\": {\n \"score\": 8,\n \"suggestions\": [\n \"Specify the completion dates for each education entry.\",\n \"Include any honors or relevant courses to enhance detail.\"\n ],\n \"summary\": \"The education section lists relevant degrees and certifications, but lacks completion dates and additional achievements.\",\n \"keywords\": {\n \"studia\": 3,\n \"Politechnika\": 2,\n \"CISCO\": 1,\n \"certyfikat\": 1\n }\n },\n \"Skills\": {\n \"score\": 7,\n \"suggestions\": [\n \"List specific technical skills or tools you are proficient in.\",\n \"Group skills into categories for improved clarity.\"\n ],\n \"summary\": \"The skills section is minimal and lacks specificity. Adding more detailed skills related to business analysis and technology would be beneficial.\",\n \"keywords\": {\n \"analityka\": 1,\n \"systemowy\": 1\n }\n },\n \"Certifications\": {\n \"score\": 9,\n \"suggestions\": [],\n \"summary\": \"The certifications section is well-detailed and relevant, showcasing important qualifications for the field.\",\n \"keywords\": {\n \"certyfikat\": 1,\n \"szkolenie\": 5\n }\n },\n \"Projects\": {\n \"score\": 6,\n \"suggestions\": [\n \"Add specific project names and outcomes to illustrate contributions.\",\n \"Include metrics or results achieved in projects.\"\n ],\n \"summary\": \"The projects section is lacking, as it does not list projects explicitly or specify contributions. More detail could improve understanding of expertise.\",\n \"keywords\": {\n \"projekt\": 1,\n \"analiz\": 1\n }\n }\n },\n \"openai_stats\": {\n \"input_tokens\": 1318,\n \"output_tokens\": 509,\n \"total_tokens\": 1827,\n \"cost\": 0.002053\n }\n}\n```",
"role": "assistant"
},
"finish_reason": "stop",
"index": 0
}
],
"usage": {
"prompt_tokens": 2668,
"completion_tokens": 658,
"total_tokens": 3326
},
"cost": 0,
"model": "gpt-4o-mini-2024-07-18"
}

View File

@@ -0,0 +1,19 @@
{
"choices": [
{
"message": {
"content": "```json\n{\n \"sections\": {\n \"Summary\": {\n \"score\": 8,\n \"suggestions\": [\"Add specific metrics to quantify achievements.\", \"Clarify the type of industries and roles you are most experienced in.\"],\n \"summary\": \"The summary provides a brief professional profile, emphasizing business and system analysis experience. However, it lacks specific metrics or examples of achievements.\",\n \"keywords\": {\n \"analityk\": 3,\n \"doświadczenie\": 2,\n \"systemowy\": 2,\n \"architekt\": 1,\n \"manager\": 1\n }\n },\n \"Work Experience\": {\n \"score\": 9,\n \"suggestions\": [],\n \"summary\": \"The work experience section is comprehensive, detailing roles, responsibilities, and projects. Each role is clearly delineated, showcasing relevant experience and contributions.\",\n \"keywords\": {\n \"analityk\": 5,\n \"system\": 4,\n \"projekt\": 4,\n \"zespół\": 2,\n \"usługi\": 3,\n \"współpraca\": 2\n }\n },\n \"Education\": {\n \"score\": 8,\n \"suggestions\": [\"Add graduation dates for each educational experience.\", \"Clearly specify the fields of study.\"],\n \"summary\": \"The education section provides various qualifications, but it could benefit from specific graduation dates and clarification of study fields.\",\n \"keywords\": {\n \"Politechnika\": 2,\n \"studia\": 3,\n \"CISCO\": 1,\n \"magisterskie\": 1,\n \"inżynierskie\": 1\n }\n },\n \"Skills\": {\n \"score\": 7,\n \"suggestions\": [\"List both hard and soft skills explicitly.\", \"Include any technical skills relevant to the roles applied for.\"],\n \"summary\": \"The skills section needs improvement; it lacks a clear list of both hard and soft skills that could enhance the individual's candidacy.\",\n \"keywords\": {\n \"CRM\": 2,\n \"analiza\": 2,\n \"zrozumienie\": 1,\n \"systemowy\": 1,\n \"projektowanie\": 1\n }\n },\n \"Certifications\": {\n \"score\": 9,\n \"suggestions\": [],\n \"summary\": \"The certifications section is strong with relevant certifications listed, demonstrating a commitment to professional development.\",\n \"keywords\": {\n \"certyfikat\": 2,\n \"ITIL\": 2,\n \"szkolenie\": 5,\n \"IBM\": 3\n }\n },\n \"Projects\": {\n \"score\": 7,\n \"suggestions\": [\"Add more details about specific projects (e.g., outcomes, skills used).\", \"Highlight any leadership roles in projects.\"],\n \"summary\": \"The projects section is present but lacks depth; it could highlight key achievements and the impact of each project.\",\n \"keywords\": {\n \"projekt\": 4,\n \"systemowy\": 2,\n \"analiza\": 1,\n \"zespół\": 2\n }\n }\n },\n \"openai_stats\": {\n \"input_tokens\": 1291,\n \"output_tokens\": 566,\n \"total_tokens\": 1857,\n \"cost\": 0.004\n }\n}\n```",
"role": "assistant"
},
"finish_reason": "stop",
"index": 0
}
],
"usage": {
"prompt_tokens": 2668,
"completion_tokens": 720,
"total_tokens": 3388
},
"cost": 0.0,
"model": "gpt-4o-mini-2024-07-18"
}

View File

@@ -0,0 +1,19 @@
{
"choices": [
{
"message": {
"content": "```json\n{\n \"sections\": {\n \"Summary\": {\n \"score\": 8,\n \"suggestions\": [\n \"Make the summary more concise by focusing on key skills and achievements.\",\n \"Add specific examples of business analysis and architecture achievements.\"\n ],\n \"summary\": \"Strong professional summary indicating a solid background in business and system analysis with over 10 years of relevant experience, but lacks specific accomplishments.\",\n \"keywords\": {\n \"business analyst\": 1,\n \"system architect\": 1,\n \"manager\": 1,\n \"experience\": 1\n }\n },\n \"Work Experience\": {\n \"score\": 9,\n \"suggestions\": [],\n \"summary\": \"Detailed work experience in various roles with a focus on business analysis and IT management. Effective descriptions of responsibilities and contributions, although some job roles could highlight specific achievements more clearly.\",\n \"keywords\": {\n \"business analysis\": 5,\n \"system\": 6,\n \"IT\": 4,\n \"project\": 2,\n \"analysis\": 3,\n \"documentation\": 2\n }\n },\n \"Education\": {\n \"score\": 8,\n \"suggestions\": [\n \"Specify graduation dates for each educational qualification.\",\n \"Include any honors or distinctions received during studies.\"\n ],\n \"summary\": \"Solid educational background with relevant degrees and certifications in technology and electronics, but lacks detail on specific achievements or honors.\",\n \"keywords\": {\n \"degree\": 3,\n \"education\": 2,\n \"network associate\": 1\n }\n },\n \"Skills\": {\n \"score\": 7,\n \"suggestions\": [\n \"Expand on the range of technical and soft skills relevant to the positions sought.\",\n \"Organize skills into categories (e.g., Technical, Analytical, Interpersonal) for better clarity.\"\n ],\n \"summary\": \"Skills listed are somewhat general; better categorization and specificity could improve overall relevance.\",\n \"keywords\": {\n \"skills\": 1,\n \"analysis\": 2,\n \"communication\": 1\n }\n },\n \"Certifications\": {\n \"score\": 9,\n \"suggestions\": [],\n \"summary\": \"The section is well-structured and lists relevant certifications clearly, showcasing continuous professional development.\",\n \"keywords\": {\n \"certification\": 1,\n \"ITIL\": 1,\n \"CISCO\": 1\n }\n },\n \"Projects\": {\n \"score\": 6,\n \"suggestions\": [\n \"Provide more specific details about project outcomes or impacts.\",\n \"Highlight personal contributions or leadership roles in notable projects.\"\n ],\n \"summary\": \"Projects are mentioned but lack depth regarding impact and individual contributions. More concrete successes would strengthen the narrative.\",\n \"keywords\": {\n \"project\": 3,\n \"migration\": 1,\n \"implementation\": 1\n }\n }\n },\n \"openai_stats\": {\n \"input_tokens\": 2155,\n \"output_tokens\": 722,\n \"total_tokens\": 2877,\n \"cost\": 0.002877\n }\n}\n```",
"role": "assistant"
},
"finish_reason": "stop",
"index": 0
}
],
"usage": {
"prompt_tokens": 2668,
"completion_tokens": 668,
"total_tokens": 3336
},
"cost": 0.0,
"model": "gpt-4o-mini-2024-07-18"
}

View File

@@ -0,0 +1,19 @@
{
"choices": [
{
"message": {
"content": "```json\n{\n \"sections\": {\n \"Summary\": {\n \"score\": 8,\n \"suggestions\": [\"Consider including specific achievements or metrics to highlight your impact.\", \"Make the language more concise and powerful.\"],\n \"summary\": \"The summary provides a clear overview of the candidate's role and experience but lacks specific accomplishments that could strengthen it.\",\n \"keywords\": { \"Analityk\": 2, \"doświadczenie\": 1, \"manager\": 1, \"architekt\": 1 }\n },\n \"Work Experience\": {\n \"score\": 9,\n \"suggestions\": [],\n \"summary\": \"The work experience section is detailed and comprehensive, showcasing a strong career progression and relevant expertise in various roles.\",\n \"keywords\": { \"IT\": 6, \"analityk\": 5, \"systemów\": 5, \"projekt\": 5, \"współpraca\": 4, \"klientów\": 3, \"usług\": 3 }\n },\n \"Education\": {\n \"score\": 9,\n \"suggestions\": [],\n \"summary\": \"Education section is informative and highlights relevant degrees and certifications, showcasing the candidate's academic background.\",\n \"keywords\": { \"studia\": 3, \"Politechnika Warszawska\": 2, \"CISCO\": 1, \"Magister\": 1, \"Inżynierskie\": 1 }\n },\n \"Skills\": {\n \"score\": 7,\n \"suggestions\": [\"List skills specifically related to the positions applied for.\", \"Consider organizing skills into relevant categories.\"],\n \"summary\": \"Skills section is not explicitly defined, making it difficult to quickly assess the candidate's qualifications. Specific skills and categories would add clarity.\",\n \"keywords\": { \"analiza\": 2, \"systemy\": 1, \"współpraca\": 1, \"usługi\": 1 }\n },\n \"Certifications\": {\n \"score\": 8,\n \"suggestions\": [\"Add the date for each certification obtained for better clarity.\", \"Consider grouping certifications by relevance.\"],\n \"summary\": \"The certifications section lists various relevant training and qualifications but would benefit from more organization and specificity.\",\n \"keywords\": { \"certyfikat\": 1, \"szkolenie\": 1, \"ITIL\": 2 }\n },\n \"Projects\": {\n \"score\": 7,\n \"suggestions\": [\"Include specific project names and outcomes to enhance detail.\", \"Highlight individual contributions more clearly.\"],\n \"summary\": \"The projects section provides some context but lacks clear delineation of specific projects or the candidate's individual contributions and results.\",\n \"keywords\": { \"projekt\": 3, \"współpraca\": 2, \"systemy\": 1 }\n }\n },\n \"openai_stats\": {\n \"input_tokens\": 1526,\n \"output_tokens\": 469,\n \"total_tokens\": 1995,\n \"cost\": 0.09975\n }\n}\n```",
"role": "assistant"
},
"finish_reason": "stop",
"index": 0
}
],
"usage": {
"prompt_tokens": 2668,
"completion_tokens": 647,
"total_tokens": 3315
},
"cost": 0.0,
"model": "gpt-4o-mini-2024-07-18"
}

View File

@@ -0,0 +1,19 @@
{
"choices": [
{
"message": {
"content": "```json\n{\n \"sections\": {\n \"Summary\": {\n \"score\": 8,\n \"suggestions\": [\n \"Consider adding specific achievements or metrics to highlight impact.\",\n \"Simplify language for clearer understanding.\"\n ],\n \"summary\": \"The summary provides a clear overview of the candidate's experience and roles in business analysis and IT management but can be improved by adding specific achievements to quantify their contributions.\",\n \"keywords\": {\n \"analityk\": 3,\n \"doświadczenie\": 2,\n \"architekt\": 1,\n \"manager\": 1\n }\n },\n \"Work Experience\": {\n \"score\": 9,\n \"suggestions\": [],\n \"summary\": \"The work experience section is detailed, presenting clear job roles, responsibilities, and contributions. It utilizes strong action verbs but could be enhanced with quantifiable results in some roles.\",\n \"keywords\": {\n \"analiz\": 5,\n \"biznesowy\": 4,\n \"systemowy\": 4,\n \"projekt\": 4,\n \"współpraca\": 3,\n \"wymagania\": 2\n }\n },\n \"Education\": {\n \"score\": 8,\n \"suggestions\": [\n \"Specify the graduation status for higher education.\",\n \"Consider listing any honors or relevant coursework.\"\n ],\n \"summary\": \"The education section is comprehensive, including degrees and specialized training, but it lacks mention of graduation status and could highlight additional relevant coursework.\",\n \"keywords\": {\n \"Politechnika\": 2,\n \"CISCO\": 1,\n \"Magisterskie\": 1,\n \"Inżynierskie\": 1\n }\n },\n \"Skills\": {\n \"score\": 7,\n \"suggestions\": [\n \"Categorize skills into technical and soft skills for clarity.\",\n \"Add more specific technologies or methodologies relevant to the roles applied for.\"\n ],\n \"summary\": \"The skills section is minimal and lacks depth. Categorizing skills can improve clarity and relevance, and including specific technologies or methodologies would strengthen the section.\",\n \"keywords\": {\n \"szkoleń\": 4,\n \"certyfikaty\": 2,\n \"prawo jazdy\": 1\n }\n },\n \"Certifications\": {\n \"score\": 9,\n \"suggestions\": [],\n \"summary\": \"The certifications section is strong, detailing relevant training and certifications that add credibility to the candidate's qualifications.\",\n \"keywords\": {\n \"certyfikat\": 1,\n \"szkolenie\": 9\n }\n },\n \"Projects\": {\n \"score\": 6,\n \"suggestions\": [\n \"Create a separate section for key projects with descriptions and outcomes.\",\n \"Highlight individual contributions to collaborative projects.\"\n ],\n \"summary\": \"The projects are mentioned informally within work experience; however, creating a dedicated section would better emphasize significant projects and achievements.\",\n \"keywords\": {\n \"projekt\": 4,\n \"wymagania\": 2\n }\n }\n },\n \"openai_stats\": {\n \"input_tokens\": 2585,\n \"output_tokens\": 677,\n \"total_tokens\": 3262,\n \"cost\": 0.01308\n }\n}\n```",
"role": "assistant"
},
"finish_reason": "stop",
"index": 0
}
],
"usage": {
"prompt_tokens": 2668,
"completion_tokens": 706,
"total_tokens": 3374
},
"cost": 0.0,
"model": "gpt-4o-mini-2024-07-18"
}

View File

@@ -0,0 +1,19 @@
{
"choices": [
{
"message": {
"content": "```json\n{\n \"sections\": {\n \"Summary\": {\n \"score\": 8,\n \"suggestions\": [\n \"Add specific accomplishments or metrics to demonstrate impact\",\n \"Consider using bullet points for easier readability\"\n ],\n \"summary\": \"The summary provides a clear overview of the candidate's experience and roles as a business analyst, architect, and manager. However, it lacks specific details on achievements or metrics that could enhance its impact.\",\n \"keywords\": {\n \"Analityk biznesowy\": 1,\n \"systemowy\": 1,\n \"architekt\": 1,\n \"manager\": 1,\n \"doświadczenie\": 1\n }\n },\n \"Work Experience\": {\n \"score\": 9,\n \"suggestions\": [],\n \"summary\": \"This section provides comprehensive details about the candidate's relevant work experience, including roles, responsibilities, and achievements. It is well-structured and effectively highlights the candidates expertise.\",\n \"keywords\": {\n \"analityk\": 5,\n \"systemowy\": 2,\n \"kierownik\": 2,\n \"dzieło\": 2,\n \"projekt\": 3,\n \"współpraca\": 2,\n \"systemy\": 3,\n \"dokumentacja\": 2\n }\n },\n \"Education\": {\n \"score\": 8,\n \"suggestions\": [\n \"Include graduation years for better context\",\n \"Consider adding any honors or relevant coursework\"\n ],\n \"summary\": \"The education section lists relevant degrees and certifications, but lacks graduation dates and specifics about honors which could strengthen the presentation.\",\n \"keywords\": {\n \"Magisterskie\": 1,\n \"Inżynierskie\": 1,\n \"Politechnika\": 2,\n \"CISCO\": 1,\n \"specjalność\": 3\n }\n },\n \"Skills\": {\n \"score\": 7,\n \"suggestions\": [\n \"Add more specific technical and soft skills\",\n \"Group skills into categories for clarity\"\n ],\n \"summary\": \"The skills section is brief and could benefit from more detail. Including specific technical skills, soft skills, and grouping them would enhance this sections effectiveness.\",\n \"keywords\": {}\n },\n \"Certifications\": {\n \"score\": 9,\n \"suggestions\": [],\n \"summary\": \"The certifications section is well-detailed, showcasing a range of relevant training and certifications that support the candidate's qualifications. No improvements needed.\",\n \"keywords\": {\n \"certyfikat\": 3,\n \"szkolenie\": 6,\n \"ITIL\": 2\n }\n },\n \"Projects\": {\n \"score\": 8,\n \"suggestions\": [\n \"Provide more detailed descriptions of key projects\",\n \"Highlight any specific outcomes or results achieved\"\n ],\n \"summary\": \"The projects section includes relevant experiences but would be improved by elaborating on the specifics of projects and their outcomes, including metrics or achievements.\",\n \"keywords\": {\n \"projekt\": 4,\n \"analiza\": 2,\n \"współpraca\": 1\n }\n }\n },\n \"openai_stats\": {\n \"input_tokens\": 1695,\n \"output_tokens\": 712,\n \"total_tokens\": 2407,\n \"cost\": 0.0035\n }\n}\n```",
"role": "assistant"
},
"finish_reason": "stop",
"index": 0
}
],
"usage": {
"prompt_tokens": 2668,
"completion_tokens": 729,
"total_tokens": 3397
},
"cost": 0,
"model": "gpt-4o-mini-2024-07-18"
}

View File

@@ -1,819 +0,0 @@
#!/usr/bin/env python3
import argparse
import os
import subprocess
import sys
import time
import logging
import json # Added for stats file
from datetime import datetime, timedelta
from pathlib import Path # Added for easier path handling
# --- Dependencies ---
# Requires PyYAML: pip install PyYAML
try:
import yaml
except ImportError:
print("Error: PyYAML library not found. Please install it using: pip install PyYAML", file=sys.stderr)
sys.exit(1)
# --- Configuration ---
# These act as fallback defaults if not specified in config file or command line
DEFAULT_SOURCE_DIR = "/mnt/slow_storage"
DEFAULT_TARGET_DIR = "/mnt/fast_storage"
DEFAULT_RECENT_DAYS = 1
DEFAULT_STALE_DAYS = 30 # Default for moving cold files back
DEFAULT_STATS_FILE = None # Default: Don't generate stats unless requested
DEFAULT_MIN_SIZE = "0" # Default: No minimum size filter
DEFAULT_CONFIG_PATH = Path.home() / ".config" / "file_manager" / "config.yaml"
# --- Logging Setup ---
def setup_logging():
"""Configures basic logging."""
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# --- Helper Function ---
def format_bytes(size):
"""Converts bytes to a human-readable string (KB, MB, GB)."""
if size is None: return "N/A"
if size < 1024:
return f"{size} B"
elif size < 1024**2:
return f"{size / 1024:.2f} KB"
elif size < 1024**3:
return f"{size / 1024**2:.2f} MB"
else:
return f"{size / 1024**3:.2f} GB"
# --- Helper Function: Parse Size String ---
def parse_size_string(size_str):
"""Converts a size string (e.g., '10G', '500M', '10k') to bytes."""
size_str = str(size_str).strip().upper()
if not size_str:
return 0
if size_str == '0':
return 0
units = {"B": 1, "K": 1024, "M": 1024**2, "G": 1024**3, "T": 1024**4}
unit = "B" # Default unit
# Check last character for unit
if size_str[-1] in units:
unit = size_str[-1]
numeric_part = size_str[:-1]
else:
numeric_part = size_str
if not numeric_part.replace('.', '', 1).isdigit(): # Allow float for parsing e.g. 1.5G
raise ValueError(f"Invalid numeric part in size string: '{numeric_part}'")
try:
value = float(numeric_part)
except ValueError:
raise ValueError(f"Cannot convert numeric part to float: '{numeric_part}'")
return int(value * units[unit])
# --- Configuration Loading ---
def load_config(config_path):
"""Loads configuration from a YAML file."""
config = {}
resolved_path = Path(config_path).resolve() # Resolve potential symlinks/relative paths
if resolved_path.is_file():
try:
with open(resolved_path, 'r') as f:
config = yaml.safe_load(f)
if config is None: # Handle empty file case
config = {}
logging.info(f"Loaded configuration from: {resolved_path}")
except yaml.YAMLError as e:
logging.warning(f"Error parsing config file {resolved_path}: {e}. Using defaults.")
except OSError as e:
logging.warning(f"Error reading config file {resolved_path}: {e}. Using defaults.")
else:
# It's okay if the default config doesn't exist, don't log warning unless user specified one
if str(resolved_path) != str(DEFAULT_CONFIG_PATH.resolve()):
logging.warning(f"Specified config file not found at {resolved_path}. Using defaults/CLI args.")
else:
logging.info(f"Default config file not found at {resolved_path}. Using defaults/CLI args.")
return config
# --- Argument Parsing ---
def parse_arguments():
"""Parses command line arguments, considering config file defaults."""
# Initial minimal parse to find config path *before* defining all args
pre_parser = argparse.ArgumentParser(add_help=False)
pre_parser.add_argument('--config', default=str(DEFAULT_CONFIG_PATH), help=f'Path to YAML configuration file (Default: {DEFAULT_CONFIG_PATH}).')
pre_args, _ = pre_parser.parse_known_args()
# Load config based on pre-parsed path
config = load_config(pre_args.config)
# Get defaults from config or fallback constants
cfg_source_dir = config.get('source_dir', DEFAULT_SOURCE_DIR)
cfg_target_dir = config.get('target_dir', DEFAULT_TARGET_DIR)
cfg_recent_days = config.get('recent_days', DEFAULT_RECENT_DAYS)
cfg_stale_days = config.get('stale_days', DEFAULT_STALE_DAYS)
cfg_stats_file = config.get('stats_file', DEFAULT_STATS_FILE)
cfg_min_size = config.get('min_size', DEFAULT_MIN_SIZE)
# Main parser using loaded config defaults
parser = argparse.ArgumentParser(
description="Manages files between storage tiers based on access/modification time, generates stats, and summarizes.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=f"""Examples:
# Move hot files (accessed < {cfg_recent_days}d ago) from {cfg_source_dir} to {cfg_target_dir}
{sys.argv[0]} --move
# Move cold files (modified > {cfg_stale_days}d ago) from {cfg_target_dir} to {cfg_source_dir} (interactive)
{sys.argv[0]} --move-cold --interactive
# Simulate moving hot files with custom settings
{sys.argv[0]} --move --recent-days 3 --source-dir /data/archive --target-dir /data/hot --dry-run
# Count potential hot files larger than 100MB to move
{sys.argv[0]} --count --min-size 100M
# Count all potential hot files to move
{sys.argv[0]} --count
# Summarize unused files in target directory
{sys.argv[0]} --summarize-unused
# Generate storage statistics report
{sys.argv[0]} --generate-stats --stats-file /var/log/file_manager_stats.json
# Use a specific configuration file
{sys.argv[0]} --config /path/to/my_config.yaml --move
"""
)
action_group = parser.add_argument_group('Actions (at least one required)')
action_group.add_argument('--move', action='store_true', help='Move recently accessed ("hot") files from source to target.')
action_group.add_argument('--move-cold', action='store_true', help='Move old unmodified ("cold") files from target back to source.')
action_group.add_argument('--count', action='store_true', help='Count hot files in source that would be moved (based on access time).')
action_group.add_argument('--summarize-unused', action='store_true', help='Analyze target directory for unused files based on modification time.')
action_group.add_argument('--generate-stats', action='store_true', help='Generate a JSON stats report for source and target directories.')
config_group = parser.add_argument_group('Configuration Options (Overrides config file)')
config_group.add_argument('--config', default=str(DEFAULT_CONFIG_PATH), help=f'Path to YAML configuration file (Default: {DEFAULT_CONFIG_PATH}).') # Re-add for help text
config_group.add_argument('--source-dir', default=cfg_source_dir, help=f'Source directory (Default: "{cfg_source_dir}").')
config_group.add_argument('--target-dir', default=cfg_target_dir, help=f'Target directory (Default: "{cfg_target_dir}").')
config_group.add_argument('--recent-days', type=int, default=cfg_recent_days, help=f'Define "recent" access in days for --move/--count (Default: {cfg_recent_days}).')
config_group.add_argument('--stale-days', type=int, default=cfg_stale_days, help=f'Define "stale" modification in days for --move-cold (Default: {cfg_stale_days}).')
config_group.add_argument('--stats-file', default=cfg_stats_file, help=f'Output file for --generate-stats (Default: {"None" if cfg_stats_file is None else cfg_stats_file}).')
config_group.add_argument('--min-size', default=cfg_min_size, help=f'Minimum file size to consider for move actions (e.g., 100M, 1G, 0 to disable). (Default: {cfg_min_size})')
behavior_group = parser.add_argument_group('Behavior Modifiers')
behavior_group.add_argument('--dry-run', action='store_true', help='Simulate move actions without actual changes.')
behavior_group.add_argument('--interactive', action='store_true', help='Prompt for confirmation before executing move actions (ignored if --dry-run).')
# If no arguments were given (just script name), print help
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
sys.exit(1)
args = parser.parse_args()
# Validate that at least one action is selected
action_selected = args.move or args.move_cold or args.count or args.summarize_unused or args.generate_stats
if not action_selected:
parser.error("At least one action flag (--move, --move-cold, --count, --summarize-unused, --generate-stats) is required.")
# Validate days arguments
if args.recent_days <= 0:
parser.error("--recent-days must be a positive integer.")
if args.stale_days <= 0:
parser.error("--stale-days must be a positive integer.")
# Validate stats file if action is selected
if args.generate_stats and not args.stats_file:
parser.error("--stats-file must be specified when using --generate-stats (or set in config file).")
# Validate and parse min_size
try:
args.min_size_bytes = parse_size_string(args.min_size)
if args.min_size_bytes < 0:
parser.error("--min-size cannot be negative.")
except ValueError as e:
parser.error(f"Invalid --min-size value: {e}")
return args
# --- Core Logic Functions ---
def find_recent_files(source_dir, days, min_size_bytes):
"""Finds files accessed within the last 'days' in the source directory."""
size_filter_msg = f" and size >= {format_bytes(min_size_bytes)}" if min_size_bytes > 0 else ""
logging.info(f"Scanning '{source_dir}' for files accessed within the last {days} day(s){size_filter_msg}...")
recent_files = []
cutoff_time = time.time() - (days * 86400) # 86400 seconds in a day
try:
for root, _, files in os.walk(source_dir):
for filename in files:
filepath = os.path.join(root, filename)
try:
# Check if it's a file and not a broken symlink etc.
if not os.path.isfile(filepath) or os.path.islink(filepath):
continue
stat_result = os.stat(filepath)
# Check access time AND minimum size
if stat_result.st_atime > cutoff_time and stat_result.st_size >= min_size_bytes:
# Get path relative to source_dir for rsync --files-from
relative_path = os.path.relpath(filepath, source_dir)
recent_files.append(relative_path)
except FileNotFoundError:
logging.warning(f"File not found during scan, skipping: {filepath}")
continue # File might have been deleted during scan
except OSError as e:
logging.warning(f"Cannot access file stats, skipping: {filepath} ({e})")
continue
except FileNotFoundError:
logging.error(f"Source directory '{source_dir}' not found during scan.")
return None # Indicate error
except Exception as e:
logging.error(f"An unexpected error occurred during 'recent' file scan: {e}")
return None
logging.info(f"Found {len(recent_files)} files matching the 'recent' criteria.")
return recent_files
# --- New Function: Find Stale Files ---
def find_stale_files(target_dir, days, min_size_bytes):
"""Finds files modified more than 'days' ago in the target directory."""
size_filter_msg = f" and size >= {format_bytes(min_size_bytes)}" if min_size_bytes > 0 else ""
logging.info(f"Scanning '{target_dir}' for files modified more than {days} day(s) ago{size_filter_msg}...")
stale_files = []
# Cutoff time is *before* this time
cutoff_time = time.time() - (days * 86400) # 86400 seconds in a day
try:
for root, _, files in os.walk(target_dir):
for filename in files:
filepath = os.path.join(root, filename)
try:
# Check if it's a file and not a broken symlink etc.
if not os.path.isfile(filepath) or os.path.islink(filepath):
continue
stat_result = os.stat(filepath)
# Check modification time AND minimum size
if stat_result.st_mtime < cutoff_time and stat_result.st_size >= min_size_bytes:
# Get path relative to target_dir for rsync --files-from
relative_path = os.path.relpath(filepath, target_dir)
stale_files.append(relative_path)
except FileNotFoundError:
logging.warning(f"File not found during stale scan, skipping: {filepath}")
continue # File might have been deleted during scan
except OSError as e:
logging.warning(f"Cannot access file stats during stale scan, skipping: {filepath} ({e})")
continue
except FileNotFoundError:
logging.error(f"Target directory '{target_dir}' not found during stale scan.")
return None # Indicate error
except Exception as e:
logging.error(f"An unexpected error occurred during 'stale' file scan: {e}")
return None
logging.info(f"Found {len(stale_files)} files matching the 'stale' criteria (modified > {days} days ago).")
return stale_files
def move_files(relative_file_list, source_dir, target_dir, dry_run, interactive): # Added interactive
"""Moves files using rsync (hot files: source -> target)."""
if not relative_file_list:
logging.info("No 'hot' files found to move.")
return True # Nothing to do, considered success
action_desc = "move hot files"
simulating = dry_run
num_files = len(relative_file_list)
logging.info(f"--- {'Simulating ' if simulating else ''}{action_desc.capitalize()} ---")
logging.info(f"Source Base: {source_dir}")
logging.info(f"Target Base: {target_dir}")
logging.info(f"Files to process: {num_files}")
logging.info("--------------------")
# Interactive prompt
if interactive and not simulating:
try:
confirm = input(f"Proceed with moving {num_files} hot files from '{source_dir}' to '{target_dir}'? (yes/no): ").lower().strip()
if confirm != 'yes':
logging.warning("Move operation cancelled by user.")
return False # Indicate cancellation
except EOFError: # Handle non-interactive environments gracefully
logging.warning("Cannot prompt in non-interactive mode. Aborting move.")
return False
rsync_cmd = ['rsync', '-avP', '--relative', '--info=progress2'] # archive, verbose, progress/partial, relative paths
if simulating:
rsync_cmd.append('--dry-run')
else:
rsync_cmd.append('--remove-source-files')
# Use --files-from=- with source as '.' because paths are relative to source_dir
# Target directory is the destination for the relative structure
rsync_cmd.extend(['--files-from=-', '.', target_dir])
# Prepare file list for stdin (newline separated)
files_input = "\n".join(relative_file_list).encode('utf-8')
try:
logging.info(f"Executing rsync command: {' '.join(rsync_cmd)}")
# Run rsync in the source directory context
process = subprocess.run(
rsync_cmd,
input=files_input,
capture_output=True,
# text=True, # Removed: Input is bytes, output will be bytes
check=False, # Don't raise exception on non-zero exit
cwd=source_dir # Execute rsync from the source directory
)
# Decode output/error streams
stdout_str = process.stdout.decode('utf-8', errors='replace') if process.stdout else ""
stderr_str = process.stderr.decode('utf-8', errors='replace') if process.stderr else ""
if stdout_str:
logging.info("rsync output:\n" + stdout_str)
if stderr_str:
# rsync often prints stats to stderr, log as info unless exit code is bad
log_level = logging.WARNING if process.returncode != 0 else logging.INFO
logging.log(log_level, "rsync stderr:\n" + stderr_str)
if process.returncode == 0:
logging.info(f"rsync {'simulation' if simulating else action_desc} completed successfully.")
logging.info("--------------------")
return True
else:
logging.error(f"rsync {'simulation' if simulating else action_desc} failed with exit code {process.returncode}.")
logging.info("--------------------")
return False
except FileNotFoundError:
logging.error("Error: 'rsync' command not found. Please ensure rsync is installed and in your PATH.")
return False
except Exception as e:
logging.error(f"An unexpected error occurred during rsync execution for hot files: {e}")
return False
# --- New Function: Move Cold Files ---
def move_files_cold(relative_file_list, source_dir, target_dir, dry_run, interactive):
"""Moves files using rsync (cold files: target -> source)."""
if not relative_file_list:
logging.info("No 'cold' files found to move back.")
return True # Nothing to do, considered success
action_desc = "move cold files back"
simulating = dry_run
num_files = len(relative_file_list)
total_size = 0
# Calculate total size before prompt/move
logging.info("Calculating total size of cold files...")
for rel_path in relative_file_list:
try:
full_path = os.path.join(target_dir, rel_path)
if os.path.isfile(full_path): # Check again in case it vanished
total_size += os.path.getsize(full_path)
except OSError as e:
logging.warning(f"Could not get size for {rel_path}: {e}")
logging.info(f"--- {'Simulating ' if simulating else ''}{action_desc.capitalize()} ---")
logging.info(f"Source (of cold files): {target_dir}")
logging.info(f"Destination (archive): {source_dir}")
logging.info(f"Files to process: {num_files}")
logging.info(f"Total size: {format_bytes(total_size)}")
logging.info("--------------------")
# Interactive prompt
if interactive and not simulating:
try:
confirm = input(f"Proceed with moving {num_files} cold files ({format_bytes(total_size)}) from '{target_dir}' to '{source_dir}'? (yes/no): ").lower().strip()
if confirm != 'yes':
logging.warning("Move operation cancelled by user.")
return False # Indicate cancellation
except EOFError: # Handle non-interactive environments gracefully
logging.warning("Cannot prompt in non-interactive mode. Aborting move.")
return False
# Note: We run rsync from the TARGET directory now
rsync_cmd = ['rsync', '-avP', '--relative'] # archive, verbose, progress/partial, relative paths
if simulating:
rsync_cmd.append('--dry-run')
else:
rsync_cmd.append('--remove-source-files') # Remove from TARGET after successful transfer
# Use --files-from=- with source as '.' (relative to target_dir)
# Target directory is the destination (source_dir in this context)
rsync_cmd.extend(['--files-from=-', '.', source_dir])
# Prepare file list for stdin (newline separated)
files_input = "\n".join(relative_file_list).encode('utf-8')
try:
logging.info(f"Executing rsync command: {' '.join(rsync_cmd)}")
# Run rsync in the TARGET directory context
process = subprocess.run(
rsync_cmd,
input=files_input,
capture_output=True,
# text=True, # Removed: Input is bytes, output will be bytes
check=False, # Don't raise exception on non-zero exit
cwd=target_dir # <<< Execute rsync from the TARGET directory
)
# Decode output/error streams
stdout_str = process.stdout.decode('utf-8', errors='replace') if process.stdout else ""
stderr_str = process.stderr.decode('utf-8', errors='replace') if process.stderr else ""
if stdout_str:
logging.info("rsync output:\n" + stdout_str)
if stderr_str:
log_level = logging.WARNING if process.returncode != 0 else logging.INFO
logging.log(log_level, "rsync stderr:\n" + stderr_str)
if process.returncode == 0:
logging.info(f"rsync {'simulation' if simulating else action_desc} completed successfully.")
logging.info("--------------------")
return True
else:
logging.error(f"rsync {'simulation' if simulating else action_desc} failed with exit code {process.returncode}.")
logging.info("--------------------")
return False
except FileNotFoundError:
logging.error("Error: 'rsync' command not found. Please ensure rsync is installed and in your PATH.")
return False
except Exception as e:
logging.error(f"An unexpected error occurred during rsync execution for cold files: {e}")
return False
def count_files(file_list):
"""Logs the count of files found."""
logging.info("--- Counting Hot Move Candidates ---")
if file_list is None:
logging.warning("File list is not available (likely due to earlier error).")
else:
logging.info(f"Found {len(file_list)} potential hot files to move based on access time.")
logging.info("----------------------------")
def summarize_unused(target_dir):
"""Summarizes unused files in the target directory based on modification time."""
logging.info("--- Summarizing Unused Files in Target ---")
logging.info(f"Target Directory: {target_dir}")
logging.info("Criteria: Based on modification time (-mtime)")
logging.info("------------------------------------------")
periods_days = [1, 3, 7, 14, 30]
now = time.time()
period_cutoffs = {days: now - (days * 86400) for days in periods_days}
# Add a bucket for > 30 days
size_by_period = {days: 0 for days in periods_days + ['30+']}
count_by_period = {days: 0 for days in periods_days + ['30+']} # Also count files
file_count = 0
total_processed_size = 0
try:
for root, _, files in os.walk(target_dir):
for filename in files:
filepath = os.path.join(root, filename)
try:
# Check if it's a file and not a broken symlink etc.
if not os.path.isfile(filepath) or os.path.islink(filepath):
continue
stat_result = os.stat(filepath)
mtime = stat_result.st_mtime
fsize = stat_result.st_size
file_count += 1
total_processed_size += fsize
                    # Check against periods in descending order of age (oldest first)
                    if mtime < period_cutoffs[30]:
                        size_by_period['30+'] += fsize
                        count_by_period['30+'] += 1
                    elif mtime < period_cutoffs[14]:
                        size_by_period[30] += fsize
                        count_by_period[30] += 1
                    elif mtime < period_cutoffs[7]:
                        size_by_period[14] += fsize
                        count_by_period[14] += 1
                    elif mtime < period_cutoffs[3]:
                        size_by_period[7] += fsize
                        count_by_period[7] += 1
                    elif mtime < period_cutoffs[1]:
                        size_by_period[3] += fsize
                        count_by_period[3] += 1
                    # else: modified within the last day - doesn't count for these summaries
except FileNotFoundError:
logging.warning(f"File not found during summary, skipping: {filepath}")
continue
except OSError as e:
logging.warning(f"Cannot access file stats during summary, skipping: {filepath} ({e})")
continue
logging.info(f"Scanned {file_count} files, total size: {format_bytes(total_processed_size)}")
# Calculate cumulative sizes and counts
cumulative_size = {days: 0 for days in periods_days + ['30+']}
cumulative_count = {days: 0 for days in periods_days + ['30+']}
# Iterate backwards through sorted periods for cumulative calculation
    # These keys represent the *upper bound* of the age bucket (e.g., key '30' means 14 < age <= 30 days)
# The cumulative value for key 'X' means "total size/count of files older than X days"
sorted_periods_desc = ['30+'] + sorted(periods_days, reverse=True) # e.g., ['30+', 30, 14, 7, 3, 1]
last_period_size = 0
last_period_count = 0
temp_cumulative_size = {}
temp_cumulative_count = {}
for period_key in sorted_periods_desc:
current_size = size_by_period[period_key]
current_count = count_by_period[period_key]
temp_cumulative_size[period_key] = current_size + last_period_size
temp_cumulative_count[period_key] = current_count + last_period_count
last_period_size = temp_cumulative_size[period_key]
last_period_count = temp_cumulative_count[period_key]
# Map temporary cumulative values to the correct "older than X days" meaning
# cumulative_size[1] should be size of files older than 1 day (i.e. temp_cumulative_size[3])
cumulative_size[1] = temp_cumulative_size.get(3, 0)
cumulative_count[1] = temp_cumulative_count.get(3, 0)
cumulative_size[3] = temp_cumulative_size.get(7, 0)
cumulative_count[3] = temp_cumulative_count.get(7, 0)
cumulative_size[7] = temp_cumulative_size.get(14, 0)
cumulative_count[7] = temp_cumulative_count.get(14, 0)
cumulative_size[14] = temp_cumulative_size.get(30, 0)
cumulative_count[14] = temp_cumulative_count.get(30, 0)
cumulative_size[30] = temp_cumulative_size.get('30+', 0)
cumulative_count[30] = temp_cumulative_count.get('30+', 0)
cumulative_size['30+'] = temp_cumulative_size.get('30+', 0) # Redundant but harmless
cumulative_count['30+'] = temp_cumulative_count.get('30+', 0)
logging.info("Cumulative stats for files NOT modified for more than:")
# Display in ascending order of days for clarity
logging.info(f" > 1 day: {format_bytes(cumulative_size[1])} ({cumulative_count[1]} files)")
logging.info(f" > 3 days: {format_bytes(cumulative_size[3])} ({cumulative_count[3]} files)")
logging.info(f" > 7 days: {format_bytes(cumulative_size[7])} ({cumulative_count[7]} files)")
logging.info(f" > 14 days:{format_bytes(cumulative_size[14])} ({cumulative_count[14]} files)")
logging.info(f" > 30 days:{format_bytes(cumulative_size[30])} ({cumulative_count[30]} files)")
except FileNotFoundError:
logging.error(f"Target directory '{target_dir}' not found for summary.")
except Exception as e:
logging.error(f"An unexpected error occurred during unused file summary: {e}")
logging.info("------------------------------------------")
# --- New Function: Analyze Directory for Stats ---
def analyze_directory(directory):
"""Analyzes a directory and returns statistics."""
logging.info(f"Analyzing directory for statistics: {directory}")
stats = {
'total_files': 0,
'total_size': 0,
        'size_by_mod_time_days': {  # key = upper bound of the age bracket in days (key '1' means age <= 1 day)
'1': {'count': 0, 'size': 0}, # <= 1 day old
'3': {'count': 0, 'size': 0}, # > 1 day, <= 3 days old
'7': {'count': 0, 'size': 0}, # > 3 days, <= 7 days old
'14': {'count': 0, 'size': 0},# > 7 days, <= 14 days old
'30': {'count': 0, 'size': 0}, # > 14 days, <= 30 days old
'over_30': {'count': 0, 'size': 0} # > 30 days old
},
'error_count': 0,
}
periods_days = [1, 3, 7, 14, 30]
now = time.time()
# Cutoffs: if mtime < cutoff[X], file is older than X days
period_cutoffs = {days: now - (days * 86400) for days in periods_days}
try:
for root, _, files in os.walk(directory):
for filename in files:
filepath = os.path.join(root, filename)
try:
if not os.path.isfile(filepath) or os.path.islink(filepath):
continue
stat_result = os.stat(filepath)
mtime = stat_result.st_mtime
fsize = stat_result.st_size
stats['total_files'] += 1
stats['total_size'] += fsize
# Assign to age buckets based on modification time (oldest first)
if mtime < period_cutoffs[30]:
stats['size_by_mod_time_days']['over_30']['count'] += 1
stats['size_by_mod_time_days']['over_30']['size'] += fsize
elif mtime < period_cutoffs[14]:
stats['size_by_mod_time_days']['30']['count'] += 1
stats['size_by_mod_time_days']['30']['size'] += fsize
elif mtime < period_cutoffs[7]:
stats['size_by_mod_time_days']['14']['count'] += 1
stats['size_by_mod_time_days']['14']['size'] += fsize
elif mtime < period_cutoffs[3]:
stats['size_by_mod_time_days']['7']['count'] += 1
stats['size_by_mod_time_days']['7']['size'] += fsize
elif mtime < period_cutoffs[1]:
stats['size_by_mod_time_days']['3']['count'] += 1
stats['size_by_mod_time_days']['3']['size'] += fsize
else: # Modified within the last day
stats['size_by_mod_time_days']['1']['count'] += 1
stats['size_by_mod_time_days']['1']['size'] += fsize
except FileNotFoundError:
logging.warning(f"File not found during stats analysis, skipping: {filepath}")
stats['error_count'] += 1
continue
except OSError as e:
logging.warning(f"Cannot access file stats during stats analysis, skipping: {filepath} ({e})")
stats['error_count'] += 1
continue
logging.info(f"Analysis complete for {directory}: Found {stats['total_files']} files, total size {format_bytes(stats['total_size'])}.")
if stats['error_count'] > 0:
logging.warning(f"Encountered {stats['error_count']} errors during analysis of {directory}.")
return stats
except FileNotFoundError:
logging.error(f"Directory '{directory}' not found for statistics analysis.")
return None # Indicate error
except Exception as e:
logging.error(f"An unexpected error occurred during statistics analysis of {directory}: {e}")
return None
# --- New Function: Generate Stats Report ---
def generate_stats(args):
"""Generates a JSON statistics report for source and target directories."""
logging.info("--- Generating Statistics Report ---")
report = {
'report_generated_utc': datetime.utcnow().isoformat() + 'Z',
'source_directory': args.source_dir,
'target_directory': args.target_dir,
'source_stats': None,
'target_stats': None,
}
success = True
# Analyze source directory if it exists
if os.path.isdir(args.source_dir):
logging.info(f"Analyzing source directory: {args.source_dir}")
source_stats = analyze_directory(args.source_dir)
if source_stats is None:
logging.error(f"Failed to analyze source directory: {args.source_dir}")
success = False # Mark as partial failure, but continue
report['source_stats'] = source_stats
else:
logging.warning(f"Source directory '{args.source_dir}' not found, skipping analysis.")
report['source_stats'] = {'error': 'Directory not found'}
# Analyze target directory if it exists
if os.path.isdir(args.target_dir):
logging.info(f"Analyzing target directory: {args.target_dir}")
target_stats = analyze_directory(args.target_dir)
if target_stats is None:
logging.error(f"Failed to analyze target directory: {args.target_dir}")
success = False # Mark as partial failure
report['target_stats'] = target_stats
else:
logging.warning(f"Target directory '{args.target_dir}' not found, skipping analysis.")
report['target_stats'] = {'error': 'Directory not found'}
if not success:
logging.warning("Stats generation encountered errors analyzing one or both directories.")
# Continue to write partial report
# Write the report to the specified file
stats_file_path = Path(args.stats_file)
try:
# Create parent directories if they don't exist
stats_file_path.parent.mkdir(parents=True, exist_ok=True)
with open(stats_file_path, 'w') as f:
json.dump(report, f, indent=4)
logging.info(f"Successfully wrote statistics report to: {stats_file_path}")
return success # Return True if both analyses succeeded, False otherwise
except OSError as e:
logging.error(f"Error writing statistics report to {stats_file_path}: {e}")
return False
except Exception as e:
logging.error(f"An unexpected error occurred while writing stats report: {e}")
return False
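
# Consuming the generated report later is straightforward (sketch; the key names
# match the report structure built above, the file path is whatever --stats-file set):
#
#     with open(args.stats_file) as f:
#         report = json.load(f)
#     print(report["source_stats"]["total_files"])
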
# --- Main Execution ---
def main():
"""Main function to orchestrate the script."""
setup_logging()
args = parse_arguments() # Now handles config loading
# --- Directory Validation ---
# Check source if needed
source_ok = True
if (args.move or args.count or args.generate_stats or args.move_cold): # move_cold needs source as destination
if not os.path.isdir(args.source_dir):
logging.error(f"Source directory '{args.source_dir}' not found or is not a directory.")
source_ok = False
else:
logging.debug(f"Source directory validated: {args.source_dir}")
# Check target if needed
target_ok = True
if (args.move or args.summarize_unused or args.generate_stats or args.move_cold): # move_cold needs target as source
if not os.path.isdir(args.target_dir):
logging.error(f"Target directory '{args.target_dir}' not found or is not a directory.")
target_ok = False
else:
logging.debug(f"Target directory validated: {args.target_dir}")
# Exit if essential directories are missing for the requested actions that *require* them
if not source_ok and (args.move or args.count):
logging.error("Aborting: Source directory required for --move or --count is invalid.")
sys.exit(1)
if not target_ok and (args.summarize_unused):
logging.error("Aborting: Target directory required for --summarize-unused is invalid.")
sys.exit(1)
if (not source_ok or not target_ok) and args.move_cold:
logging.error("Aborting: Both source and target directories required for --move-cold are invalid.")
sys.exit(1)
# Note: generate_stats handles missing dirs internally
# --- Action Execution ---
exit_code = 0 # Track if any operation fails
# --- Find files first if needed by multiple actions ---
hot_files_to_process = None
if args.move or args.count:
# We already checked source_ok above for these actions
hot_files_to_process = find_recent_files(args.source_dir, args.recent_days, args.min_size_bytes)
if hot_files_to_process is None:
logging.error("Aborting due to error finding recent 'hot' files.")
sys.exit(1) # Abort if find failed
cold_files_to_process = None
if args.move_cold:
# We already checked target_ok above for this action
cold_files_to_process = find_stale_files(args.target_dir, args.stale_days, args.min_size_bytes)
if cold_files_to_process is None:
logging.error("Aborting due to error finding 'cold' files.")
sys.exit(1) # Abort if find failed
# --- Execute Actions ---
if args.count:
count_files(hot_files_to_process) # Counts hot files
if args.move:
# We already checked source_ok and target_ok for this action
move_success = move_files(hot_files_to_process, args.source_dir, args.target_dir, args.dry_run, args.interactive)
if not move_success and not args.dry_run:
logging.error("Move 'hot' files operation failed or was cancelled.")
exit_code = 1 # Mark failure
if args.move_cold:
# We already checked source_ok and target_ok for this action
move_cold_success = move_files_cold(cold_files_to_process, args.source_dir, args.target_dir, args.dry_run, args.interactive)
if not move_cold_success and not args.dry_run:
logging.error("Move 'cold' files operation failed or was cancelled.")
exit_code = 1 # Mark failure
if args.summarize_unused:
# We already checked target_ok for this action
summarize_unused(args.target_dir)
if args.generate_stats:
# generate_stats handles its own directory checks internally now
stats_success = generate_stats(args)
if not stats_success:
# generate_stats already logged errors
exit_code = 1
logging.info("Script finished.")
sys.exit(exit_code) # Exit with 0 on success, 1 on failure
if __name__ == "__main__":
main()

View File

@ -1,186 +0,0 @@
#!/usr/bin/env python3
import logging
import requests
from typing import Optional, Dict, List, Any
logger = logging.getLogger(__name__)
class OpenRouterError(Exception):
"""Custom exception for OpenRouter API errors."""
    def __init__(self, message: str, status_code: Optional[int] = None, response: Optional[dict] = None):
super().__init__(message)
self.status_code = status_code
self.response = response
class OpenRouterResponse:
"""Wrapper for OpenRouter API responses."""
def __init__(self, raw_response: dict):
self.raw_response = raw_response
self.choices = self._parse_choices()
self.usage = self._parse_usage()
self.model = raw_response.get("model")
def _parse_choices(self) -> List[Dict[str, Any]]:
choices = self.raw_response.get("choices", [])
return [
{
"message": choice.get("message", {}),
"finish_reason": choice.get("finish_reason"),
"index": choice.get("index")
}
for choice in choices
]
def _parse_usage(self) -> Dict[str, int]:
usage = self.raw_response.get("usage", {})
return {
"prompt_tokens": usage.get("prompt_tokens", 0),
"completion_tokens": usage.get("completion_tokens", 0),
"total_tokens": usage.get("total_tokens", 0)
}
class OpenRouterClient:
"""Client for interacting with the OpenRouter API."""
def __init__(self, api_key: str, model_name: str):
if not api_key:
raise ValueError("OpenRouter API key is required")
if not model_name:
raise ValueError("Model name is required")
self.api_key = api_key
self.model_name = model_name
self.base_url = "https://openrouter.ai/api/v1"
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {api_key}",
"HTTP-Referer": "https://github.com/OpenRouterTeam/openrouter-examples",
"X-Title": "CV Analysis Tool",
"Content-Type": "application/json"
})
def create_chat_completion(
self,
messages: List[Dict[str, str]],
max_tokens: Optional[int] = None
) -> OpenRouterResponse:
"""
Create a chat completion using the OpenRouter API.
Args:
messages: List of message dictionaries with 'role' and 'content' keys
max_tokens: Maximum number of tokens to generate
Returns:
OpenRouterResponse object containing the API response
Raises:
OpenRouterError: If the API request fails
"""
endpoint = f"{self.base_url}/chat/completions"
payload = {
"model": self.model_name,
"messages": messages
}
if max_tokens is not None:
payload["max_tokens"] = max_tokens
try:
response = self.session.post(endpoint, json=payload)
response.raise_for_status()
return OpenRouterResponse(response.json())
except requests.exceptions.RequestException as e:
raise self._handle_request_error(e)
    def get_available_models(self) -> Dict[str, Any]:
        """
        Get the models listing from the OpenRouter API.
        Returns:
            The parsed JSON response; the list of model dictionaries sits under its "data" key
        Raises:
            OpenRouterError: If the API request fails
        """
endpoint = f"{self.base_url}/models"
try:
logger.debug(f"Fetching available models from: {endpoint}")
response = self.session.get(endpoint)
response.raise_for_status()
data = response.json()
logger.debug(f"Raw API response: {data}")
if not isinstance(data, dict) or "data" not in data:
raise OpenRouterError(
message="Invalid response format from OpenRouter API",
response=data
)
return data
except requests.exceptions.RequestException as e:
raise self._handle_request_error(e)
def verify_model_availability(self) -> bool:
"""
Verify if the configured model is available.
Returns:
True if model is available, False otherwise
"""
try:
response = self.get_available_models()
            # The OpenRouter API returns the model list in the format:
            # {"data": [{"id": "model_name", ...}, ...]}
models = response.get("data", [])
logger.debug(f"Available models: {[model.get('id') for model in models]}")
return any(model.get("id") == self.model_name for model in models)
except OpenRouterError as e:
logger.error(f"Failed to verify model availability: {e}")
return False
except Exception as e:
logger.error(f"Unexpected error while verifying model availability: {e}")
return False
def _handle_request_error(self, error: requests.exceptions.RequestException) -> OpenRouterError:
"""Convert requests exceptions to OpenRouterError."""
if error.response is not None:
try:
error_data = error.response.json()
message = error_data.get("error", {}).get("message", str(error))
return OpenRouterError(
message=message,
status_code=error.response.status_code,
response=error_data
)
except ValueError:
pass
return OpenRouterError(str(error))
def initialize_openrouter_client(api_key: str, model_name: str) -> OpenRouterClient:
"""
Initialize and verify OpenRouter client.
Args:
api_key: OpenRouter API key
model_name: Name of the model to use
Returns:
Initialized OpenRouterClient
Raises:
ValueError: If client initialization or verification fails
"""
try:
client = OpenRouterClient(api_key=api_key, model_name=model_name)
# Verify connection and model availability
if not client.verify_model_availability():
raise ValueError(f"Model {model_name} not available")
logger.debug(f"Successfully initialized OpenRouter client with model: {model_name}")
return client
except Exception as e:
logger.error(f"Failed to initialize OpenRouter client: {e}")
raise
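
# Example usage (illustrative sketch; the API key lookup and model name below are
# placeholders, the client methods are the ones defined above):
#
#     client = initialize_openrouter_client(
#         api_key=os.environ["OPENROUTER_API_KEY"],
#         model_name="google/gemma-7b-it",
#     )
#     response = client.create_chat_completion(
#         messages=[{"role": "user", "content": "Summarize this CV."}],
#         max_tokens=500,
#     )
#     print(response.choices[0]["message"]["content"])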

View File

@ -6,31 +6,20 @@ import json
 import logging
 from datetime import datetime, timezone
 import uuid
-from typing import Optional, Any, Dict
+from typing import Optional, Any
 import time
 from dotenv import load_dotenv
 import pymongo
+import openai
 from pdfminer.high_level import extract_text
-from openrouter_client import initialize_openrouter_client, OpenRouterError, OpenRouterResponse
 
 # Load environment variables
 load_dotenv()
 
 # Configuration
-OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
-if not OPENROUTER_API_KEY:
-    # Use logger here if possible, but it might not be configured yet.
-    # Consider raising the error later or logging after basicConfig.
-    print("ERROR: OPENROUTER_API_KEY environment variable is required", file=sys.stderr)
-    sys.exit(1)
-OPENROUTER_MODEL_NAME = os.getenv("OPENROUTER_MODEL_NAME")
-if not OPENROUTER_MODEL_NAME:
-    print("ERROR: OPENROUTER_MODEL_NAME environment variable is required", file=sys.stderr)
-    sys.exit(1)
+OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
+MODEL_NAME = os.getenv("MODEL_NAME")
 MAX_TOKENS = int(os.getenv("MAX_TOKENS", 500))
 USE_MOCKUP = os.getenv("USE_MOCKUP", "false").lower() == "true"
 MOCKUP_FILE_PATH = os.getenv("MOCKUP_FILE_PATH")
@ -39,177 +28,109 @@ MONGODB_DATABASE = os.getenv("MONGODB_DATABASE")
MONGO_COLLECTION_NAME = "cv_processing_collection" MONGO_COLLECTION_NAME = "cv_processing_collection"
# Initialize OpenAI client
openai.api_key = OPENAI_API_KEY
# Logging setup # Logging setup
LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG").upper() LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG").upper()
logging.basicConfig( logging.basicConfig(
level=LOG_LEVEL, level=LOG_LEVEL,
format="[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s", format='[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s',
datefmt="%Y-%m-%dT%H:%M:%S%z", datefmt='%Y-%m-%dT%H:%M:%S%z'
) )
logger = logging.getLogger(__name__) # Define logger earlier
# Global variable to hold the client instance
_opernrouter_client_instance = None
def get_opernrouter_client():
"""
Initializes and returns the OpenRouter client instance (lazy initialization).
Ensures the client is initialized only once.
"""
global _opernrouter_client_instance
if _opernrouter_client_instance is None:
logger.info("Initializing OpenRouter client for the first time...")
logger.debug(f"Using model: {OPENROUTER_MODEL_NAME}")
logger.debug("API Key present and valid format: %s", bool(OPENROUTER_API_KEY and OPENROUTER_API_KEY.startswith("sk-or-v1-")))
try:
_opernrouter_client_instance = initialize_openrouter_client(
api_key=OPENROUTER_API_KEY,
model_name=OPENROUTER_MODEL_NAME
)
logger.info(f"Successfully initialized OpenRouter client with model: {OPENROUTER_MODEL_NAME}")
except ValueError as e:
logger.error(f"Configuration error during client initialization: {e}")
# Re-raise or handle appropriately, maybe return None or raise specific error
raise # Re-raise the ValueError to be caught higher up if needed
except Exception as e:
logger.error(f"Failed to initialize OpenRouter client: {e}", exc_info=True)
# Re-raise or handle appropriately
raise # Re-raise the exception
else:
logger.debug("Returning existing OpenRouter client instance.")
return _opernrouter_client_instance
def get_mongo_collection(): def get_mongo_collection():
"""Initialize and return MongoDB collection.""" """Initialize and return MongoDB collection."""
# Consider lazy initialization for MongoDB as well if beneficial
mongo_client = pymongo.MongoClient(MONGODB_URI) mongo_client = pymongo.MongoClient(MONGODB_URI)
db = mongo_client[MONGODB_DATABASE] db = mongo_client[MONGODB_DATABASE]
return db[MONGO_COLLECTION_NAME] return db[MONGO_COLLECTION_NAME]
logger = logging.getLogger(__name__)
def parse_arguments():
"""Parses command line arguments."""
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description="""This tool analyzes resumes using the OpenRouter API. Parameters are required to run the analysis.
Required Environment Variables:
- OPENROUTER_API_KEY: Your OpenRouter API key
- OPENROUTER_MODEL_NAME: OpenRouter model to use (e.g. google/gemma-7b-it)
- MONGODB_URI: MongoDB connection string (optional for mockup mode)
- MAX_TOKENS: Maximum tokens for response (default: 500)""",
usage="resume_analysis.py [-h] [-f FILE] [-m]",
epilog="""Examples:
Analyze a resume: resume_analysis.py -f my_resume.pdf
Test with mockup data: resume_analysis.py -f test.pdf -m
Note: Make sure your OpenRouter API key and model name are properly configured in the .env file.""",
)
parser.add_argument(
"-f", "--file", help="Path to the resume file to analyze (PDF or text)"
)
parser.add_argument(
"-m", "--mockup", action="store_true", help="Use mockup response instead of calling LLM API"
)
if len(sys.argv) == 1:
parser.print_help()
return None
return parser.parse_args()
def load_resume_text(args):
"""Loads resume text from a file or uses mockup text."""
use_mockup = args.mockup
if use_mockup:
resume_text = "Mockup resume text"
else:
if not os.path.exists(args.file):
logger.error(f"File not found: {args.file}")
sys.exit(1)
start_file_read_time = time.time()
if args.file.lower().endswith(".pdf"):
logger.debug(f"Using pdfminer to extract text from PDF: {args.file}")
resume_text = extract_text(args.file)
else:
with open(
args.file, "r", encoding="utf-8"
) as f: # Explicitly specify utf-8 encoding for text files
resume_text = f.read()
file_read_time = time.time() - start_file_read_time
logger.debug(f"File read time: {file_read_time:.2f} seconds")
return resume_text
def analyze_resume_with_llm(resume_text, use_mockup):
"""Analyzes resume text using OpenRouter API."""
start_time = time.time()
response = call_llm_api(resume_text, use_mockup)
llm_api_time = time.time() - start_time
logger.debug(f"LLM API call time: {llm_api_time:.2f} seconds")
return response
def store_llm_response(response, use_mockup, input_file_path):
"""Writes raw LLM response to a file."""
write_llm_response(response, use_mockup, input_file_path)
def save_processing_data(resume_text, summary, response, args, processing_id, use_mockup, cv_collection):
"""Saves processing data to MongoDB."""
insert_processing_data(
resume_text,
summary,
response,
args,
processing_id,
use_mockup,
cv_collection,
)
def get_cv_summary_from_response(response):
"""Extracts CV summary from LLM response."""
if response and hasattr(response, "choices"):
message_content = response.choices[0]['message']['content']
try:
summary = json.loads(message_content)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse LLM response: {e}")
summary = {"error": "Invalid JSON response from LLM"}
else:
summary = {"error": "No response from LLM"}
return summary
def main(): def main():
"""Main function to process the resume.""" """Main function to process the resume."""
args = parse_arguments() parser = argparse.ArgumentParser(
if args is None: formatter_class=argparse.RawDescriptionHelpFormatter,
return description="""This tool analyzes resumes using OpenAI's API. Parameters are required to run the analysis.
use_mockup = args.mockup # Ustal, czy używać makiety na podstawie flagi -m
Required Environment Variables:
- OPENAI_API_KEY: Your OpenAI API key
- MODEL_NAME: OpenAI model to use (e.g. gpt-3.5-turbo)
- MONGODB_URI: MongoDB connection string (optional for mockup mode)""",
usage="resume_analysis.py [-h] [-f FILE] [-m]",
epilog="""Examples:
Analyze a resume: resume_analysis.py -f my_resume.txt
Test with mockup data: resume_analysis.py -f test.txt -m"""
)
parser.add_argument('-f', '--file', help='Path to the resume file to analyze (TXT)')
parser.add_argument('-p', '--pdf', help='Path to the resume file to analyze (PDF)')
parser.add_argument('-m', '--mockup', action='store_true', help='Use mockup response instead of calling OpenAI API')
# If no arguments provided, show help and exit
if len(sys.argv) == 1:
parser.print_help()
sys.exit(1)
args = parser.parse_args()
# Determine whether to use mockup based on the -m flag, overriding USE_MOCKUP
use_mockup = args.mockup
# Load the resume text from the provided file or use mockup
if use_mockup:
resume_text = "Mockup resume text"
else:
if args.pdf:
if not os.path.exists(args.pdf):
logger.error(f"PDF file not found: {args.pdf}")
sys.exit(1)
start_file_read_time = time.time()
try:
resume_text = extract_text(args.pdf)
except Exception as e:
logger.error(f"Error extracting text from PDF: {e}", exc_info=True)
sys.exit(1)
file_read_time = time.time() - start_file_read_time
logger.debug(f"PDF file read time: {file_read_time:.2f} seconds")
# Save extracted text to file
pdf_filename = os.path.splitext(os.path.basename(args.pdf))[0]
text_file_path = os.path.join(os.path.dirname(args.pdf), f"{pdf_filename}_text.txt")
with open(text_file_path, "w", encoding="utf-8") as text_file:
text_file.write(resume_text)
logger.debug(f"Extracted text saved to: {text_file_path}")
elif args.file:
if not os.path.exists(args.file):
logger.error(f"File not found: {args.file}")
sys.exit(1)
start_file_read_time = time.time()
with open(args.file, 'r', encoding='latin-1') as f:
resume_text = f.read()
file_read_time = time.time() - start_file_read_time
logger.debug(f"File read time: {file_read_time:.2f} seconds")
else:
parser.print_help()
sys.exit(1)
# Call the OpenAI API with the resume text
start_time = time.time()
try: try:
resume_text = load_resume_text(args) response = call_openai_api(resume_text, use_mockup)
except FileNotFoundError as e: openai_api_time = time.time() - start_time
logger.error(f"File error: {e}") logger.debug(f"OpenAI API call time: {openai_api_time:.2f} seconds")
sys.exit(1)
except Exception as e: except Exception as e:
logger.error(f"Error loading resume text: {e}") logger.error(f"Error during OpenAI API call: {e}", exc_info=True)
sys.exit(1) response = None
# Initialize MongoDB collection only when needed
response = analyze_resume_with_llm(resume_text, use_mockup)
store_llm_response(response, use_mockup, args.file)
cv_collection = get_mongo_collection() cv_collection = get_mongo_collection()
processing_id = str(uuid.uuid4())
summary = get_cv_summary_from_response(response)
save_processing_data(resume_text, summary, response, args, processing_id, use_mockup, cv_collection)
logger.info(f"Resume analysis completed. Processing ID: {processing_id}")
# Measure MongoDB insertion time
start_mongo_time = time.time()
cost = insert_processing_data(resume_text, {}, response, args, str(uuid.uuid4()), use_mockup, cv_collection)
mongo_insert_time = time.time() - start_mongo_time
logger.debug(f"MongoDB insert time: {mongo_insert_time:.2f} seconds")
write_openai_response(response, use_mockup, args.file, cost)
def load_mockup_response(mockup_file_path: str) -> dict: def load_mockup_response(mockup_file_path: str) -> dict:
"""Load mockup response from a JSON file.""" """Load mockup response from a JSON file."""
@ -218,190 +139,154 @@ def load_mockup_response(mockup_file_path: str) -> dict:
raise FileNotFoundError(f"Mockup file not found at: {mockup_file_path}") raise FileNotFoundError(f"Mockup file not found at: {mockup_file_path}")
with open(mockup_file_path, "r") as f: with open(mockup_file_path, "r") as f:
response = json.load(f) response = json.load(f)
response.setdefault( #response.setdefault("openai_stats", {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0})
"llm_stats", {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
)
return response return response
def call_openai_api(text: str, use_mockup: bool) -> Optional[Any]:
def call_llm_api(text: str, use_mockup: bool) -> Optional[OpenRouterResponse]: """Call OpenAI API to analyze resume text."""
"""Call OpenRouter API to analyze resume text.""" logger.debug("Calling OpenAI API.")
if use_mockup:
logger.debug("Using mockup response.")
return load_mockup_response(MOCKUP_FILE_PATH)
prompt_path = os.path.join(os.path.dirname(__file__), "prompt.txt")
logger.debug(f"Loading system prompt from: {prompt_path}")
try: try:
# Load system prompt if use_mockup:
if not os.path.exists(prompt_path): return load_mockup_response(os.path.join(os.path.dirname(__file__), 'tests', 'mockup_response.json'))
raise FileNotFoundError(f"System prompt file not found: {prompt_path}")
with open(os.path.join(os.path.dirname(__file__), "prompt.txt"), "r") as prompt_file:
with open(prompt_path, "r") as prompt_file:
system_content = prompt_file.read() system_content = prompt_file.read()
if not system_content.strip():
raise ValueError("System prompt file is empty")
# Prepare messages response = openai.chat.completions.create(
messages = [ model=MODEL_NAME,
{"role": "system", "content": system_content}, messages=[
{"role": "user", "content": text} {"role": "system", "content": system_content},
] {"role": "user", "content": text}
],
logger.debug("Prepared messages for API call:")
logger.debug(f"System message length: {len(system_content)} chars")
logger.debug(f"User message length: {len(text)} chars")
# Call OpenRouter API
logger.info(f"Calling OpenRouter API with model: {OPENROUTER_MODEL_NAME}")
logger.debug(f"Max tokens set to: {MAX_TOKENS}")
# Get the client instance (initializes on first call)
try:
client = get_opernrouter_client()
except Exception as e:
logger.error(f"Failed to get OpenRouter client: {e}")
return None # Cannot proceed without a client
response = client.create_chat_completion(
messages=messages,
max_tokens=MAX_TOKENS max_tokens=MAX_TOKENS
) )
logger.debug(f"OpenAI API response: {response}")
# Validate response
if not response.choices:
logger.warning("API response contains no choices")
return None
# Log response details
logger.info("Successfully received API response")
logger.debug(f"Response model: {response.model}")
logger.debug(f"Token usage: {response.usage}")
logger.debug(f"Number of choices: {len(response.choices)}")
return response return response
except FileNotFoundError as e:
logger.error(f"File error: {e}")
return None
except OpenRouterError as e:
logger.error(f"OpenRouter API error: {e}", exc_info=True)
if hasattr(e, 'response'):
logger.error(f"Error response: {e.response}")
return None
except Exception as e: except Exception as e:
logger.error(f"Unexpected error during API call: {e}", exc_info=True) logger.error(f"Error during OpenAI API call: {e}", exc_info=True)
return None return None
def write_openai_response(response: Any, use_mockup: bool, input_file_path: str = None, cost: float = 0) -> None:
def write_llm_response( """Write raw OpenAI response to a file."""
response: Optional[OpenRouterResponse], use_mockup: bool, input_file_path: str = None
) -> None:
"""Write raw LLM response to a file."""
if use_mockup: if use_mockup:
logger.debug("Using mockup response; no LLM message to write.") logger.debug("Using mockup response; no OpenAI message to write.")
return return
if response is None: if response and response.choices:
logger.warning("No response to write") message_content = response.choices[0].message.content
return logger.debug(f"Raw OpenAI message content: {message_content}")
if not response.choices: if input_file_path:
logger.warning("No choices in LLM response") output_dir = os.path.dirname(input_file_path)
logger.debug(f"Response object: {response.raw_response}") base_filename = os.path.splitext(os.path.basename(input_file_path))[0]
return else:
logger.warning("Input file path not provided. Using default output directory and filename.")
output_dir = os.path.join(os.path.dirname(__file__)) # Default to script's directory
base_filename = "default" # Default filename
try:
# Get output directory and base filename
output_dir = os.path.dirname(input_file_path) if input_file_path else "."
base_filename = (
os.path.splitext(os.path.basename(input_file_path))[0]
if input_file_path
else "default"
)
# Generate unique file path
processing_id = str(uuid.uuid4()) processing_id = str(uuid.uuid4())
now = datetime.now() file_path = os.path.join(output_dir, f"{base_filename}_openai_response_{processing_id}") + ".json"
timestamp_str = now.strftime("%Y%m%d_%H%M%S") openai_file_path = os.path.join(output_dir, f"{base_filename}_openai.txt")
file_path = os.path.join(
output_dir, f"{base_filename}_llm_response_{timestamp_str}_{processing_id}"
) + ".json"
# Prepare serializable response try:
serializable_response = { message_content = response.choices[0].message.content if response and response.choices else "No content"
"choices": response.choices, with open(openai_file_path, "w", encoding="utf-8") as openai_file:
"usage": response.usage, openai_file.write(message_content)
"model": response.model, logger.debug(f"OpenAI response written to {openai_file_path}")
"raw_response": response.raw_response
}
# Write response to file serializable_response = {
with open(file_path, "w") as f: "choices": [
json.dump(serializable_response, f, indent=2) {
logger.debug(f"LLM response written to {file_path}") "message": {
"content": choice.message.content,
except IOError as e: "role": choice.message.role
logger.error(f"Failed to write LLM response to file: {e}") },
except Exception as e: "finish_reason": choice.finish_reason,
logger.error(f"Unexpected error while writing response: {e}", exc_info=True) "index": choice.index
} for choice in response.choices
],
"usage": {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens
},
"cost": cost, # Include cost in the output JSON
"model": response.model
}
with open(file_path, "w") as f:
json.dump(serializable_response, f, indent=2, ensure_ascii=False)
logger.debug(f"OpenAI response written to {file_path}")
except IOError as e:
logger.error(f"Failed to write OpenAI response to file: {e}")
else:
logger.warning("No choices in OpenAI response to extract message from.")
logger.debug(f"Response object: {response}")
def insert_processing_data(text_content: str, summary: dict, response: Any, args: argparse.Namespace, processing_id: str, use_mockup: bool, cv_collection) -> float:
def insert_processing_data(
text_content: str,
summary: dict,
response: Optional[OpenRouterResponse],
args: argparse.Namespace,
processing_id: str,
use_mockup: bool,
cv_collection,
) -> None:
"""Insert processing data into MongoDB.""" """Insert processing data into MongoDB."""
if use_mockup: logger.debug("Inserting processing data into MongoDB.")
logger.debug("Using mockup; skipping MongoDB insertion.") cost = 0.0 # Initialize cost to 0.0
return if not use_mockup:
if response and response.choices:
message_content = response.choices[0].message.content
openai_stats = {} # Initialize openai_stats
try:
# Attempt to decode JSON, handling potential decode errors
openai_stats_content = json.loads(message_content.encode('utf-8').decode('unicode_escape'))
openai_stats = openai_stats_content.get("openai_stats", {})
cost = openai_stats.get("cost", 0.0)
except json.JSONDecodeError as e:
logger.error(f"JSONDecodeError in message_content: {e}", exc_info=True)
cost = 0.0
except AttributeError as e:
logger.error(f"AttributeError accessing openai_stats: {e}", exc_info=True)
cost = 0.0
except Exception as e:
logger.error(f"Unexpected error extracting cost: {e}", exc_info=True)
cost = 0.0
logger.debug("Preparing processing data for MongoDB insertion.") except AttributeError as e:
logger.error(f"AttributeError when accessing openai_stats or cost: {e}", exc_info=True)
# Initialize default values cost = 0.0
usage_data = {
"input_tokens": 0, try:
"output_tokens": 0, usage = response.usage
"total_tokens": 0 input_tokens = usage.prompt_tokens
} output_tokens = usage.completion_tokens
total_tokens = usage.total_tokens
# Extract usage data if available except Exception as e:
if response and response.usage: logger.error(f"Error extracting usage data: {e}", exc_info=True)
usage_data = { input_tokens = output_tokens = total_tokens = 0
"input_tokens": response.usage.get("prompt_tokens", 0),
"output_tokens": response.usage.get("completion_tokens", 0), else:
"total_tokens": response.usage.get("total_tokens", 0) logger.error("Invalid response format or missing usage data.")
input_tokens = output_tokens = total_tokens = 0
cost = 0.0
openai_stats = {}
usage = {}
processing_data = {
"processing_id": processing_id,
"timestamp": datetime.now(timezone.utc).isoformat(),
"text_content": text_content,
"summary": summary,
"usage_prompt_tokens": input_tokens, # Renamed to avoid collision
"usage_completion_tokens": output_tokens, # Renamed to avoid collision
"usage_total_tokens": total_tokens, # Renamed to avoid collision
"cost": cost
} }
# Prepare processing data try:
processing_data = { cv_collection.insert_one(processing_data)
"processing_id": processing_id, logger.debug(f"Inserted processing data for ID: {processing_id}")
"timestamp": datetime.now(timezone.utc).isoformat(), return cost # Return the cost
"text_content": text_content, except Exception as e:
"summary": summary, logger.error(f"Failed to insert processing data into MongoDB: {e}", exc_info=True)
"model": response.model if response else None, else:
**usage_data, logger.debug("Using mockup; skipping MongoDB insertion.")
"raw_response": response.raw_response if response else None return cost # Return 0 for mockup mode
}
# Insert into MongoDB
try:
cv_collection.insert_one(processing_data)
logger.debug(f"Successfully inserted processing data for ID: {processing_id}")
logger.debug(f"Token usage - Input: {usage_data['input_tokens']}, "
f"Output: {usage_data['output_tokens']}, "
f"Total: {usage_data['total_tokens']}")
except Exception as e:
logger.error(f"Failed to insert processing data into MongoDB: {e}", exc_info=True)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -0,0 +1,174 @@
import os
import sys
import pytest
from unittest.mock import patch, MagicMock
import json
import logging
import argparse # Import argparse
from dotenv import load_dotenv
# Add the project root to the sys path to allow imports from the main package
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from resume_analysis import (
call_openai_api,
insert_processing_data,
load_mockup_response,
main,
get_mongo_collection
)
# Load environment variables for testing
load_dotenv()
# Constants for Mocking
MOCKUP_FILE_PATH = os.path.join(os.path.dirname(__file__), 'mockup_response.json')
TEST_RESUME_PATH = os.path.join(os.path.dirname(__file__), 'test_resume.txt')
# Create a logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Create a handler and set the formatter
ch = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
# Add the handler to the logger
logger.addHandler(ch)
# Mockup response data
MOCKUP_RESPONSE_DATA = {
"id": "chatcmpl-123",
"object": "chat.completion",
"created": 1677652288,
"model": "gpt-3.5-turbo-0301",
"usage": {
"prompt_tokens": 100,
"completion_tokens": 200,
"total_tokens": 300
},
"choices": [
{
"message": {
"role": "assistant",
"content": '{"openai_stats": {"prompt_tokens": 100, "completion_tokens": 200, "total_tokens": 300}}'
},
"finish_reason": "stop",
"index": 0
}
]
}
# Fixtures
@pytest.fixture
def mock_openai_response():
mock_response = MagicMock()
mock_response.id = "chatcmpl-123"
mock_response.object = "chat.completion"
mock_response.created = 1677652288
mock_response.model = "gpt-3.5-turbo-0301"
    mock_response.usage = MagicMock(prompt_tokens=100, completion_tokens=200, total_tokens=300)
    mock_response.choices = [
        MagicMock(
            message=MagicMock(
                role="assistant",
                content='{"openai_stats": {"prompt_tokens": 100, "completion_tokens": 200, "total_tokens": 300}}'
            ),
            finish_reason="stop",
            index=0
        )
    ]
return mock_response
@pytest.fixture
def test_resume_file():
# Create a dummy resume file for testing
with open(TEST_RESUME_PATH, 'w') as f:
f.write("This is a test resume.")
yield TEST_RESUME_PATH
os.remove(TEST_RESUME_PATH)
@pytest.fixture
def mock_mongo_collection():
# Mock MongoDB collection for testing
class MockMongoCollection:
def __init__(self):
self.inserted_data = None
def insert_one(self, data):
self.inserted_data = data
return MockMongoCollection()
# Unit Tests
def test_load_mockup_response():
# Create a mockup response file
with open(MOCKUP_FILE_PATH, 'w') as f:
json.dump(MOCKUP_RESPONSE_DATA, f)
response = load_mockup_response(MOCKUP_FILE_PATH)
assert response == MOCKUP_RESPONSE_DATA
os.remove(MOCKUP_FILE_PATH)
def test_load_mockup_response_file_not_found():
with pytest.raises(FileNotFoundError):
load_mockup_response("non_existent_file.json")
@patch("resume_analysis.openai.chat.completions.create")
def test_call_openai_api_success(mock_openai_chat_completions_create, mock_openai_response):
mock_openai_chat_completions_create.return_value = mock_openai_response
response = call_openai_api("test resume text", False)
assert response == mock_openai_response
@patch("resume_analysis.openai.chat.completions.create")
def test_call_openai_api_failure(mock_openai_chat_completions_create):
mock_openai_chat_completions_create.side_effect = Exception("API error")
response = call_openai_api("test resume text", False)
assert response is None
def test_call_openai_api_mockup_mode():
# Create a mockup response file
with open(MOCKUP_FILE_PATH, 'w') as f:
json.dump(MOCKUP_RESPONSE_DATA, f)
response = call_openai_api("test resume text", True)
assert response == MOCKUP_RESPONSE_DATA
os.remove(MOCKUP_FILE_PATH)
def test_insert_processing_data_success(mock_openai_response, mock_mongo_collection):
args = argparse.Namespace(file="test.pdf")
cost = insert_processing_data("test resume text", {}, mock_openai_response, args, "test_id", False, mock_mongo_collection)
assert mock_mongo_collection.inserted_data is not None
assert cost == 0
def test_insert_processing_data_mockup_mode(mock_mongo_collection):
args = argparse.Namespace(file="test.pdf")
cost = insert_processing_data("test resume text", {}, MOCKUP_RESPONSE_DATA, args, "test_id", True, mock_mongo_collection)
assert mock_mongo_collection.inserted_data is None
assert cost == 0
@patch("resume_analysis.get_mongo_collection")
def test_main_success(mock_get_mongo_collection, test_resume_file, mock_openai_response):
mock_get_mongo_collection.return_value.insert_one.return_value = None
with patch("resume_analysis.call_openai_api") as mock_call_openai_api:
mock_call_openai_api.return_value = mock_openai_response
with patch("resume_analysis.write_openai_response") as mock_write_openai_response:
sys.argv = ["resume_analysis.py", "-f", test_resume_file]
main()
assert mock_call_openai_api.called
assert mock_write_openai_response.called
@patch("resume_analysis.get_mongo_collection")
def test_main_mockup_mode(mock_get_mongo_collection, test_resume_file, mock_openai_response):
mock_get_mongo_collection.return_value.insert_one.return_value = None
with patch("resume_analysis.call_openai_api") as mock_call_openai_api:
mock_call_openai_api.return_value = mock_openai_response
with patch("resume_analysis.write_openai_response") as mock_write_openai_response:
sys.argv = ["resume_analysis.py", "-f", test_resume_file, "-m"]
main()
assert mock_call_openai_api.called
assert mock_write_openai_response.called
def test_main_file_not_found():
with pytest.raises(SystemExit) as pytest_wrapped_e:
sys.argv = ["resume_analysis.py", "-f", "non_existent_file.pdf"]
main()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
def test_get_mongo_collection():
# Test that the function returns a valid MongoDB collection object
collection = get_mongo_collection()
assert collection is not None
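
# To run this suite (command is a suggestion; adjust the path to your layout):
#     pytest tests/test_resume_analysis.py -v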

plan.md Normal file
View File

@ -0,0 +1,32 @@
# Plan for Modifying resume_analysis.py
## Objective
Modify the `my-app/utils/resume_analysis.py` script to save the extracted text from a PDF file and the OpenAI response to separate text files, with filenames derived from the original PDF's basename.
## Steps
1. **Examine `resume_analysis.py`:** Read the file to understand the existing PDF processing logic and how the OpenAI response is handled.
2. **Clarify Naming Convention:** Confirm the exact naming convention for the output files.
3. **Implement Changes:** Modify the script to:
* Extract the PDF's basename.
    * Save the extracted text to a file named `basename_text.txt` in the same directory as the PDF.
    * Save the OpenAI response to a file named `basename_openai.txt` in the same directory (see the sketch after this list).
4. **Test:** Ensure that the changes work correctly for different PDF files and that the output files are created with the correct content and naming.
5. **Create a Plan File:** Create a markdown file with the plan.
6. **Switch Mode:** Switch to code mode to implement the changes.
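
To make the naming convention in step 3 concrete, here is a minimal sketch (the input path is an invented example; the `_text.txt` / `_openai.txt` suffixes follow the convention above):

```python
import os

pdf_path = "cv/jan_kowalski.pdf"  # example input (placeholder)
base = os.path.splitext(os.path.basename(pdf_path))[0]
out_dir = os.path.dirname(pdf_path)

text_file_path = os.path.join(out_dir, f"{base}_text.txt")      # extracted PDF text
openai_file_path = os.path.join(out_dir, f"{base}_openai.txt")  # raw OpenAI response
```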
## Mermaid Diagram
```mermaid
graph LR
A[Start] --> B{Examine resume_analysis.py};
B --> C{Clarify Naming Convention};
C --> D{Modify Script};
D --> E{Extract PDF Basename};
E --> F{Save Extracted Text};
F --> G{Save OpenAI Response};
G --> H{Test Changes};
H --> I{Create Plan File};
I --> J{Switch to Code Mode};
J --> K[End];