coze-studio/backend/infra/impl/document/parser/builtin/parse_docx.py

172 lines
5.4 KiB
Python

import io
import os
import json
import sys
import base64
import logging
import time
from abc import ABC
from typing import List, IO
from docx import ImagePart
from docx.oxml import CT_P, CT_Tbl
from docx.table import Table
from docx.text.paragraph import Paragraph
from docx import Document
from PIL import Image
logger = logging.getLogger(__name__)
class DocxLoader(ABC):
def __init__(
self,
file_content: IO[bytes],
extract_images: bool = True,
extract_tables: bool = True,
):
self.file_content = file_content
self.extract_images = extract_images
self.extract_tables = extract_tables
def load(self) -> List[dict]:
result = []
doc = Document(self.file_content)
it = iter(doc.element.body)
text = ""
for part in it:
blocks = self.parse_part(part, doc)
if blocks is None or len(blocks) == 0:
continue
for block in blocks:
if self.extract_images and isinstance(block, list):
for b in block:
image = io.BytesIO()
try:
Image.open(io.BytesIO(b.image.blob)).save(image, format="png")
except Exception as e:
logging.error(f"load image failed, time={time.asctime()}, err:{e}")
raise RuntimeError("ExtractImageError")
if len(text) > 0:
result.append(
{
"content": text,
"type": "text",
}
)
text = ""
result.append(
{
"content": base64.b64encode(image.getvalue()).decode('utf-8'),
"type": "image",
}
)
if isinstance(block, Paragraph):
text += block.text
if self.extract_tables and isinstance(block, Table):
rows = block.rows
if len(text) > 0:
result.append(
{
"content": text,
"type": "text",
}
)
text = ""
table = self.convert_table(rows)
result.append(
{
"table": table,
"type": "table",
}
)
if text:
text += "\n\n"
if len(text) > 0:
result.append(
{
"content": text,
"type": "text",
}
)
return result
def parse_part(self, block, doc: Document):
if isinstance(block, CT_P):
blocks = []
para = Paragraph(block, doc)
image_part = self.get_image_part(para, doc)
if image_part and para.text:
blocks.extend(self.parse_run(para))
elif image_part:
blocks.append(image_part)
elif para.text:
blocks.append(para)
return blocks
elif isinstance(block, CT_Tbl):
return [Table(block, doc)]
def parse_run(self, para: Paragraph):
runs = para.runs
paras = []
if runs is None or len(runs) == 0:
return paras
for run in runs:
if run is None or run.element is None:
continue
p = Paragraph(run.element, para)
image_part = self.get_image_part(p, para)
if image_part:
paras.append(image_part)
else:
paras.append(p)
return paras
@staticmethod
def get_image_part(graph: Paragraph, doc: Document):
images = graph._element.xpath(".//pic:pic")
image_parts = []
for image in images:
for img_id in image.xpath(".//a:blip/@r:embed"):
part = doc.part.related_parts[img_id]
if isinstance(part, ImagePart):
image_parts.append(part)
return image_parts
@staticmethod
def convert_table(rows) -> List[List[str]]:
resp_rows = []
for i, row in enumerate(rows):
resp_row = []
for j, cell in enumerate(row.cells):
resp_row.append(cell.text if cell is not None else '')
resp_rows.append(resp_row)
return resp_rows
if __name__ == "__main__":
w = os.fdopen(3, "wb", )
r = os.fdopen(4, "rb", )
try:
req = json.load(r)
ei, et = req['extract_images'], req['extract_tables']
loader = DocxLoader(file_content=io.BytesIO(sys.stdin.buffer.read()), extract_images=ei, extract_tables=et)
resp = loader.load()
print(f"Extracted {len(resp)} items")
result = json.dumps({"content": resp}, ensure_ascii=False)
w.write(str.encode(result))
w.flush()
w.close()
print("Docx parse done")
except Exception as e:
print("Docx parse error", e)
w.write(str.encode(json.dumps({"error": str(e)})))
w.flush()
w.close()