feat: make dispatch callback maintain own db connection (#3279)
This commit is contained in:
@@ -96,6 +96,8 @@ class JobManager:
|
||||
@trace_method
|
||||
async def update_job_by_id_async(self, job_id: str, job_update: JobUpdate, actor: PydanticUser) -> PydanticJob:
|
||||
"""Update a job by its ID with the given JobUpdate object asynchronously."""
|
||||
callback_func = None
|
||||
|
||||
async with db_registry.async_session() as session:
|
||||
# Fetch the job by ID
|
||||
job = await self._verify_job_access_async(session=session, job_id=job_id, actor=actor, access=["write"])
|
||||
@@ -115,11 +117,23 @@ class JobManager:
|
||||
logger.info(f"Current job completed at: {job.completed_at}")
|
||||
job.completed_at = get_utc_time().replace(tzinfo=None)
|
||||
if job.callback_url:
|
||||
await self._dispatch_callback_async(job)
|
||||
callback_func = self._dispatch_callback_async(
|
||||
callback_url=job.callback_url,
|
||||
payload={
|
||||
"job_id": job.id,
|
||||
"status": job.status,
|
||||
"completed_at": job.completed_at.isoformat() if job.completed_at else None,
|
||||
"metadata": job.metadata_,
|
||||
},
|
||||
actor=actor,
|
||||
)
|
||||
|
||||
# Save the updated job to the database
|
||||
await job.update_async(db_session=session, actor=actor)
|
||||
|
||||
if callback_func:
|
||||
return await callback_func
|
||||
|
||||
return job.to_pydantic()
|
||||
|
||||
@enforce_types
|
||||
@@ -699,29 +713,33 @@ class JobManager:
|
||||
# Continue silently - callback failures should not affect job completion
|
||||
|
||||
@trace_method
|
||||
async def _dispatch_callback_async(self, job: JobModel) -> None:
|
||||
async def _dispatch_callback_async(self, callback_url: str, payload: dict, actor: PydanticUser) -> PydanticJob:
|
||||
"""
|
||||
POST a standard JSON payload to job.callback_url and record timestamp + HTTP status asynchronously.
|
||||
"""
|
||||
payload = {
|
||||
"job_id": job.id,
|
||||
"status": job.status,
|
||||
"completed_at": job.completed_at.isoformat() if job.completed_at else None,
|
||||
"metadata": job.metadata_,
|
||||
}
|
||||
job_id = payload["job_id"]
|
||||
callback_sent_at, callback_status_code, callback_error = None, None, None
|
||||
|
||||
try:
|
||||
async with AsyncClient() as client:
|
||||
log_event("POST callback dispatched", payload)
|
||||
resp = await client.post(job.callback_url, json=payload, timeout=5.0)
|
||||
resp = await client.post(callback_url, json=payload, timeout=5.0)
|
||||
log_event("POST callback finished")
|
||||
# Ensure timestamp is timezone-naive for DB compatibility
|
||||
job.callback_sent_at = get_utc_time().replace(tzinfo=None)
|
||||
job.callback_status_code = resp.status_code
|
||||
callback_sent_at = get_utc_time().replace(tzinfo=None)
|
||||
callback_status_code = resp.status_code
|
||||
except Exception as e:
|
||||
error_message = f"Failed to dispatch callback for job {job.id} to {job.callback_url}: {e!s}"
|
||||
error_message = f"Failed to dispatch callback for job {job_id} to {callback_url}: {e!s}"
|
||||
logger.error(error_message)
|
||||
# Record the failed attempt
|
||||
job.callback_sent_at = get_utc_time().replace(tzinfo=None)
|
||||
job.callback_error = error_message
|
||||
callback_sent_at = get_utc_time().replace(tzinfo=None)
|
||||
callback_error = error_message
|
||||
# Continue silently - callback failures should not affect job completion
|
||||
|
||||
async with db_registry.async_session() as session:
|
||||
job = await JobModel.read_async(db_session=session, identifier=job_id, actor=actor, access_type=AccessType.USER)
|
||||
job.callback_sent_at = callback_sent_at
|
||||
job.callback_status_code = callback_status_code
|
||||
job.callback_error = callback_error
|
||||
await job.update_async(db_session=session, actor=actor)
|
||||
return job.to_pydantic()
|
||||
|
||||
Reference in New Issue
Block a user