172 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			172 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
| import io
 | |
| import os
 | |
| import json
 | |
| import sys
 | |
| import base64
 | |
| import logging
 | |
| import time
 | |
| from abc import ABC
 | |
| from typing import List, IO
 | |
| 
 | |
| from docx import ImagePart
 | |
| from docx.oxml import CT_P, CT_Tbl
 | |
| from docx.table import Table
 | |
| from docx.text.paragraph import Paragraph
 | |
| from docx import Document
 | |
| from PIL import Image
 | |
| 
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
class DocxLoader(ABC):
    """Flatten a .docx document into an ordered list of text, image and table items.

    The document body is walked in order. Paragraph text is accumulated into
    one running string and emitted as a ``text`` item whenever an image or a
    table interrupts it, and once more at the end of the document.
    """

    def __init__(
            self,
            file_content: IO[bytes],
            extract_images: bool = True,
            extract_tables: bool = True,
    ):
        """Store the document stream and the extraction switches.

        Args:
            file_content: Binary file-like object holding the .docx data.
            extract_images: When false, embedded images are skipped.
            extract_tables: When false, tables are skipped.
        """
        self.file_content = file_content
        self.extract_images = extract_images
        self.extract_tables = extract_tables

    def load(self) -> List[dict]:
        """Parse the document and return its content as a list of items.

        Returns:
            A list of dicts in document order, each one of:
            ``{"content": <str>, "type": "text"}``,
            ``{"content": <base64-encoded PNG>, "type": "image"}`` or
            ``{"table": <list of rows of cell text>, "type": "table"}``.

        Raises:
            RuntimeError: ``"ExtractImageError"`` when an embedded image cannot
                be re-encoded as PNG; the original error is chained as the cause.
        """
        result = []
        doc = Document(self.file_content)
        text = ""

        for part in doc.element.body:
            blocks = self.parse_part(part, doc)
            if not blocks:
                continue
            for block in blocks:
                if isinstance(block, list):
                    # A list is the group of image parts found in one paragraph/run.
                    if self.extract_images:
                        for image_part in block:
                            # Convert first so a broken image aborts before
                            # anything else is appended for it.
                            encoded = self._encode_image(image_part)
                            text = self._flush_text(result, text)
                            result.append({"content": encoded, "type": "image"})
                elif isinstance(block, Paragraph):
                    text += block.text
                elif self.extract_tables and isinstance(block, Table):
                    text = self._flush_text(result, text)
                    result.append(
                        {
                            "table": self.convert_table(block.rows),
                            "type": "table",
                        }
                    )
            # Separate the text of consecutive body elements with a blank line.
            if text:
                text += "\n\n"

        self._flush_text(result, text)
        return result

    @staticmethod
    def _flush_text(result: List[dict], text: str) -> str:
        """Append pending text to *result* as a text item; return the reset buffer."""
        if text:
            result.append({"content": text, "type": "text"})
        return ""

    @staticmethod
    def _encode_image(image_part) -> str:
        """Re-encode an image part's blob as PNG and return it base64-encoded."""
        buffer = io.BytesIO()
        try:
            Image.open(io.BytesIO(image_part.image.blob)).save(buffer, format="png")
        except Exception as e:
            logger.error(f"load image failed, time={time.asctime()}, err:{e}")
            raise RuntimeError("ExtractImageError") from e
        return base64.b64encode(buffer.getvalue()).decode('utf-8')

    def parse_part(self, block, doc: "Document"):
        """Turn one body element into a list of blocks for ``load``.

        Args:
            block: A child element of the document body.
            doc: The opened document, used as parent for the wrappers created
                here and for resolving image relationships.

        Returns:
            For a paragraph element: a list holding ``Paragraph`` objects
            and/or lists of image parts (empty when the paragraph has neither
            text nor images). For a table element: a one-item list with the
            ``Table``. ``None`` for any other element.
        """
        if isinstance(block, CT_P):
            para = Paragraph(block, doc)
            image_part = self.get_image_part(para, doc)
            if image_part and para.text:
                # Mixed content: go run by run so text and images keep their order.
                return self.parse_run(para)
            if image_part:
                return [image_part]
            if para.text:
                return [para]
            return []
        if isinstance(block, CT_Tbl):
            return [Table(block, doc)]
        return None

    def parse_run(self, para: "Paragraph"):
        """Split a paragraph into per-run blocks.

        Each run becomes either the list of image parts it embeds or, when it
        embeds none, a ``Paragraph`` wrapper around the run element.

        Args:
            para: The paragraph whose runs are inspected.

        Returns:
            A list of blocks in run order; empty when the paragraph has no runs.
        """
        paras = []
        for run in para.runs or []:
            if run is None or run.element is None:
                continue
            p = Paragraph(run.element, para)
            image_part = self.get_image_part(p, para)
            paras.append(image_part if image_part else p)
        return paras

    @staticmethod
    def get_image_part(graph: "Paragraph", doc: "Document"):
        """Collect the image parts embedded under *graph*'s XML element.

        Args:
            graph: Wrapper whose ``_element`` is searched for pictures.
            doc: Object whose ``part.related_parts`` maps relationship ids to parts.

        Returns:
            A list of ``ImagePart`` objects, empty when no picture is found.
        """
        image_parts = []
        for image in graph._element.xpath(".//pic:pic"):
            for img_id in image.xpath(".//a:blip/@r:embed"):
                part = doc.part.related_parts[img_id]
                if isinstance(part, ImagePart):
                    image_parts.append(part)
        return image_parts

    @staticmethod
    def convert_table(rows) -> List[List[str]]:
        """Return the table as a list of rows, each a list of cell texts.

        Args:
            rows: Iterable of row objects exposing ``cells``.

        Returns:
            One inner list per row; a ``None`` cell contributes ``''``.
        """
        return [
            [cell.text if cell is not None else '' for cell in row.cells]
            for row in rows
        ]
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
    # I/O contract as used below: the .docx bytes are read from stdin, a JSON
    # options object is read from file descriptor 4, and exactly one JSON reply
    # is written to file descriptor 3. Presumably the parent process opens
    # both descriptors before launching this script.
    succeeded = False
    # `with` closes both descriptors on every path, including failures.
    with os.fdopen(3, "wb") as w, os.fdopen(4, "rb") as r:
        try:
            req = json.load(r)
            ei, et = req['extract_images'], req['extract_tables']
            loader = DocxLoader(
                file_content=io.BytesIO(sys.stdin.buffer.read()),
                extract_images=ei,
                extract_tables=et,
            )
            resp = loader.load()
            print(f"Extracted {len(resp)} items")
            payload = json.dumps({"content": resp}, ensure_ascii=False).encode("utf-8")
            succeeded = True
        except Exception as e:
            # Top-level boundary: report any failure to the reader of fd 3.
            print("Docx parse error", e)
            payload = json.dumps({"error": str(e)}).encode("utf-8")
        # Single write point, so the reply is never written to a closed file.
        w.write(payload)
        w.flush()
    if succeeded:
        print("Docx parse done")