# coze-studio/backend/infra/impl/document/parser/builtin/parse_pdf.py

# 152 lines
# 5.6 KiB
# Python

import io
import json
import os
import sys
import base64
from typing import Literal
import pdfplumber
from PIL import Image, ImageChops
from pdfminer.pdfcolor import (
LITERAL_DEVICE_CMYK,
)
from pdfminer.pdftypes import (
LITERALS_DCT_DECODE,
LITERALS_FLATE_DECODE,
)
def bbox_overlap(bbox1, bbox2):
    """Return how much of the smaller box is covered by the intersection.

    Args:
        bbox1: Box as ``(x0, y0, x1, y1)``.
        bbox2: Box as ``(x0, y0, x1, y1)``.

    Returns:
        Intersection area divided by the area of the smaller box. ``0`` is
        returned when either box has no positive area, so the division is
        always well defined.
    """
    x0_1, y0_1, x1_1, y1_1 = bbox1
    x0_2, y0_2, x1_2, y1_2 = bbox2

    # Clamp at zero: disjoint boxes yield a negative extent on some axis.
    x_overlap = max(0, min(x1_1, x1_2) - max(x0_1, x0_2))
    y_overlap = max(0, min(y1_1, y1_2) - max(y0_1, y0_2))
    overlap_area = x_overlap * y_overlap

    bbox1_area = (x1_1 - x0_1) * (y1_1 - y0_1)
    bbox2_area = (x1_2 - x0_2) * (y1_2 - y0_2)
    # Degenerate (zero-area) or inverted boxes cannot meaningfully overlap.
    if bbox1_area <= 0 or bbox2_area <= 0:
        return 0

    return overlap_area / min(bbox1_area, bbox2_area)
def is_structured_table(table):
    """Return True when *table* has at least two rows and two columns.

    Args:
        table: A sequence of rows, each row a sequence of cells. ``None`` or
            an empty sequence is treated as "not a table".

    Returns:
        ``True`` if the table has two or more rows and its widest row has two
        or more cells, otherwise ``False``.
    """
    if not table:
        return False
    row_count = len(table)
    # Ignore missing (None) rows when measuring width instead of crashing on
    # len(None); they still count towards the number of rows.
    col_count = max((len(row) for row in table if row is not None), default=0)
    return row_count >= 2 and col_count >= 2
def _flate_image_mode(bits, channels):
    """Map the bit depth and channel count of a raw image to a PIL mode."""
    mode: Literal["1", "L", "RGB", "CMYK"]
    if bits == 1:
        mode = "1"
    elif bits == 8 and channels == 1:
        mode = "L"
    elif bits == 8 and channels == 3:
        mode = "RGB"
    elif bits == 8 and channels == 4:
        mode = "CMYK"
    else:
        # Fail with a readable message; the caller logs it and skips the image.
        raise ValueError(f"unsupported image layout: bits={bits}, channels={channels}")
    return mode


def _image_to_bytes(img):
    """Return the encoded bytes for one pdfplumber image dict.

    Raises on anything it cannot handle (e.g. an empty filter list or an
    unsupported pixel layout); the caller treats that as "skip this image".
    """
    stream = img['stream']
    filters = stream.get_filters()
    data = stream.get_data()
    buffered = io.BytesIO()

    if filters[-1][0] in LITERALS_DCT_DECODE:
        if LITERAL_DEVICE_CMYK in img['colorspace']:
            # CMYK DCT images are inverted, converted to RGB and re-saved as
            # PNG (presumably to compensate for inverted CMYK JPEG data).
            pil_image = Image.open(io.BytesIO(data))
            pil_image = ImageChops.invert(pil_image)
            pil_image = pil_image.convert("RGB")
            pil_image.save(buffered, format="PNG")
        else:
            # Any other DCT image is passed through unchanged.
            buffered.write(data)
    elif len(filters) == 1 and filters[0][0] in LITERALS_FLATE_DECODE:
        # Flate data is raw pixels: derive the channel count from the byte
        # length, then wrap the pixels in a PNG.
        width, height = img['srcsize']
        channels = len(data) / width / height / (img['bits'] / 8)
        mode = _flate_image_mode(img['bits'], channels)
        pil_image = Image.frombytes(mode, img['srcsize'], data, "raw")
        pil_image.save(buffered, format="PNG")
    else:
        buffered.write(data)

    return buffered.getvalue()


def extract_pdf_content(pdf_data: bytes, extract_images, extract_tables: bool, filter_pages: "list[int] | None"):
    """Extract text, images and tables from a PDF given as raw bytes.

    Args:
        pdf_data: The PDF file content.
        extract_images: When truthy, embedded images are emitted as
            base64-encoded items.
        extract_tables: When truthy, tables found by pdfplumber are emitted.
        filter_pages: 1-based page numbers to skip, or ``None`` to process
            every page.

    Returns:
        A list of dicts sorted by page, then by the top and left edge of
        their bounding box. Every item has ``type``, ``page`` and ``bbox``;
        ``text`` and ``image`` items carry ``content`` (image content is
        base64), ``table`` items carry ``table``. Tables that are not at
        least 2x2 are dropped when they overlap an already kept text item.
    """
    # Build the lookup once instead of scanning the list for every page.
    skipped_pages = frozenset(filter_pages) if filter_pages is not None else frozenset()
    content = []

    with pdfplumber.open(io.BytesIO(pdf_data)) as pdf:
        for page_num, page in enumerate(pdf.pages, start=1):
            if page_num in skipped_pages:
                print(f"Skip page {page_num}...")
                continue
            print(f"Processing page {page_num}...")

            content.append({
                'type': 'text',
                'content': page.extract_text(x_tolerance=2),
                'page': page_num,
                'bbox': page.bbox
            })

            if extract_images:
                for img in page.images:
                    # Best effort: one undecodable image must not abort the
                    # whole document, so log it and move on.
                    try:
                        encoded = base64.b64encode(_image_to_bytes(img)).decode('utf-8')
                        content.append({
                            'type': 'image',
                            'content': encoded,
                            'page': page_num,
                            'bbox': (img['x0'], img['top'], img['x1'], img['bottom'])
                        })
                    except Exception as err:
                        print(f"Skipping an unsupported image on page {page_num}, error message: {err}")

            if extract_tables:
                for table in page.extract_tables():
                    # Tables are tagged with the page-level bbox, not their own.
                    content.append({
                        'type': 'table',
                        'table': table,
                        'page': page_num,
                        'bbox': page.bbox
                    })

    # list.sort is stable, so items sharing a key keep their insertion order
    # (text before tables for the same page bbox).
    content.sort(key=lambda x: (x['page'], x['bbox'][1], x['bbox'][0]))

    filtered_content = []
    for item in content:
        if item['type'] == 'table' and not is_structured_table(item['table']):
            # A non-structured table covering text that is already kept would
            # only duplicate that text, so it is discarded.
            duplicates_text = any(
                kept['type'] == 'text' and bbox_overlap(item['bbox'], kept['bbox']) > 0.8
                for kept in filtered_content
            )
            if duplicates_text:
                continue
        filtered_content.append(item)

    return filtered_content
if __name__ == "__main__":
    # I/O contract of this script:
    #   stdin : raw PDF bytes
    #   fd 4  : JSON request with extract_images / extract_tables / filter_pages
    #   fd 3  : JSON response, either {"content": [...]} or {"error": "..."}
    #   stdout: human-readable progress messages only
    # Context managers guarantee both descriptors are closed exactly once,
    # even when parsing fails.
    with os.fdopen(3, "wb") as w, os.fdopen(4, "rb") as r:
        try:
            # Read stdin before the request, matching the original order.
            pdf_data = sys.stdin.buffer.read()
            print(f"Read {len(pdf_data)} bytes of PDF data")
            req = json.load(r)
            ei, et, fp = req['extract_images'], req['extract_tables'], req['filter_pages']
            extracted_content = extract_pdf_content(pdf_data, ei, et, fp)
            print(f"Extracted {len(extracted_content)} items")
            result = json.dumps({"content": extracted_content}, ensure_ascii=False)
            succeeded = True
        except Exception as e:
            # Report the failure to the caller instead of crashing, so it
            # always receives a well-formed JSON document on fd 3.
            print("Pdf parse error", e)
            result = json.dumps({"error": str(e)})
            succeeded = False
        # Single write point: the writer is never touched after being closed.
        w.write(result.encode("utf-8"))
        w.flush()
    if succeeded:
        print("Pdf parse done")