Files
smart-admin/main(2).py
2025-12-13 23:00:09 +08:00

282 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import requests
import base64
import time
from typing import Dict, Any, Optional, List
class UmiOcrClient:
"""
封装了 Umi-OCR HTTP API 的客户端。
提供了文档识别的完整异步流程,以及图片和二维码识别接口。
"""
def __init__(self, base_url: str = "http://154.219.106.93:1224"):
"""
初始化客户端。
:param base_url: Umi-OCR 服务的基地址。
"""
self.base_url = base_url.rstrip('/')
self.headers_json = {"Content-Type": "application/json"}
print(f"UmiOcrClient 初始化。目标地址: {self.base_url}")
def _send_request(self, method: str, endpoint: str, data: Optional[Dict[str, Any]] = None,
files: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
发送 HTTP 请求的通用内部方法。
"""
url = self.base_url + endpoint
try:
if method.upper() == 'GET':
response = requests.get(url)
elif method.upper() == 'POST':
if files:
# 使用 multipart/form-data for file upload (Step 1)
response = requests.post(url, data=data, files=files, timeout=300)
else:
# 使用 application/json for other POST requests
data_str = json.dumps(data)
response = requests.post(url, data=data_str, headers=self.headers_json, timeout=300)
else:
raise ValueError(f"不支持的 HTTP 方法: {method}")
# 检查 HTTP 状态码
response.raise_for_status()
# 解析响应 JSON
return response.json()
except requests.exceptions.RequestException as e:
print(f"请求 {url} 失败。")
raise Exception(f"请求错误: {e}")
except json.JSONDecodeError as e:
# 某些接口(如 /api/doc/clear可能返回空响应体这里需要容错
if response.status_code == 200 and not response.text:
return {"code": 100, "data": "Success (No content)"}
print(f"响应内容不是有效的 JSON。原始内容:\n{response.text}")
raise Exception(f"JSON 解析错误: {e}")
# ==========================================================================
# 📚 文档识别PDF等文件接口封装
# ==========================================================================
def doc_get_options(self) -> Dict[str, Any]:
""" 0. 准备工作:参数查询 (/api/doc/get_options) """
endpoint = "/api/doc/get_options"
return self._send_request("GET", endpoint)
def doc_upload(self, file_path: str, options: Optional[Dict[str, Any]] = None) -> str:
""" 1. 上传待识别文档 (/api/doc/upload)返回任务ID """
print(f"🚀 上传文件: {file_path}")
endpoint = "/api/doc/upload"
# 准备 form-data 的 'json' 部分
json_data = json.dumps(options if options is not None else {})
# 准备 form-data 的 'file' 部分
files = {'file': open(file_path, 'rb')}
data = {'json': json_data}
try:
response = self._send_request("POST", endpoint, data=data, files=files)
if response.get('code') == 100:
return response.get('data') # 任务ID
else:
raise Exception(f"文件上传失败: {response.get('data')}")
finally:
# 确保文件流关闭
if 'file' in files and files['file'].closed == False:
files['file'].close()
def doc_query_result(self, task_id: str, is_data: bool = False, is_unread: bool = True,
data_format: str = "dict") -> Dict[str, Any]:
""" 2. 查询任务状态 (/api/doc/result) """
endpoint = "/api/doc/result"
request_data = {
"id": task_id,
"is_data": is_data,
"is_unread": is_unread,
"format": data_format,
}
return self._send_request("POST", endpoint, request_data)
def doc_get_download_link(self, task_id: str, file_types: List[str] = ["pdfLayered"], ignore_blank: bool = True) -> \
Dict[str, str]:
""" 3. 获取结果下载链接 (/api/doc/download) """
endpoint = "/api/doc/download"
request_data = {
"id": task_id,
"file_types": file_types,
"ignore_blank": ignore_blank,
}
response = self._send_request("POST", endpoint, request_data)
if response.get('code') == 100:
return {
"download_url": self.base_url + response.get('data'), # 完整的下载链接
"file_name": response.get('name')
}
else:
raise Exception(f"生成下载链接失败: {response.get('data')}")
def doc_clear_task(self, task_id: str) -> bool:
""" 5. 任务清理 (/api/doc/clear/<id>) """
endpoint = f"/api/doc/clear/{task_id}"
response = self._send_request("GET", endpoint)
if response.get('code') == 100:
print(f"🧹 任务ID {task_id} 清理成功。")
return True
else:
print(f"⚠️ 任务ID {task_id} 清理失败或不存在: {response.get('data')}")
return False
# --- 高级同步流程函数 ---
def run_doc_ocr_sync(self, file_path: str, options: Optional[Dict[str, Any]] = None,
download_file_types: List[str] = ["pdfLayered"], interval: int = 5) -> Dict[str, Any]:
"""
同步执行整个文档OCR流程 (上传 -> 轮询 -> 下载链接 -> 清理)。
"""
task_id = None
try:
# 1. 上传文件获取任务ID
task_id = self.doc_upload(file_path, options)
print(f"📝 任务启动成功任务ID: {task_id}")
# 2. 轮询任务状态
print("⏳ 正在轮询任务状态...")
while True:
status_res = self.doc_query_result(task_id, is_data=False)
is_done = status_res.get('is_done', False)
state = status_res.get('state', 'unknown')
processed = status_res.get('processed_count', 0)
total = status_res.get('pages_count', '?')
print(f" - 状态: {state.upper()} ({processed}/{total} 页已处理)")
if is_done:
if state == "success":
print("✅ 任务完成!")
break
else:
message = status_res.get('message', '未知错误')
raise Exception(f"任务执行失败 (State: {state}): {message}")
time.sleep(interval) # 休息指定间隔时间后继续查询
# 3. 获取下载链接
download_info = self.doc_get_download_link(task_id, download_file_types)
print(f"🔗 下载链接生成成功。文件: {download_info['file_name']}")
# 4. 获取识别文本(可选步骤,通过查询获取)
text_result = self.doc_query_result(task_id, is_data=True, data_format="text").get('data')
return {
"code": 100,
"task_id": task_id,
"download_info": download_info,
"recognized_text": text_result,
}
except Exception as e:
# 捕捉所有错误并报告
raise Exception(f"文档识别流程失败: {e}")
finally:
# 5. 任务清理 (无论成功失败都清理)
if task_id:
self.doc_clear_task(task_id)
# ==========================================================================
# 📸 其他接口(保留自上次封装)
# ==========================================================================
def ocr_base64_image(self, base64_image: str, options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
""" 图片OCRBase64 识别接口 (/api/ocr) """
print("\n--- 调用图片OCR Base64 识别接口 (/api/ocr) ---")
endpoint = "/api/ocr"
request_data = {
"base64": base64_image,
"options": options if options is not None else {}
}
return self._send_request("POST", endpoint, request_data)
def recognize_qrcode_base64(self, base64_image: str) -> Dict[str, Any]:
""" 【推测接口】二维码Base64 识别 (/api/qrcode/recognize) """
print("\n--- 调用二维码 Base64 识别接口(推测: /api/qrcode/recognize---")
endpoint = "/api/qrcode/recognize"
request_data = {"base64": base64_image}
return self._send_request("POST", endpoint, request_data)
def generate_qrcode(self, text: str, options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
""" 【推测接口】二维码:从文本生成图片 (/api/qrcode/generate) """
print("\n--- 调用二维码生成接口(推测: /api/qrcode/generate---")
endpoint = "/api/qrcode/generate"
request_data = {
"text": text,
"options": options if options is not None else {}
}
return self._send_request("POST", endpoint, request_data)
# ==============================================================================
# 示例用法
# ==============================================================================
if __name__ == '__main__':
# 请根据您的实际情况修改以下参数
TEST_PDF_PATH = r"D:\daima\java\gdyd_zhpb_zgf\第2章 方案建议书.pdf"
# 实例化客户端
client = UmiOcrClient()
# --- 演示文档识别流程 ---
print("\n" + "=" * 50)
print(" 演示:同步文档识别流程 (Step 1-5)")
print("=" * 50)
try:
# 尝试使用 run_doc_ocr_sync 接口
# 假设我们想要中文识别并生成双层PDF和纯文本txt
doc_options = {
"ocr.language": "models/config_chinese.txt",
"pageRangeEnd": 1 # 限制只识别前5页
}
# ⚠️ 注意: 运行此段代码前,请确保您的目录下存在 TEST_PDF_PATH 所指的文件
final_result = client.run_doc_ocr_sync(
file_path=TEST_PDF_PATH,
options=doc_options,
download_file_types=["pdfLayered", "txtPlain"],
interval=3 # 轮询间隔3秒
)
print("\n🎉 整个文档识别任务执行完毕。")
print("--- 总结结果 ---")
print(f"任务ID: {final_result['task_id']}")
print(f"下载文件名: {final_result['download_info']['file_name']}")
print(f"下载链接: {final_result['download_info']['download_url']}")
print(f"部分识别文本 (前100字): \n{final_result['recognized_text']}...")
except Exception as e:
print(f"\n❌ 文档识别流程中断/失败。错误信息: {e}")
print("提示:请确认 Umi-OCR 是否运行在目标 IP/端口,并且文件路径正确。")
print("\n" + "=" * 50)
print(" 演示:图片 Base64 识别")
print("=" * 50)
# 演示图片识别 (与上次封装相同,确认接口可用)
EXAMPLE_BASE64 = "iVBORw0KGgoAAAANSUhEUgAAAC4AAAAXCAIAAAD7ruoFAAAACXBIWXMAABnWAAAZ1gEY0crtAAAAEXRFWHRTb2Z0d2FyZQBTbmlwYXN0ZV0Xzt0AAAHjSURBVEiJ7ZYrcsMwEEBXnR7FLuj0BPIJHJOi0DAZ2qSsMCxEgjYrDQqJdALrBJ2ASndRgeNI8ledutOCLrLl1e7T/mRkjIG/IXe/DWBldRTNEoQSpgNURe5puiiaJehrMuJSXSTgbaby0A1WzLrCCQCmyn0FwoN0V06QONWAt1nUxfnjHYA8p65GjhDKxcjedVH6JOejBPwYh21eE0Wzfe0tqIsEkGXcVcpoMH4CRZ+P0lsQp/pWJ4ripf1XFDFe8GHSHlYcSo9Es31t60RdFlN1RUmrma5oTzTVB8ZUaeeYEC9GmL6kNkDw9BANAQYo3xTNdqUkvHq+rYhDKW0Bj3RSEIpmyWyBaZaMTCrCK+tJ5Jsa07fs3E7esE66HzralRLgJKp0/BD6fJRSxvmDsb6joqkcFXGqMVVFFEHDL2gTxwCAaTabnkFUWhDCHTd9iYrGcAL1ZnqIp5Vpiqh7bCfua7FA4qN0INMcN1+cgCzj+UFxtbmvwdZvGIrI41JiqhZBWhhF8WxorkYPpQwJiWYJeA3rXE4hzcwJ+B96F9zCFHC0FcVegghvFul7oeEE8PvHeJqC0w0AUbbFIT8JnEwGbPKcS2OxU3HMTqD0r4wgEIuiKJ7i4MS16+og8/+bPZRPLa+6Ld2DSzcAAAAASUVORK5CYII="
try:
ocr_result = client.ocr_base64_image(
base64_image=EXAMPLE_BASE64,
options={"data.format": "text"}
)
print("✅ Base64 识别成功。识别结果:", ocr_result.get('data'))
except Exception as e:
print(f"❌ Base64 识别失败。")