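"""Extract text, images, and tables from a PDF, driven over pipes.

The raw PDF bytes arrive on stdin; a JSON request with the keys
"extract_images", "extract_tables", and "filter_pages" is read from file
descriptor 4; and the JSON result (or an error object) is written to file
descriptor 3. stdout carries only progress logging.
"""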
import base64
import io
import json
import os
import sys
from typing import Literal, Optional

import pdfplumber
from PIL import Image, ImageChops
from pdfminer.pdfcolor import LITERAL_DEVICE_CMYK
from pdfminer.pdftypes import (
    LITERALS_DCT_DECODE,
    LITERALS_FLATE_DECODE,
)

def bbox_overlap(bbox1, bbox2):
    """Return the intersection area of two (x0, y0, x1, y1) boxes as a
    fraction of the smaller box's area; 0 if either box is degenerate."""
    x0_1, y0_1, x1_1, y1_1 = bbox1
    x0_2, y0_2, x1_2, y1_2 = bbox2

    x_overlap = max(0, min(x1_1, x1_2) - max(x0_1, x0_2))
    y_overlap = max(0, min(y1_1, y1_2) - max(y0_1, y0_2))
    overlap_area = x_overlap * y_overlap

    bbox1_area = (x1_1 - x0_1) * (y1_1 - y0_1)
    bbox2_area = (x1_2 - x0_2) * (y1_2 - y0_2)
    if bbox1_area == 0 or bbox2_area == 0:
        return 0

    return overlap_area / min(bbox1_area, bbox2_area)

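# For instance, bbox_overlap((0, 0, 10, 10), (5, 5, 15, 15)) returns
# 25 / 100 == 0.25: a 5x5 intersection measured against the smaller
# (here equal) 10x10 box areas.
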

def is_structured_table(table):
    """Treat a table as structured only when it has at least two rows and
    at least one row with two or more columns."""
    if not table:
        return False
    row_count = len(table)
    col_count = max(len(row) for row in table)
    return row_count >= 2 and col_count >= 2

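# For instance, [["name", "age"], ["ada", "36"]] counts as structured,
# while a lone extracted cell such as [["caption text"]] does not.
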

def extract_pdf_content(
    pdf_data: bytes,
    extract_images: bool,
    extract_tables: bool,
    filter_pages: Optional[list],
):
    """Walk a PDF page by page and collect text, image, and table items,
    each tagged with its 1-based page number and bounding box.

    ``filter_pages`` lists 1-based page numbers to skip, or is None to
    process every page.
    """
    with pdfplumber.open(io.BytesIO(pdf_data)) as pdf:
        content = []

        for page_num, page in enumerate(pdf.pages):
            if filter_pages is not None and page_num + 1 in filter_pages:
                print(f"Skipping page {page_num + 1}...")
                continue
            print(f"Processing page {page_num + 1}...")
            text = page.extract_text(x_tolerance=2)
            content.append({
                'type': 'text',
                'content': text,
                'page': page_num + 1,
                'bbox': page.bbox,
            })

            if extract_images:
                for img in page.images:
                    try:
                        filters = img['stream'].get_filters()
                        data = img['stream'].get_data()
                        buffered = io.BytesIO()

                        if filters[-1][0] in LITERALS_DCT_DECODE:
                            # JPEG stream. CMYK JPEGs embedded in PDFs are
                            # commonly stored with inverted values, so flip
                            # and convert to RGB before re-encoding as PNG;
                            # other JPEGs pass through untouched.
                            if LITERAL_DEVICE_CMYK in img['colorspace']:
                                i = Image.open(io.BytesIO(data))
                                i = ImageChops.invert(i)
                                i = i.convert("RGB")
                                i.save(buffered, format="PNG")
                            else:
                                buffered.write(data)

                        elif len(filters) == 1 and filters[0][0] in LITERALS_FLATE_DECODE:
                            # Raw pixel data. Infer the PIL mode from the bit
                            # depth and the channel count implied by the
                            # decompressed size.
                            width, height = img['srcsize']
                            channels = len(data) / width / height / (img['bits'] / 8)
                            mode: Literal["1", "L", "RGB", "CMYK"]
                            if img['bits'] == 1:
                                mode = "1"
                            elif img['bits'] == 8 and channels == 1:
                                mode = "L"
                            elif img['bits'] == 8 and channels == 3:
                                mode = "RGB"
                            elif img['bits'] == 8 and channels == 4:
                                mode = "CMYK"
                            else:
                                raise ValueError(
                                    f"unsupported combination: {img['bits']} "
                                    f"bits, {channels} channels"
                                )
                            i = Image.frombytes(mode, img['srcsize'], data, "raw")
                            i.save(buffered, format="PNG")
                        else:
                            buffered.write(data)

                        content.append({
                            'type': 'image',
                            'content': base64.b64encode(buffered.getvalue()).decode('utf-8'),
                            'page': page_num + 1,
                            'bbox': (img['x0'], img['top'], img['x1'], img['bottom']),
                        })
                    except Exception as err:
                        print(f"Skipping an unsupported image on page {page_num + 1}: {err}")

            if extract_tables:
                for table in page.extract_tables():
                    content.append({
                        'type': 'table',
                        'table': table,
                        'page': page_num + 1,
                        'bbox': page.bbox,
                    })

        # Order items top-to-bottom, then left-to-right, within each page.
        content.sort(key=lambda x: (x['page'], x['bbox'][1], x['bbox'][0]))

        # Keep structured tables outright; drop loose "tables" whose bbox
        # overlaps an already-kept text item by more than 80%, since those
        # usually duplicate text extracted from the same region.
        filtered_content = []
        for item in content:
            if item['type'] == 'table':
                if is_structured_table(item['table']):
                    filtered_content.append(item)
                    continue
                overlap_found = False
                for existing_item in filtered_content:
                    if existing_item['type'] == 'text' and bbox_overlap(item['bbox'], existing_item['bbox']) > 0.8:
                        overlap_found = True
                        break
                if overlap_found:
                    continue
            filtered_content.append(item)

        return filtered_content


if __name__ == "__main__":
    # fd 3 carries the JSON result out; fd 4 carries the JSON request in.
    # Both descriptors are expected to be set up by the parent process.
    w = os.fdopen(3, "wb")
    r = os.fdopen(4, "rb")
    pdf_data = sys.stdin.buffer.read()
    print(f"Read {len(pdf_data)} bytes of PDF data")

    try:
        req = json.load(r)
        extracted_content = extract_pdf_content(
            pdf_data,
            req['extract_images'],
            req['extract_tables'],
            req['filter_pages'],
        )
        print(f"Extracted {len(extracted_content)} items")
        result = json.dumps({"content": extracted_content}, ensure_ascii=False)
        w.write(result.encode("utf-8"))
        w.flush()
        w.close()
        print("PDF parse done")
    except Exception as e:
        print("PDF parse error:", e)
        w.write(json.dumps({"error": str(e)}).encode("utf-8"))
        w.flush()
        w.close()
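
# Illustrative request/response shapes (hypothetical values):
#   request, read from fd 4:
#     {"extract_images": true, "extract_tables": false, "filter_pages": [1]}
#   success payload, written to fd 3:
#     {"content": [{"type": "text", "content": "...", "page": 2,
#                   "bbox": [0, 0, 612, 792]}, ...]}
#   failure payload:
#     {"error": "..."}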