From e73b43e00150037422d5f94b941ea9295ff036f5 Mon Sep 17 00:00:00 2001 From: Ireneusz Bachanowicz Date: Fri, 18 Jul 2025 23:05:18 +0200 Subject: [PATCH] feat: Implement Jira webhook processor with retry logic and service configuration; enhance database interaction for analysis records --- .roo/mcp.json | 11 ++- api/handlers.py | 54 ++++++++++++- database/crud.py | 48 +++++++++--- database/models.py | 5 +- jira_analyses.db | Bin 28672 -> 40960 bytes jira_processor.py | 170 ++++++++++++++++++++++++++++++++++++++++ jira_processor.service | 16 ++++ jira_webhook_llm.py | 31 +++----- tests/conftest.py | 7 +- tests/test_core.py | 174 +++++++++++++++++++++++++++++++++++++++-- webhooks/handlers.py | 131 +++---------------------------- 11 files changed, 485 insertions(+), 162 deletions(-) create mode 100644 jira_processor.py create mode 100644 jira_processor.service diff --git a/.roo/mcp.json b/.roo/mcp.json index 6b0a486..6d0c0bb 100644 --- a/.roo/mcp.json +++ b/.roo/mcp.json @@ -1 +1,10 @@ -{"mcpServers":{}} \ No newline at end of file +{"mcpServers":{ "context7": { + "command": "npx", + "args": [ + "-y", + "@upstash/context7-mcp" + ], + "env": { + "DEFAULT_MINIMUM_TOKENS": "256" + } + }}} \ No newline at end of file diff --git a/api/handlers.py b/api/handlers.py index 9b22458..7a8a72b 100644 --- a/api/handlers.py +++ b/api/handlers.py @@ -5,7 +5,7 @@ import config from llm.models import LLMResponse, JiraWebhookPayload, JiraAnalysisResponse from database.database import get_db_session # Removed Session import here from sqlalchemy.orm import Session # Added correct SQLAlchemy import -from database.crud import get_all_analysis_records, delete_all_analysis_records, get_analysis_by_id, create_analysis_record +from database.crud import get_all_analysis_records, delete_all_analysis_records, get_analysis_by_id, create_analysis_record, get_pending_analysis_records, update_record_status router = APIRouter( prefix="/api", @@ -86,4 +86,54 @@ async def get_analysis_record_endpoint(record_id: int, db: Session = Depends(get record = get_analysis_by_id(db, record_id) if not record: raise HTTPException(status_code=404, detail="Analysis record not found") - return JiraAnalysisResponse.model_validate(record) \ No newline at end of file + return JiraAnalysisResponse.model_validate(record) + +@router.get("/queue/pending") +async def get_pending_queue_records_endpoint(db: Session = Depends(get_db_session)): + """Get all pending or retrying analysis records.""" + try: + records = get_pending_analysis_records(db) + # Convert records to serializable format + serialized_records = [] + for record in records: + record_dict = JiraAnalysisResponse.model_validate(record).model_dump() + # Convert datetime fields to ISO format + record_dict["created_at"] = record_dict["created_at"].isoformat() if record_dict["created_at"] else None + record_dict["updated_at"] = record_dict["updated_at"].isoformat() if record_dict["updated_at"] else None + serialized_records.append(record_dict) + + return JSONResponse( + status_code=200, + content={"data": serialized_records} + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.post("/queue/{record_id}/retry", status_code=200) +async def retry_analysis_record_endpoint(record_id: int, db: Session = Depends(get_db_session)): + """Manually trigger a retry for a failed or validation_failed analysis record.""" + db_record = get_analysis_by_id(db, record_id) + if not db_record: + raise HTTPException(status_code=404, detail="Analysis record 
not found") + + if db_record.status not in ["failed", "validation_failed"]: + raise HTTPException(status_code=400, detail=f"Record status is '{db_record.status}'. Only 'failed' or 'validation_failed' records can be retried.") + + # Reset status to pending and clear error message for retry + updated_record = update_record_status( + db=db, + record_id=record_id, + status="pending", + error_message=None, + analysis_result=None, + raw_response=None, + next_retry_at=None # Reset retry time + ) + + if not updated_record: + raise HTTPException(status_code=500, detail="Failed to update record for retry.") + + return JSONResponse( + status_code=200, + content={"message": f"Record {record_id} marked for retry.", "record_id": updated_record.id} + ) \ No newline at end of file diff --git a/database/crud.py b/database/crud.py index d317385..9cd6a07 100644 --- a/database/crud.py +++ b/database/crud.py @@ -15,7 +15,10 @@ def create_analysis_record(db: Session, payload: JiraWebhookPayload) -> JiraAnal issue_summary=payload.summary, request_payload=payload.model_dump(), created_at=datetime.now(timezone.utc), - updated_at=datetime.now(timezone.utc) + updated_at=datetime.now(timezone.utc), + retry_count=0, + last_processed_at=None, + next_retry_at=None ) db.add(db_analysis) db.commit() @@ -32,30 +35,53 @@ def get_analysis_record(db: Session, issue_key: str) -> Optional[JiraAnalysis]: logger.debug(f"No analysis record found for {issue_key}") return record -def update_analysis_record( +def update_record_status( db: Session, record_id: int, status: str, analysis_result: Optional[Dict[str, Any]] = None, error_message: Optional[str] = None, - raw_response: Optional[Dict[str, Any]] = None + raw_response: Optional[Dict[str, Any]] = None, + retry_count_increment: int = 0, + last_processed_at: Optional[datetime] = None, + next_retry_at: Optional[datetime] = None ) -> Optional[JiraAnalysis]: """Updates an existing Jira analysis record.""" db_analysis = db.query(JiraAnalysis).filter(JiraAnalysis.id == record_id).first() if db_analysis: db_analysis.status = status db_analysis.updated_at = datetime.now(timezone.utc) - if analysis_result: - db_analysis.analysis_result = analysis_result - if error_message: - db_analysis.error_message = error_message - if raw_response: - db_analysis.raw_response = json.dumps(raw_response) + # Only update if not None, allowing explicit None to clear values + # Always update these fields if provided, allowing explicit None to clear them + db_analysis.analysis_result = analysis_result + db_analysis.error_message = error_message + db_analysis.raw_response = json.dumps(raw_response) if raw_response is not None else None + + if retry_count_increment > 0: + db_analysis.retry_count += retry_count_increment + + db_analysis.last_processed_at = last_processed_at + db_analysis.next_retry_at = next_retry_at + + # When status is set to "pending", clear relevant fields for retry + if status == "pending": + db_analysis.analysis_result = None + db_analysis.error_message = None + db_analysis.raw_response = None + db_analysis.next_retry_at = None db.commit() db.refresh(db_analysis) return db_analysis +def get_pending_analysis_records(db: Session) -> list[JiraAnalysis]: + """Retrieves all pending or retrying analysis records that are ready for processing.""" + now = datetime.now(timezone.utc) + return db.query(JiraAnalysis).filter( + (JiraAnalysis.status == "pending") | + ((JiraAnalysis.status == "retrying") & (JiraAnalysis.next_retry_at <= now)) + ).order_by(JiraAnalysis.created_at.asc()).all() + def 
 def get_all_analysis_records(db: Session) -> list[JiraAnalysis]:
     """Retrieves all analysis records from the database."""
     return db.query(JiraAnalysis).all()
@@ -70,6 +96,6 @@ def delete_all_analysis_records(db: Session) -> int:
     db.query(JiraAnalysis).delete()
     db.commit()
     return count
-    db.commit()
     db.query(JiraAnalysis).delete()
-    db.commit()
\ No newline at end of file
+    db.commit()
+    return count
\ No newline at end of file
diff --git a/database/models.py b/database/models.py
index 95f2be2..4bc6f34 100644
--- a/database/models.py
+++ b/database/models.py
@@ -16,4 +16,7 @@ class JiraAnalysis(Base):
     created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
     updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False)
     error_message = Column(Text, nullable=True) # To store any error messages
-    raw_response = Column(JSON, nullable=True) # Store raw LLM response before validation
\ No newline at end of file
+    raw_response = Column(JSON, nullable=True) # Store raw LLM response before validation
+    retry_count = Column(Integer, default=0, nullable=False)
+    last_processed_at = Column(DateTime, nullable=True)
+    next_retry_at = Column(DateTime, nullable=True)
\ No newline at end of file
diff --git a/jira_analyses.db b/jira_analyses.db
index f913f03b4bbf456677f22f200ceca9c436416d62..9b954044f795de39b0e211cfdf4035dfa02a2b76 100644
GIT binary patch
literal 40960
(binary patch data omitted)
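The checked-in SQLite file changes because database/models.py now adds the retry_count, last_processed_at, and next_retry_at columns. A deployment that keeps its own database would need a one-off migration along these lines (a sketch only; the table name "jira_analysis" is an assumption, since __tablename__ is not visible in this patch):

    import sqlite3

    conn = sqlite3.connect("jira_analyses.db")
    for ddl in (
        "ALTER TABLE jira_analysis ADD COLUMN retry_count INTEGER NOT NULL DEFAULT 0",
        "ALTER TABLE jira_analysis ADD COLUMN last_processed_at DATETIME",
        "ALTER TABLE jira_analysis ADD COLUMN next_retry_at DATETIME",
    ):
        try:
            conn.execute(ddl)
        except sqlite3.OperationalError:
            pass  # column already present
    conn.commit()
    conn.close()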
delta 539
(binary patch data omitted)

diff --git a/jira_processor.py b/jira_processor.py
new file mode 100644
--- /dev/null
+++ b/jira_processor.py
@@ -0,0 +1,170 @@
+import asyncio
+import json
+from datetime import datetime, timezone, timedelta
+
+from loguru import logger
+from sqlalchemy.orm import Session
+
+from config import settings
+from database.database import SessionLocal
+from database.models import JiraAnalysis
+from database.crud import update_record_status, get_pending_analysis_records
+from llm.chains import analysis_chain
+from llm.models import JiraWebhookPayload, AnalysisFlags
+
+# Retry/polling tunables; the exact values here are assumed defaults
+MAX_RETRIES = 3
+INITIAL_RETRY_DELAY_SECONDS = 60
+POLL_INTERVAL_SECONDS = 10
+
+def calculate_next_retry_time(retry_count: int) -> datetime:
+    """Calculates the next retry time using exponential backoff."""
+    delay = INITIAL_RETRY_DELAY_SECONDS * (2 ** retry_count)
+    return datetime.now(timezone.utc) + timedelta(seconds=delay)
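+
+# Backoff illustration (under the assumed 60-second base delay): the first
+# retry fires ~2 minutes after the failure (retry_count=1), the second ~4
+# minutes, the third ~8 minutes; once retry_count exceeds MAX_RETRIES the
+# record stays "failed" unless re-queued via POST /api/queue/{record_id}/retry.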
+
+async def process_single_jira_request(db: Session, record: JiraAnalysis):
+    """Processes a single Jira webhook request using the LLM."""
+    issue_key = record.issue_key
+    record_id = record.id
+    payload = JiraWebhookPayload.model_validate(record.request_payload)
+
+    logger.bind(
+        issue_key=issue_key,
+        record_id=record_id,
+        timestamp=datetime.now(timezone.utc).isoformat()
+    ).info(f"[{issue_key}] Processing webhook request.")
+
+    # Create Langfuse trace if enabled
+    trace = None
+    if settings.langfuse.enabled:
+        trace = settings.langfuse_client.start_span(
+            name="Jira Webhook Processing",
+            input=payload.model_dump(),
+            metadata={
+                "trace_id": f"processor-{issue_key}-{record_id}",
+                "issue_key": issue_key,
+                "record_id": record_id,
+                "timestamp": datetime.now(timezone.utc).isoformat()
+            }
+        )
+
+    llm_input = {
+        "issueKey": payload.issueKey,
+        "summary": payload.summary,
+        "description": payload.description if payload.description else "No description provided.",
+        "status": payload.status if payload.status else "Unknown",
+        "labels": ", ".join(payload.labels) if payload.labels else "None",
+        "assignee": payload.assignee if payload.assignee else "Unassigned",
+        "updated": payload.updated if payload.updated else "Unknown",
+        "comment": payload.comment if payload.comment else "No new comment provided."
+    }
+
+    llm_span = None
+    if settings.langfuse.enabled and trace:
+        llm_span = trace.start_span(
+            name="LLM Processing",
+            input=llm_input,
+            metadata={
+                "model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model
+            }
+        )
+
+    try:
+        raw_llm_response = await analysis_chain.ainvoke(llm_input)
+
+        if settings.langfuse.enabled and llm_span:
+            llm_span.update(output=raw_llm_response)
+            llm_span.end()
+
+        try:
+            AnalysisFlags(
+                hasMultipleEscalations=raw_llm_response.get("hasMultipleEscalations", False),
+                customerSentiment=raw_llm_response.get("customerSentiment", "neutral")
+            )
+        except Exception as e:
+            logger.error(f"[{issue_key}] Invalid LLM response structure: {e}", exc_info=True)
+            update_record_status(
+                db=db,
+                record_id=record_id,
+                analysis_result={"hasMultipleEscalations": False, "customerSentiment": "neutral"},
+                raw_response=raw_llm_response,
+                status="validation_failed",
+                error_message=f"LLM response validation failed: {e}",
+                last_processed_at=datetime.now(timezone.utc),
+                retry_count_increment=1,
+                next_retry_at=calculate_next_retry_time(record.retry_count + 1) if record.retry_count < MAX_RETRIES else None
+            )
+            if settings.langfuse.enabled and trace:
+                trace.end(status_message=f"Validation failed: {e}", status="ERROR")
+            # The record is already marked validation_failed with its retry schedule;
+            # re-raising here would double-count the failure in the outer handler.
+            return
+
+        logger.debug(f"[{issue_key}] LLM Analysis Result: {json.dumps(raw_llm_response, indent=2)}")
+        update_record_status(
+            db=db,
+            record_id=record_id,
+            analysis_result=raw_llm_response,
+            raw_response=raw_llm_response,
+            status="completed",
+            last_processed_at=datetime.now(timezone.utc),
+            next_retry_at=None # No retry needed on success
+        )
+        if settings.langfuse.enabled and trace:
+            trace.end(status="SUCCESS")
+        logger.info(f"[{issue_key}] Successfully processed and updated record {record_id}.")
+
+    except Exception as e:
+        logger.error(f"[{issue_key}] LLM processing failed for record {record_id}: {str(e)}")
+        if settings.langfuse.enabled and llm_span:
+            llm_span.end(status_message=str(e), status="ERROR")
+
+        new_retry_count = record.retry_count + 1
+        new_status = "failed"
+        next_retry = None
+        if new_retry_count <= MAX_RETRIES:
+            next_retry = calculate_next_retry_time(new_retry_count)
+            new_status = "retrying" # Indicate that it will be retried
+
+        update_record_status(
+            db=db,
+            record_id=record_id,
+            status=new_status,
+            error_message=f"LLM processing failed: {str(e)}",
+            last_processed_at=datetime.now(timezone.utc),
+            retry_count_increment=1,
+            next_retry_at=next_retry
+        )
+        if settings.langfuse.enabled and trace:
+            trace.end(status_message=str(e), status="ERROR")
+        logger.error(f"[{issue_key}] Record {record_id} status updated to '{new_status}'. Retry count: {new_retry_count}")
+
+
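+# Record lifecycle implemented here: the webhook endpoint inserts rows as
+# "pending"; the loop below claims them as "processing"; a run then ends in
+# "completed", "validation_failed", or "retrying" (exhausted retries become
+# "failed"), and "retrying" rows are picked up again once next_retry_at passes.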
+async def main_processor_loop():
+    """Main loop for the Jira webhook processor."""
+    logger.info("Starting Jira webhook processor.")
+    while True:
+        db: Session = SessionLocal()
+        try:
+            # Fetch records that are 'pending' or 'retrying' and past their
+            # next_retry_at, oldest first (see database/crud.py)
+            pending_records = get_pending_analysis_records(db)
+
+            if not pending_records:
+                logger.debug(f"No pending or retrying records found. Sleeping for {POLL_INTERVAL_SECONDS} seconds.")
+
+            for record in pending_records:
+                # Update status to 'processing' immediately to prevent other workers from picking it up
+                update_record_status(db, record.id, "processing", last_processed_at=datetime.now(timezone.utc))
+                db.refresh(record) # Refresh to get the latest state
+                await process_single_jira_request(db, record)
+        except Exception as e:
+            logger.error(f"Error in main processor loop: {str(e)}", exc_info=True)
+        finally:
+            db.close()
+
+        # time.sleep() here would block the event loop; yield to it instead
+        await asyncio.sleep(POLL_INTERVAL_SECONDS)
+
+if __name__ == "__main__":
+    asyncio.run(main_processor_loop())
\ No newline at end of file
diff --git a/jira_processor.service b/jira_processor.service
new file mode 100644
index 0000000..5f39a5a
--- /dev/null
+++ b/jira_processor.service
@@ -0,0 +1,16 @@
+[Unit]
+Description=Jira Webhook Processor Service
+After=network.target
+
+[Service]
+User=irek
+Group=irek
+WorkingDirectory=/home/irek/gitea
+ExecStart=/home/irek/gitea/.venv/bin/python jira_processor.py
+Restart=always
+RestartSec=10
+StandardOutput=journal
+StandardError=journal
+
+[Install]
+WantedBy=multi-user.target
\ No newline at end of file
diff --git a/jira_webhook_llm.py b/jira_webhook_llm.py
index 1edb34b..1305734 100644
--- a/jira_webhook_llm.py
+++ b/jira_webhook_llm.py
@@ -17,7 +17,7 @@ import asyncio
 from functools import wraps, partial
 from concurrent.futures import ThreadPoolExecutor
 from database.database import init_db, get_db
-from database.crud import create_analysis_record, update_analysis_record
+from database.crud import create_analysis_record, update_record_status
 from llm.models import JiraWebhookPayload
 from llm.chains import analysis_chain, validate_response
 from api.handlers import router # Correct variable name
@@ -64,10 +64,14 @@ async def lifespan(app: FastAPI):
     init_db() # Initialize the database
 
     # Setup signal handlers
-    loop = asyncio.get_running_loop()
-    for sig in (signal.SIGTERM, signal.SIGINT):
-        loop.add_signal_handler(sig, partial(handle_shutdown_signal, sig, loop))
-    logger.info("Signal handlers configured successfully")
+    # Only set up signal handlers if not in a test environment
+    if os.getenv("IS_TEST_ENV") != "true":
+        loop = asyncio.get_running_loop()
+        for sig in (signal.SIGTERM, signal.SIGINT):
+            loop.add_signal_handler(sig, partial(handle_shutdown_signal, sig, loop))
+        logger.info("Signal handlers configured successfully")
+    else:
+        logger.info("Skipping signal handler configuration in test environment.")
 
     # Verify critical components
     if not hasattr(settings, 'langfuse_handler'):
@@ -129,15 +133,6 @@ def create_app():
 
         try:
             response = await call_next(request)
-            if response.status_code >= 400:
-                error_response = ErrorResponse(
-                    error_id=request_id,
-                    timestamp=datetime.now(timezone.utc).isoformat(),
-                    status_code=response.status_code,
-                    message=HTTPStatus(response.status_code).phrase,
-                    details="Endpoint not found or invalid request"
-                )
-                return JSONResponse(status_code=response.status_code, content=error_response.model_dump())
             return response
         except HTTPException as e:
             logger.error(f"HTTP Error: {e.status_code} - {e.detail}")
@@ -179,7 +174,7 @@ async def process_jira_webhook_background(record_id: int, payload: JiraWebhookPa
 
     with get_db() as db:
         try:
-            update_analysis_record(db, record_id, "processing")
+            update_record_status(db, record_id, "processing")
 
             llm_input = {
                 "issueKey": payload.issueKey,
@@ -200,16 +195,16 @@ async def process_jira_webhook_background(record_id: int, payload: JiraWebhookPa
                     "hasMultipleEscalations": False,
"customerSentiment": "neutral" } - update_analysis_record(db, record_id, "failed", analysis_result=analysis_result, error_message="Invalid LLM response format") + update_record_status(db, record_id, "failed", analysis_result=analysis_result, error_message="Invalid LLM response format") logger.error(f"LLM processing failed for {payload.issueKey}: Invalid response format") return - update_analysis_record(db, record_id, "completed", analysis_result=analysis_result) + update_record_status(db, record_id, "completed", analysis_result=analysis_result) logger.info(f"Background processing completed for record ID: {record_id}, Issue Key: {payload.issueKey}") except Exception as e: logger.error(f"Error during background processing for record ID {record_id}, Issue Key {payload.issueKey}: {str(e)}") - update_analysis_record(db, record_id, "failed", error_message=str(e)) + update_record_status(db, record_id, "failed", error_message=str(e)) def retry(max_retries: int = 3, delay: float = 1.0): """Decorator for retrying failed operations""" diff --git a/tests/conftest.py b/tests/conftest.py index dc64036..1787c8a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,6 +13,7 @@ def setup_db(monkeypatch): # Use in-memory SQLite for tests test_db_url = "sqlite:///:memory:" monkeypatch.setenv("DATABASE_URL", test_db_url) + monkeypatch.setenv("IS_TEST_ENV", "true") # Monkeypatch the global engine and SessionLocal in the database module engine = create_engine(test_db_url, connect_args={"check_same_thread": False}) @@ -28,7 +29,7 @@ def setup_db(monkeypatch): from database.models import Base as ModelsBase # Renamed to avoid conflict with imported Base - # Create all tables within the same connection + # Create all tables within the same connection and commit ModelsBase.metadata.create_all(bind=connection) # Use the connection here # Verify table creation within setup_db @@ -40,8 +41,8 @@ def setup_db(monkeypatch): yield engine # Yield the engine for test_client to use - # Cleanup: Rollback the transaction and close the connection - transaction.rollback() # Rollback to clean up data + # Cleanup: Rollback the test transaction and close the connection + transaction.rollback() # Rollback test data connection.close() print("--- setup_db fixture finished ---") diff --git a/tests/test_core.py b/tests/test_core.py index 1c54858..aec439c 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -5,20 +5,23 @@ from llm.models import JiraWebhookPayload from database.crud import create_analysis_record, get_analysis_by_id from database.models import JiraAnalysis from database.database import get_db +from database.models import JiraAnalysis +from database.crud import create_analysis_record, update_record_status, get_analysis_by_id from unittest.mock import MagicMock # Import MagicMock +from datetime import datetime, timezone def test_error_handling_middleware(test_client, mock_jira_payload): # Test 404 error handling response = test_client.post("/nonexistent-endpoint", json={}) assert response.status_code == 404 - assert "error_id" in response.json() + assert "detail" in response.json() # FastAPI's default 404 response uses "detail" # Test validation error handling invalid_payload = mock_jira_payload.copy() invalid_payload.pop("issueKey") response = test_client.post("/api/jira-webhook", json=invalid_payload) assert response.status_code == 422 - assert "details" in response.json() + assert "detail" in response.json() # FastAPI's default 422 response uses "detail" def test_webhook_handler(setup_db, test_client, 
mock_full_jira_payload, monkeypatch): # Mock the LLM analysis chain to avoid external calls @@ -35,10 +38,10 @@ def test_webhook_handler(setup_db, test_client, mock_full_jira_payload, monkeypa # Test successful webhook handling with full payload response = test_client.post("/api/jira-webhook", json=mock_full_jira_payload) - assert response.status_code == 200 + assert response.status_code == 202 response_data = response.json() assert "status" in response_data - assert response_data["status"] in ["success", "skipped"] + assert response_data["status"] in ["success", "skipped", "queued"] if response_data["status"] == "success": assert "analysis_flags" in response_data @@ -83,4 +86,165 @@ async def test_retry_decorator(): raise Exception("Test error") with pytest.raises(Exception): - await failing_function() \ No newline at end of file + await failing_function() + +def test_get_pending_queue_records_endpoint(setup_db, test_client, mock_full_jira_payload): + # Create a pending record + with get_db() as db: + payload_model = JiraWebhookPayload(**mock_full_jira_payload) + pending_record = create_analysis_record(db, payload_model) + db.commit() + db.refresh(pending_record) + + response = test_client.get("/api/queue/pending") + assert response.status_code == 200, f"Expected 200 but got {response.status_code}. Response: {response.text}" + data = response.json()["data"] + assert len(data) == 1 + assert data[0]["issue_key"] == mock_full_jira_payload["issueKey"] + assert data[0]["status"] == "pending" + +def test_get_pending_queue_records_endpoint_empty(setup_db, test_client): + # Ensure no records exist + with get_db() as db: + db.query(JiraAnalysis).delete() + db.commit() + + response = test_client.get("/api/queue/pending") + assert response.status_code == 200 + data = response.json()["data"] + assert len(data) == 0 + +def test_get_pending_queue_records_endpoint_error(test_client, monkeypatch): + def mock_get_pending_analysis_records(db): + raise Exception("Database error") + + monkeypatch.setattr("api.handlers.get_pending_analysis_records", mock_get_pending_analysis_records) + + response = test_client.get("/api/queue/pending") + assert response.status_code == 500, f"Expected 500 but got {response.status_code}. Response: {response.text}" + assert "detail" in response.json() # FastAPI's HTTPException uses "detail" + assert response.json()["detail"] == "Database error: Database error" + +def test_retry_analysis_record_endpoint_success(setup_db, test_client, mock_full_jira_payload): + # Create a failed record + with get_db() as db: + payload_model = JiraWebhookPayload(**mock_full_jira_payload) + failed_record = create_analysis_record(db, payload_model) + update_record_status(db, failed_record.id, "failed", error_message="LLM failed") + db.commit() + db.refresh(failed_record) + + response = test_client.post(f"/api/queue/{failed_record.id}/retry") + assert response.status_code == 200 + assert response.json()["message"] == f"Record {failed_record.id} marked for retry." 
+ + with get_db() as db: + updated_record = get_analysis_by_id(db, failed_record.id) + assert updated_record.status == "pending" + assert updated_record.error_message is None + assert updated_record.analysis_result is None + assert updated_record.raw_response is None + assert updated_record.next_retry_at is None + +def test_retry_analysis_record_endpoint_not_found(test_client): + response = test_client.post("/api/queue/99999/retry") + assert response.status_code == 404 + # Handle both possible error response formats + assert "detail" in response.json() # FastAPI's HTTPException uses "detail" + assert response.json()["detail"] == "Analysis record not found" + +def test_retry_analysis_record_endpoint_invalid_status(setup_db, test_client, mock_full_jira_payload): + # Create a successful record + with get_db() as db: + payload_model = JiraWebhookPayload(**mock_full_jira_payload) + successful_record = create_analysis_record(db, payload_model) + update_record_status(db, successful_record.id, "success") + db.commit() + db.refresh(successful_record) + + response = test_client.post(f"/api/queue/{successful_record.id}/retry") + assert response.status_code == 400 + assert response.json()["detail"] == f"Record status is 'success'. Only 'failed' or 'validation_failed' records can be retried." + +def test_retry_analysis_record_endpoint_db_update_failure(setup_db, test_client, mock_full_jira_payload, monkeypatch): + # Create a failed record + with get_db() as db: + payload_model = JiraWebhookPayload(**mock_full_jira_payload) + failed_record = create_analysis_record(db, payload_model) + update_record_status(db, failed_record.id, "failed", error_message="LLM failed") + db.commit() + db.refresh(failed_record) + + def mock_update_record_status(*args, **kwargs): + return None # Simulate update failure + + monkeypatch.setattr("api.handlers.update_record_status", mock_update_record_status) + + response = test_client.post(f"/api/queue/{failed_record.id}/retry") + assert response.status_code == 500, f"Expected 500 but got {response.status_code}. Response: {response.text}" + assert response.json()["detail"] == "Failed to update record for retry." + +def test_retry_analysis_record_endpoint_retry_count_and_next_retry_at(setup_db, test_client, mock_full_jira_payload): + # Create a failed record with an initial retry count and next_retry_at + with get_db() as db: + payload_model = JiraWebhookPayload(**mock_full_jira_payload) + failed_record = create_analysis_record(db, payload_model) + update_record_status( + db, + failed_record.id, + "failed", + error_message="LLM failed", + retry_count_increment=1, + next_retry_at=datetime.now(timezone.utc) + ) + db.commit() + db.refresh(failed_record) + initial_retry_count = failed_record.retry_count + + response = test_client.post(f"/api/queue/{failed_record.id}/retry") + assert response.status_code == 200 + + with get_db() as db: + updated_record = get_analysis_by_id(db, failed_record.id) + assert updated_record.status == "pending" + assert updated_record.error_message is None + assert updated_record.next_retry_at is None # Should be reset to None + # The retry endpoint itself doesn't increment retry_count, + # it just resets the status. The increment happens during processing. + # So, we assert it remains the same as before the retry request. 
+ assert updated_record.retry_count == initial_retry_count + +@pytest.mark.asyncio +async def test_concurrent_retry_operations(setup_db, test_client, mock_full_jira_payload): + # Create multiple failed records + record_ids = [] + with get_db() as db: + for i in range(5): + payload = mock_full_jira_payload.copy() + payload["issueKey"] = f"TEST-{i}" + payload_model = JiraWebhookPayload(**payload) + failed_record = create_analysis_record(db, payload_model) + update_record_status(db, failed_record.id, "failed", error_message=f"LLM failed {i}") + db.commit() + db.refresh(failed_record) + record_ids.append(failed_record.id) + + # Simulate concurrent retry requests + import asyncio + async def send_retry_request(record_id): + return test_client.post(f"/api/queue/{record_id}/retry") + + tasks = [send_retry_request(rid) for rid in record_ids] + responses = await asyncio.gather(*tasks) + + for response in responses: + assert response.status_code == 200 + assert "message" in response.json() + + # Verify all records are marked as pending + with get_db() as db: + for record_id in record_ids: + updated_record = get_analysis_by_id(db, record_id) + assert updated_record.status == "pending" + assert updated_record.error_message is None + assert updated_record.next_retry_at is None \ No newline at end of file diff --git a/webhooks/handlers.py b/webhooks/handlers.py index b17b7bc..0ccb311 100644 --- a/webhooks/handlers.py +++ b/webhooks/handlers.py @@ -9,9 +9,8 @@ import uuid from config import settings from langfuse import Langfuse -from database.crud import create_analysis_record, get_analysis_record, update_analysis_record -from llm.models import JiraWebhookPayload, AnalysisFlags -from llm.chains import analysis_chain, validate_response +from database.crud import create_analysis_record +from llm.models import JiraWebhookPayload from database.database import get_db_session webhook_router = APIRouter() @@ -33,10 +32,7 @@ class ValidationError(HTTPException): super().__init__(status_code=422, detail=detail) class JiraWebhookHandler: - def __init__(self): - self.analysis_chain = analysis_chain - - async def handle_webhook(self, payload: JiraWebhookPayload, db: Session): + async def process_jira_request(self, payload: JiraWebhookPayload, db: Session): try: if not payload.issueKey: raise BadRequestError("Missing required field: issueKey") @@ -44,139 +40,32 @@ class JiraWebhookHandler: if not payload.summary: raise BadRequestError("Missing required field: summary") - # Check for existing analysis record - existing_record = get_analysis_record(db, payload.issueKey) - if existing_record: - logger.info(f"Existing analysis record found for {payload.issueKey}. 
Skipping new analysis.") - return {"status": "skipped", "analysis_flags": existing_record.analysis_result} - # Create new analysis record with initial state new_record = create_analysis_record(db=db, payload=payload) - update_analysis_record(db=db, record_id=new_record.id, status="processing") logger.bind( issue_key=payload.issueKey, record_id=new_record.id, timestamp=datetime.now(timezone.utc).isoformat() - ).info(f"[{payload.issueKey}] Received webhook") + ).info(f"[{payload.issueKey}] Received webhook and queued for processing.") - # Create Langfuse trace if enabled - trace = None - if settings.langfuse.enabled: - trace = settings.langfuse_client.start_span( # Use start_span - name="Jira Webhook", - input=payload.model_dump(), # Use model_dump for Pydantic V2 - metadata={ - "trace_id": f"webhook-{payload.issueKey}", - "issue_key": payload.issueKey, - "timestamp": datetime.now(timezone.utc).isoformat() - } - ) - - llm_input = { - "issueKey": payload.issueKey, - "summary": payload.summary, - "description": payload.description if payload.description else "No description provided.", - "status": payload.status if payload.status else "Unknown", - "labels": ", ".join(payload.labels) if payload.labels else "None", - "assignee": payload.assignee if payload.assignee else "Unassigned", - "updated": payload.updated if payload.updated else "Unknown", - "comment": payload.comment if payload.comment else "No new comment provided." - } - - # Create Langfuse span for LLM processing if enabled - llm_span = None - if settings.langfuse.enabled and trace: - llm_span = trace.start_span( - name="LLM Processing", - input=llm_input, - metadata={ - "model": settings.llm.model if settings.llm.mode == 'openai' else settings.llm.ollama_model - } - ) - - try: - raw_llm_response = await self.analysis_chain.ainvoke(llm_input) - - # Update Langfuse span with output if enabled - if settings.langfuse.enabled and llm_span: - llm_span.update(output=raw_llm_response) - llm_span.end() - - # Validate LLM response - try: - # Validate using Pydantic model, extracting only relevant fields - AnalysisFlags( - hasMultipleEscalations=raw_llm_response.get("hasMultipleEscalations", False), - customerSentiment=raw_llm_response.get("customerSentiment", "neutral") - ) - except Exception as e: - logger.error(f"[{payload.issueKey}] Invalid LLM response structure: {e}", exc_info=True) - update_analysis_record( - db=db, - record_id=new_record.id, - analysis_result={"hasMultipleEscalations": False, "customerSentiment": "neutral"}, - raw_response=json.dumps(raw_llm_response), # Store as JSON string - status="validation_failed" - ) - raise ValueError(f"Invalid LLM response format: {e}") from e - - logger.debug(f"[{payload.issueKey}] LLM Analysis Result: {json.dumps(raw_llm_response, indent=2)}") - # Update record with final results - update_analysis_record( - db=db, - record_id=new_record.id, - analysis_result=raw_llm_response, - raw_response=str(raw_llm_response), # Store validated result as raw - status="completed" - ) - return {"status": "success", "analysis_flags": raw_llm_response} - - except Exception as e: - logger.error(f"[{payload.issueKey}] LLM processing failed: {str(e)}") - - # Log error to Langfuse if enabled - if settings.langfuse.enabled and llm_span: - llm_span.end(status_message=str(e), status="ERROR") - - update_analysis_record( - db=db, - record_id=new_record.id, - status="failed", - error_message=f"LLM processing failed: {str(e)}" - ) - error_id = str(uuid.uuid4()) - logger.error(f"[{payload.issueKey}] Error ID: {error_id}") 
-            return {
-                "status": "error",
-                "error_id": error_id,
-                "analysis_flags": {
-                    "hasMultipleEscalations": False,
-                    "customerSentiment": "neutral"
-                },
-                "error": str(e)
-            }
+            return {"status": "queued", "record_id": new_record.id}
 
+        except HTTPException:
+            # Propagate BadRequestError etc. with their original status codes
+            # instead of letting the generic handler convert them to 500s
+            raise
         except Exception as e:
             issue_key = payload.issueKey if payload.issueKey else "N/A"
-            logger.error(f"[{issue_key}] Error processing webhook: {str(e)}")
+            logger.error(f"[{issue_key}] Error receiving webhook: {str(e)}")
             import traceback
             logger.error(f"[{issue_key}] Stack trace: {traceback.format_exc()}")
-
-            # Log error to Langfuse if enabled
-            if settings.langfuse.enabled and trace:
-                trace.end(status_message=str(e), status="ERROR")
-
             raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")
 
 # Initialize handler
 webhook_handler = JiraWebhookHandler()
 
-@webhook_router.post("/api/jira-webhook")
-async def jira_webhook_endpoint(payload: JiraWebhookPayload, db: Session = Depends(get_db_session)):
-    """Jira webhook endpoint"""
+@webhook_router.post("/api/jira-webhook", status_code=202)
+async def receive_jira_request(payload: JiraWebhookPayload, db: Session = Depends(get_db_session)):
+    """Jira webhook endpoint - receives and queues requests for processing"""
     try:
-        result = await webhook_handler.handle_webhook(payload, db)
+        result = await webhook_handler.process_jira_request(payload, db)
         return result
     except ValidationError as e:
         raise
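For reference, a minimal sketch of driving the new queue endpoints from a client (the base URL and record id are assumptions, not part of this patch):

    import requests

    BASE = "http://localhost:8000/api"

    # List records the processor has not finished yet ("pending"/"retrying").
    pending = requests.get(f"{BASE}/queue/pending").json()["data"]
    print(f"{len(pending)} record(s) queued")

    # Re-queue a 'failed' or 'validation_failed' record by id (42 is hypothetical).
    resp = requests.post(f"{BASE}/queue/42/retry")
    print(resp.status_code, resp.json())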