fix: Move callback execution out of the async session (#3360)
This commit is contained in:
@@ -67,8 +67,20 @@ class JobManager:
|
||||
@trace_method
|
||||
def update_job_by_id(self, job_id: str, job_update: JobUpdate, actor: PydanticUser) -> PydanticJob:
|
||||
"""Update a job by its ID with the given JobUpdate object."""
|
||||
# First check if we need to dispatch a callback
|
||||
needs_callback = False
|
||||
callback_url = None
|
||||
with db_registry.session() as session:
|
||||
job = self._verify_job_access(session=session, job_id=job_id, actor=actor, access=["write"])
|
||||
not_completed_before = not bool(job.completed_at)
|
||||
|
||||
# Check if we'll need to dispatch callback
|
||||
if job_update.status in {JobStatus.completed, JobStatus.failed} and not_completed_before and job.callback_url:
|
||||
needs_callback = True
|
||||
callback_url = job.callback_url
|
||||
|
||||
# Update the job first to get the final metadata
|
||||
with db_registry.session() as session:
|
||||
# Fetch the job by ID
|
||||
job = self._verify_job_access(session=session, job_id=job_id, actor=actor, access=["write"])
|
||||
not_completed_before = not bool(job.completed_at)
|
||||
|
||||
@@ -84,20 +96,52 @@ class JobManager:
|
||||
|
||||
if job_update.status in {JobStatus.completed, JobStatus.failed} and not_completed_before:
|
||||
job.completed_at = get_utc_time().replace(tzinfo=None)
|
||||
if job.callback_url:
|
||||
self._dispatch_callback(job)
|
||||
|
||||
# Save the updated job to the database
|
||||
job.update(db_session=session, actor=actor)
|
||||
# Save the updated job to the database first
|
||||
job = job.update(db_session=session, actor=actor)
|
||||
|
||||
return job.to_pydantic()
|
||||
# Get the updated metadata for callback
|
||||
final_metadata = job.metadata_
|
||||
result = job.to_pydantic()
|
||||
|
||||
# Dispatch callback outside of database session if needed
|
||||
if needs_callback:
|
||||
callback_info = {
|
||||
"job_id": job_id,
|
||||
"callback_url": callback_url,
|
||||
"status": job_update.status,
|
||||
"completed_at": get_utc_time().replace(tzinfo=None),
|
||||
"metadata": final_metadata,
|
||||
}
|
||||
callback_result = self._dispatch_callback_sync(callback_info)
|
||||
|
||||
# Update callback status in a separate transaction
|
||||
with db_registry.session() as session:
|
||||
job = self._verify_job_access(session=session, job_id=job_id, actor=actor, access=["write"])
|
||||
job.callback_sent_at = callback_result["callback_sent_at"]
|
||||
job.callback_status_code = callback_result.get("callback_status_code")
|
||||
job.callback_error = callback_result.get("callback_error")
|
||||
job.update(db_session=session, actor=actor)
|
||||
result = job.to_pydantic()
|
||||
|
||||
return result
|
||||
|
||||
@enforce_types
|
||||
@trace_method
|
||||
async def update_job_by_id_async(self, job_id: str, job_update: JobUpdate, actor: PydanticUser) -> PydanticJob:
|
||||
"""Update a job by its ID with the given JobUpdate object asynchronously."""
|
||||
callback_func = None
|
||||
# First check if we need to dispatch a callback
|
||||
needs_callback = False
|
||||
callback_url = None
|
||||
async with db_registry.async_session() as session:
|
||||
job = await self._verify_job_access_async(session=session, job_id=job_id, actor=actor, access=["write"])
|
||||
|
||||
# Check if we'll need to dispatch callback
|
||||
if job_update.status in {JobStatus.completed, JobStatus.failed} and job.callback_url:
|
||||
needs_callback = True
|
||||
callback_url = job.callback_url
|
||||
|
||||
# Update the job first to get the final metadata
|
||||
async with db_registry.async_session() as session:
|
||||
# Fetch the job by ID
|
||||
job = await self._verify_job_access_async(session=session, job_id=job_id, actor=actor, access=["write"])
|
||||
@@ -116,25 +160,35 @@ class JobManager:
|
||||
if job_update.status in {JobStatus.completed, JobStatus.failed}:
|
||||
logger.info(f"Current job completed at: {job.completed_at}")
|
||||
job.completed_at = get_utc_time().replace(tzinfo=None)
|
||||
if job.callback_url:
|
||||
callback_func = self._dispatch_callback_async(
|
||||
callback_url=job.callback_url,
|
||||
payload={
|
||||
"job_id": job.id,
|
||||
"status": job.status,
|
||||
"completed_at": job.completed_at.isoformat() if job.completed_at else None,
|
||||
"metadata": job.metadata_,
|
||||
},
|
||||
actor=actor,
|
||||
)
|
||||
|
||||
# Save the updated job to the database
|
||||
await job.update_async(db_session=session, actor=actor)
|
||||
# Save the updated job to the database first
|
||||
job = await job.update_async(db_session=session, actor=actor)
|
||||
|
||||
if callback_func:
|
||||
return await callback_func
|
||||
# Get the updated metadata for callback
|
||||
final_metadata = job.metadata_
|
||||
result = job.to_pydantic()
|
||||
|
||||
return job.to_pydantic()
|
||||
# Dispatch callback outside of database session if needed
|
||||
if needs_callback:
|
||||
callback_info = {
|
||||
"job_id": job_id,
|
||||
"callback_url": callback_url,
|
||||
"status": job_update.status,
|
||||
"completed_at": get_utc_time().replace(tzinfo=None),
|
||||
"metadata": final_metadata,
|
||||
}
|
||||
callback_result = await self._dispatch_callback_async(callback_info)
|
||||
|
||||
# Update callback status in a separate transaction
|
||||
async with db_registry.async_session() as session:
|
||||
job = await self._verify_job_access_async(session=session, job_id=job_id, actor=actor, access=["write"])
|
||||
job.callback_sent_at = callback_result["callback_sent_at"]
|
||||
job.callback_status_code = callback_result.get("callback_status_code")
|
||||
job.callback_error = callback_result.get("callback_error")
|
||||
await job.update_async(db_session=session, actor=actor)
|
||||
result = job.to_pydantic()
|
||||
|
||||
return result
|
||||
|
||||
@enforce_types
|
||||
@trace_method
|
||||
@@ -617,7 +671,7 @@ class JobManager:
|
||||
session: Session,
|
||||
job_id: str,
|
||||
actor: PydanticUser,
|
||||
access: List[Literal["read", "write", "delete"]] = ["read"],
|
||||
access: List[Literal["read", "write", "admin"]] = ["read"],
|
||||
) -> JobModel:
|
||||
"""
|
||||
Verify that a job exists and the user has the required access.
|
||||
@@ -685,61 +739,58 @@ class JobManager:
|
||||
return request_config
|
||||
|
||||
@trace_method
|
||||
def _dispatch_callback(self, job: JobModel) -> None:
|
||||
def _dispatch_callback_sync(self, callback_info: dict) -> dict:
|
||||
"""
|
||||
POST a standard JSON payload to job.callback_url
|
||||
and record timestamp + HTTP status.
|
||||
POST a standard JSON payload to callback_url and return callback status.
|
||||
"""
|
||||
|
||||
payload = {
|
||||
"job_id": job.id,
|
||||
"status": job.status,
|
||||
"completed_at": job.completed_at.isoformat() if job.completed_at else None,
|
||||
"metadata": job.metadata_,
|
||||
"job_id": callback_info["job_id"],
|
||||
"status": callback_info["status"],
|
||||
"completed_at": callback_info["completed_at"].isoformat() if callback_info["completed_at"] else None,
|
||||
"metadata": callback_info["metadata"],
|
||||
}
|
||||
|
||||
callback_sent_at = get_utc_time().replace(tzinfo=None)
|
||||
result = {"callback_sent_at": callback_sent_at}
|
||||
|
||||
try:
|
||||
log_event("POST callback dispatched", payload)
|
||||
resp = post(job.callback_url, json=payload, timeout=5.0)
|
||||
resp = post(callback_info["callback_url"], json=payload, timeout=5.0)
|
||||
log_event("POST callback finished")
|
||||
job.callback_sent_at = get_utc_time().replace(tzinfo=None)
|
||||
job.callback_status_code = resp.status_code
|
||||
|
||||
result["callback_status_code"] = resp.status_code
|
||||
except Exception as e:
|
||||
error_message = f"Failed to dispatch callback for job {job.id} to {job.callback_url}: {e!s}"
|
||||
error_message = f"Failed to dispatch callback for job {callback_info['job_id']} to {callback_info['callback_url']}: {e!s}"
|
||||
logger.error(error_message)
|
||||
# Record the failed attempt
|
||||
job.callback_sent_at = get_utc_time().replace(tzinfo=None)
|
||||
job.callback_error = error_message
|
||||
result["callback_error"] = error_message
|
||||
# Continue silently - callback failures should not affect job completion
|
||||
|
||||
return result
|
||||
|
||||
@trace_method
|
||||
async def _dispatch_callback_async(self, callback_url: str, payload: dict, actor: PydanticUser) -> PydanticJob:
|
||||
async def _dispatch_callback_async(self, callback_info: dict) -> dict:
|
||||
"""
|
||||
POST a standard JSON payload to job.callback_url and record timestamp + HTTP status asynchronously.
|
||||
POST a standard JSON payload to callback_url and return callback status asynchronously.
|
||||
"""
|
||||
job_id = payload["job_id"]
|
||||
callback_sent_at, callback_status_code, callback_error = None, None, None
|
||||
payload = {
|
||||
"job_id": callback_info["job_id"],
|
||||
"status": callback_info["status"],
|
||||
"completed_at": callback_info["completed_at"].isoformat() if callback_info["completed_at"] else None,
|
||||
"metadata": callback_info["metadata"],
|
||||
}
|
||||
|
||||
callback_sent_at = get_utc_time().replace(tzinfo=None)
|
||||
result = {"callback_sent_at": callback_sent_at}
|
||||
|
||||
try:
|
||||
async with AsyncClient() as client:
|
||||
log_event("POST callback dispatched", payload)
|
||||
resp = await client.post(callback_url, json=payload, timeout=5.0)
|
||||
resp = await client.post(callback_info["callback_url"], json=payload, timeout=5.0)
|
||||
log_event("POST callback finished")
|
||||
# Ensure timestamp is timezone-naive for DB compatibility
|
||||
callback_sent_at = get_utc_time().replace(tzinfo=None)
|
||||
callback_status_code = resp.status_code
|
||||
result["callback_status_code"] = resp.status_code
|
||||
except Exception as e:
|
||||
error_message = f"Failed to dispatch callback for job {job_id} to {callback_url}: {e!s}"
|
||||
error_message = f"Failed to dispatch callback for job {callback_info['job_id']} to {callback_info['callback_url']}: {e!s}"
|
||||
logger.error(error_message)
|
||||
# Record the failed attempt
|
||||
callback_sent_at = get_utc_time().replace(tzinfo=None)
|
||||
callback_error = error_message
|
||||
result["callback_error"] = error_message
|
||||
# Continue silently - callback failures should not affect job completion
|
||||
|
||||
async with db_registry.async_session() as session:
|
||||
job = await JobModel.read_async(db_session=session, identifier=job_id, actor=actor, access_type=AccessType.USER)
|
||||
job.callback_sent_at = callback_sent_at
|
||||
job.callback_status_code = callback_status_code
|
||||
job.callback_error = callback_error
|
||||
await job.update_async(db_session=session, actor=actor)
|
||||
return job.to_pydantic()
|
||||
return result
|
||||
|
||||
Reference in New Issue
Block a user