172 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			172 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
import io
 | 
						|
import os
 | 
						|
import json
 | 
						|
import sys
 | 
						|
import base64
 | 
						|
import logging
 | 
						|
import time
 | 
						|
from abc import ABC
 | 
						|
from typing import List, IO
 | 
						|
 | 
						|
from docx import ImagePart
 | 
						|
from docx.oxml import CT_P, CT_Tbl
 | 
						|
from docx.table import Table
 | 
						|
from docx.text.paragraph import Paragraph
 | 
						|
from docx import Document
 | 
						|
from PIL import Image
 | 
						|
 | 
						|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
class DocxLoader(ABC):
    """Walk the body of a .docx document and emit its content in order.

    ``load`` returns a list of dicts in document order, each one of:

    * ``{"content": <str>, "type": "text"}`` - accumulated paragraph text;
    * ``{"content": <str>, "type": "image"}`` - an embedded image re-encoded
      as PNG and base64-encoded (only when ``extract_images`` is true);
    * ``{"table": <list of rows of cell texts>, "type": "table"}`` - a table
      (only when ``extract_tables`` is true).
    """

    def __init__(
            self,
            file_content: IO[bytes],
            extract_images: bool = True,
            extract_tables: bool = True,
    ):
        """Store the input stream and the extraction switches.

        Args:
            file_content: Binary stream holding the .docx file; it is passed
                to ``Document(...)`` when ``load`` is called.
            extract_images: When false, embedded images are skipped.
            extract_tables: When false, tables are skipped.
        """
        self.file_content = file_content
        self.extract_images = extract_images
        self.extract_tables = extract_tables

    def load(self) -> List[dict]:
        """Parse the document and return its items in document order.

        Paragraph text is buffered; the buffer is emitted as a single "text"
        item right before each image or table item and once more at the end.

        Returns:
            The list of item dicts described in the class docstring.

        Raises:
            RuntimeError: ``"ExtractImageError"`` when an embedded image
                cannot be opened or re-encoded as PNG.
        """
        result = []
        doc = Document(self.file_content)
        text = ""

        for part in doc.element.body:
            blocks = self.parse_part(part, doc)
            if not blocks:
                continue
            for block in blocks:
                if isinstance(block, list):
                    # A list is a group of image parts (see get_image_part).
                    if self.extract_images:
                        for image_part in block:
                            encoded = self._encode_image(image_part)
                            text = self._flush_text(result, text)
                            result.append({"content": encoded, "type": "image"})
                elif isinstance(block, Paragraph):
                    text += block.text
                elif self.extract_tables and isinstance(block, Table):
                    text = self._flush_text(result, text)
                    result.append(
                        {
                            "table": self.convert_table(block.rows),
                            "type": "table",
                        }
                    )
            # Separate body elements with a blank line, but only while there
            # is buffered text, so an emitted text item never starts with it.
            if text:
                text += "\n\n"

        self._flush_text(result, text)
        return result

    @staticmethod
    def _flush_text(result: list, text: str) -> str:
        """Append non-empty buffered text to ``result``; return the new empty buffer."""
        if text:
            result.append({"content": text, "type": "text"})
        return ""

    @staticmethod
    def _encode_image(image_part) -> str:
        """Re-encode an image part as PNG and return it base64-encoded.

        Raises:
            RuntimeError: ``"ExtractImageError"``, chained to the original
                exception, when the image cannot be opened or saved.
        """
        buffer = io.BytesIO()
        try:
            Image.open(io.BytesIO(image_part.image.blob)).save(buffer, format="png")
        except Exception as e:
            # Use the module logger rather than the root logger so the record
            # is attributed to this module and no implicit basicConfig() runs.
            logger.error(f"load image failed, time={time.asctime()}, err:{e}")
            raise RuntimeError("ExtractImageError") from e
        return base64.b64encode(buffer.getvalue()).decode('utf-8')

    def parse_part(self, block, doc: "Document"):
        """Convert one body element into a list of blocks for ``load``.

        Args:
            block: A child element of the document body.
            doc: The document object returned by ``Document(...)``.

        Returns:
            For a paragraph element: the per-run blocks from ``parse_run``
            when it has both images and text, ``[image_parts]`` when it has
            only images, ``[paragraph]`` when it has only text, else ``[]``.
            For a table element: ``[Table]``. ``None`` for any other element.
        """
        if isinstance(block, CT_P):
            para = Paragraph(block, doc)
            image_part = self.get_image_part(para, doc)
            if image_part and para.text:
                # Mixed paragraph: split by run to keep text/image ordering.
                return list(self.parse_run(para))
            if image_part:
                return [image_part]
            if para.text:
                return [para]
            return []
        if isinstance(block, CT_Tbl):
            return [Table(block, doc)]
        return None

    def parse_run(self, para: "Paragraph"):
        """Split a paragraph into per-run blocks.

        Each run becomes either its list of image parts (when it embeds any)
        or a ``Paragraph`` wrapper around the run element, whose ``text`` is
        later read by ``load``.

        Returns:
            A list with one entry per usable run; empty when there are none.
        """
        runs = para.runs
        if not runs:
            return []

        pieces = []
        for run in runs:
            if run is None or run.element is None:
                continue
            # Wrap the run element so get_image_part (which reads _element)
            # and load (which reads .text) can treat it like a paragraph.
            wrapped = Paragraph(run.element, para)
            image_part = self.get_image_part(wrapped, para)
            pieces.append(image_part if image_part else wrapped)
        return pieces

    @staticmethod
    def get_image_part(graph: "Paragraph", doc: "Document"):
        """Collect the image parts embedded under ``graph``.

        Args:
            graph: Object whose ``_element`` is searched for ``pic:pic`` nodes.
            doc: Object exposing ``part.related_parts``, used to resolve each
                ``a:blip/@r:embed`` relationship id.

        Returns:
            A list of the related parts that are ``ImagePart`` instances;
            empty when there are none.
        """
        image_parts = []
        for picture in graph._element.xpath(".//pic:pic"):
            for rel_id in picture.xpath(".//a:blip/@r:embed"):
                part = doc.part.related_parts[rel_id]
                if isinstance(part, ImagePart):
                    image_parts.append(part)
        return image_parts

    @staticmethod
    def convert_table(rows) -> List[List[str]]:
        """Return the table as a list of rows, each a list of cell texts.

        A ``None`` cell is rendered as an empty string.
        """
        return [
            [cell.text if cell is not None else '' for cell in row.cells]
            for row in rows
        ]
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
    # I/O contract, as used below: file descriptor 4 supplies the JSON request
    # with the "extract_images" / "extract_tables" flags, stdin supplies the
    # raw .docx bytes, and file descriptor 3 receives the JSON response
    # ({"content": [...]} on success, {"error": "..."} on failure).
    # stdout only carries progress messages.
    succeeded = False
    # The context managers close both descriptors on every path, so the
    # response is written exactly once to a still-open stream.
    with os.fdopen(3, "wb") as w, os.fdopen(4, "rb") as r:
        try:
            req = json.load(r)
            ei, et = req['extract_images'], req['extract_tables']
            loader = DocxLoader(
                file_content=io.BytesIO(sys.stdin.buffer.read()),
                extract_images=ei,
                extract_tables=et,
            )
            resp = loader.load()
            print(f"Extracted {len(resp)} items")
            payload = json.dumps({"content": resp}, ensure_ascii=False)
            succeeded = True
        except Exception as e:
            # Top-level boundary: report any failure to the caller as JSON.
            print("Docx parse error", e)
            payload = json.dumps({"error": str(e)})
        # A failure while writing propagates instead of appending a second
        # JSON document after a partially written response.
        w.write(payload.encode())
        w.flush()
    if succeeded:
        print("Docx parse done")