From c05f3cec0b35323608a69ca47a078a2e4d1d3168 Mon Sep 17 00:00:00 2001 From: cthomas Date: Mon, 12 Jan 2026 13:36:28 -0800 Subject: [PATCH] fix: wrap markitdown PDF processing in asyncio.to_thread (#8614) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit MarkItDown.convert() does blocking file I/O and CPU-intensive PDF parsing. This was blocking the event loop during file uploads. This change wraps the entire markitdown pipeline (tempfile write, convert, cleanup) in asyncio.to_thread() so that it runs in a thread pool. 🐾 Generated with [Letta Code](https://letta.com) Co-authored-by: Letta --- .../parser/markitdown_parser.py | 54 +++++++++++-------- 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/letta/services/file_processor/parser/markitdown_parser.py b/letta/services/file_processor/parser/markitdown_parser.py index a8cfb7bb..02595b9d 100644 --- a/letta/services/file_processor/parser/markitdown_parser.py +++ b/letta/services/file_processor/parser/markitdown_parser.py @@ -28,6 +28,8 @@ class MarkitdownFileParser(FileParser): @trace_method async def extract_text(self, content: bytes, mime_type: str) -> OCRResponse: """Extract text using markitdown.""" + import asyncio + try: # Handle simple text files directly if is_simple_text_mime_type(mime_type): @@ -49,31 +51,37 @@ class MarkitdownFileParser(FileParser): logger.info(f"Extracting text using markitdown: {self.model}") - # Create temporary file to pass to markitdown - with tempfile.NamedTemporaryFile(delete=False, suffix=self._get_file_extension(mime_type)) as temp_file: - temp_file.write(content) - temp_file_path = temp_file.name + # Run CPU/IO-intensive markitdown processing in thread pool to avoid blocking event loop + def blocking_markitdown_convert(): + # Create temporary file to pass to markitdown + with tempfile.NamedTemporaryFile(delete=False, suffix=self._get_file_extension(mime_type)) as temp_file: + temp_file.write(content) + temp_file_path = temp_file.name - try: - md 
= MarkItDown(enable_plugins=False) - result = md.convert(temp_file_path) + try: + md = MarkItDown(enable_plugins=False) + result = md.convert(temp_file_path) + return result.text_content + finally: + # Clean up temporary file + os.unlink(temp_file_path) - return OCRResponse( - model=self.model, - pages=[ - OCRPageObject( - index=0, - markdown=result.text_content, - images=[], - dimensions=None, - ) - ], - usage_info=OCRUsageInfo(pages_processed=1), - document_annotation=None, - ) - finally: - # Clean up temporary file - os.unlink(temp_file_path) + # Run blocking operations in thread pool + text_content = await asyncio.to_thread(blocking_markitdown_convert) + + return OCRResponse( + model=self.model, + pages=[ + OCRPageObject( + index=0, + markdown=text_content, + images=[], + dimensions=None, + ) + ], + usage_info=OCRUsageInfo(pages_processed=1), + document_annotation=None, + ) except Exception as e: logger.error(f"Markitdown text extraction failed: {str(e)}")