fix: wrap markitdown PDF processing in asyncio.to_thread (#8614)
MarkItDown.convert() does blocking file I/O and CPU-intensive PDF parsing. This was blocking the event loop during file uploads. Now wraps the entire markitdown pipeline (tempfile write, convert, cleanup) in asyncio.to_thread() to run in thread pool. 🐾 Generated with [Letta Code](https://letta.com) Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
@@ -28,6 +28,8 @@ class MarkitdownFileParser(FileParser):
|
||||
@trace_method
|
||||
async def extract_text(self, content: bytes, mime_type: str) -> OCRResponse:
|
||||
"""Extract text using markitdown."""
|
||||
import asyncio
|
||||
|
||||
try:
|
||||
# Handle simple text files directly
|
||||
if is_simple_text_mime_type(mime_type):
|
||||
@@ -49,31 +51,37 @@ class MarkitdownFileParser(FileParser):
|
||||
|
||||
logger.info(f"Extracting text using markitdown: {self.model}")
|
||||
|
||||
# Create temporary file to pass to markitdown
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=self._get_file_extension(mime_type)) as temp_file:
|
||||
temp_file.write(content)
|
||||
temp_file_path = temp_file.name
|
||||
# Run CPU/IO-intensive markitdown processing in thread pool to avoid blocking event loop
|
||||
def blocking_markitdown_convert():
|
||||
# Create temporary file to pass to markitdown
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=self._get_file_extension(mime_type)) as temp_file:
|
||||
temp_file.write(content)
|
||||
temp_file_path = temp_file.name
|
||||
|
||||
try:
|
||||
md = MarkItDown(enable_plugins=False)
|
||||
result = md.convert(temp_file_path)
|
||||
try:
|
||||
md = MarkItDown(enable_plugins=False)
|
||||
result = md.convert(temp_file_path)
|
||||
return result.text_content
|
||||
finally:
|
||||
# Clean up temporary file
|
||||
os.unlink(temp_file_path)
|
||||
|
||||
return OCRResponse(
|
||||
model=self.model,
|
||||
pages=[
|
||||
OCRPageObject(
|
||||
index=0,
|
||||
markdown=result.text_content,
|
||||
images=[],
|
||||
dimensions=None,
|
||||
)
|
||||
],
|
||||
usage_info=OCRUsageInfo(pages_processed=1),
|
||||
document_annotation=None,
|
||||
)
|
||||
finally:
|
||||
# Clean up temporary file
|
||||
os.unlink(temp_file_path)
|
||||
# Run blocking operations in thread pool
|
||||
text_content = await asyncio.to_thread(blocking_markitdown_convert)
|
||||
|
||||
return OCRResponse(
|
||||
model=self.model,
|
||||
pages=[
|
||||
OCRPageObject(
|
||||
index=0,
|
||||
markdown=text_content,
|
||||
images=[],
|
||||
dimensions=None,
|
||||
)
|
||||
],
|
||||
usage_info=OCRUsageInfo(pages_processed=1),
|
||||
document_annotation=None,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Markitdown text extraction failed: {str(e)}")
|
||||
|
||||
Reference in New Issue
Block a user