I第一次提交

This commit is contained in:
zhangzf1119
2025-12-13 23:00:09 +08:00
commit ac08a0b6ff
180 changed files with 28023 additions and 0 deletions

282
main(2).py Normal file
View File

@@ -0,0 +1,282 @@
import json
import requests
import base64
import time
from typing import Dict, Any, Optional, List
class UmiOcrClient:
"""
封装了 Umi-OCR HTTP API 的客户端。
提供了文档识别的完整异步流程,以及图片和二维码识别接口。
"""
def __init__(self, base_url: str = "http://154.219.106.93:1224"):
"""
初始化客户端。
:param base_url: Umi-OCR 服务的基地址。
"""
self.base_url = base_url.rstrip('/')
self.headers_json = {"Content-Type": "application/json"}
print(f"UmiOcrClient 初始化。目标地址: {self.base_url}")
def _send_request(self, method: str, endpoint: str, data: Optional[Dict[str, Any]] = None,
files: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
发送 HTTP 请求的通用内部方法。
"""
url = self.base_url + endpoint
try:
if method.upper() == 'GET':
response = requests.get(url)
elif method.upper() == 'POST':
if files:
# 使用 multipart/form-data for file upload (Step 1)
response = requests.post(url, data=data, files=files, timeout=300)
else:
# 使用 application/json for other POST requests
data_str = json.dumps(data)
response = requests.post(url, data=data_str, headers=self.headers_json, timeout=300)
else:
raise ValueError(f"不支持的 HTTP 方法: {method}")
# 检查 HTTP 状态码
response.raise_for_status()
# 解析响应 JSON
return response.json()
except requests.exceptions.RequestException as e:
print(f"请求 {url} 失败。")
raise Exception(f"请求错误: {e}")
except json.JSONDecodeError as e:
# 某些接口(如 /api/doc/clear可能返回空响应体这里需要容错
if response.status_code == 200 and not response.text:
return {"code": 100, "data": "Success (No content)"}
print(f"响应内容不是有效的 JSON。原始内容:\n{response.text}")
raise Exception(f"JSON 解析错误: {e}")
# ==========================================================================
# 📚 文档识别PDF等文件接口封装
# ==========================================================================
def doc_get_options(self) -> Dict[str, Any]:
""" 0. 准备工作:参数查询 (/api/doc/get_options) """
endpoint = "/api/doc/get_options"
return self._send_request("GET", endpoint)
def doc_upload(self, file_path: str, options: Optional[Dict[str, Any]] = None) -> str:
""" 1. 上传待识别文档 (/api/doc/upload)返回任务ID """
print(f"🚀 上传文件: {file_path}")
endpoint = "/api/doc/upload"
# 准备 form-data 的 'json' 部分
json_data = json.dumps(options if options is not None else {})
# 准备 form-data 的 'file' 部分
files = {'file': open(file_path, 'rb')}
data = {'json': json_data}
try:
response = self._send_request("POST", endpoint, data=data, files=files)
if response.get('code') == 100:
return response.get('data') # 任务ID
else:
raise Exception(f"文件上传失败: {response.get('data')}")
finally:
# 确保文件流关闭
if 'file' in files and files['file'].closed == False:
files['file'].close()
def doc_query_result(self, task_id: str, is_data: bool = False, is_unread: bool = True,
data_format: str = "dict") -> Dict[str, Any]:
""" 2. 查询任务状态 (/api/doc/result) """
endpoint = "/api/doc/result"
request_data = {
"id": task_id,
"is_data": is_data,
"is_unread": is_unread,
"format": data_format,
}
return self._send_request("POST", endpoint, request_data)
def doc_get_download_link(self, task_id: str, file_types: List[str] = ["pdfLayered"], ignore_blank: bool = True) -> \
Dict[str, str]:
""" 3. 获取结果下载链接 (/api/doc/download) """
endpoint = "/api/doc/download"
request_data = {
"id": task_id,
"file_types": file_types,
"ignore_blank": ignore_blank,
}
response = self._send_request("POST", endpoint, request_data)
if response.get('code') == 100:
return {
"download_url": self.base_url + response.get('data'), # 完整的下载链接
"file_name": response.get('name')
}
else:
raise Exception(f"生成下载链接失败: {response.get('data')}")
def doc_clear_task(self, task_id: str) -> bool:
""" 5. 任务清理 (/api/doc/clear/<id>) """
endpoint = f"/api/doc/clear/{task_id}"
response = self._send_request("GET", endpoint)
if response.get('code') == 100:
print(f"🧹 任务ID {task_id} 清理成功。")
return True
else:
print(f"⚠️ 任务ID {task_id} 清理失败或不存在: {response.get('data')}")
return False
# --- 高级同步流程函数 ---
def run_doc_ocr_sync(self, file_path: str, options: Optional[Dict[str, Any]] = None,
download_file_types: List[str] = ["pdfLayered"], interval: int = 5) -> Dict[str, Any]:
"""
同步执行整个文档OCR流程 (上传 -> 轮询 -> 下载链接 -> 清理)。
"""
task_id = None
try:
# 1. 上传文件获取任务ID
task_id = self.doc_upload(file_path, options)
print(f"📝 任务启动成功任务ID: {task_id}")
# 2. 轮询任务状态
print("⏳ 正在轮询任务状态...")
while True:
status_res = self.doc_query_result(task_id, is_data=False)
is_done = status_res.get('is_done', False)
state = status_res.get('state', 'unknown')
processed = status_res.get('processed_count', 0)
total = status_res.get('pages_count', '?')
print(f" - 状态: {state.upper()} ({processed}/{total} 页已处理)")
if is_done:
if state == "success":
print("✅ 任务完成!")
break
else:
message = status_res.get('message', '未知错误')
raise Exception(f"任务执行失败 (State: {state}): {message}")
time.sleep(interval) # 休息指定间隔时间后继续查询
# 3. 获取下载链接
download_info = self.doc_get_download_link(task_id, download_file_types)
print(f"🔗 下载链接生成成功。文件: {download_info['file_name']}")
# 4. 获取识别文本(可选步骤,通过查询获取)
text_result = self.doc_query_result(task_id, is_data=True, data_format="text").get('data')
return {
"code": 100,
"task_id": task_id,
"download_info": download_info,
"recognized_text": text_result,
}
except Exception as e:
# 捕捉所有错误并报告
raise Exception(f"文档识别流程失败: {e}")
finally:
# 5. 任务清理 (无论成功失败都清理)
if task_id:
self.doc_clear_task(task_id)
# ==========================================================================
# 📸 其他接口(保留自上次封装)
# ==========================================================================
def ocr_base64_image(self, base64_image: str, options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
""" 图片OCRBase64 识别接口 (/api/ocr) """
print("\n--- 调用图片OCR Base64 识别接口 (/api/ocr) ---")
endpoint = "/api/ocr"
request_data = {
"base64": base64_image,
"options": options if options is not None else {}
}
return self._send_request("POST", endpoint, request_data)
def recognize_qrcode_base64(self, base64_image: str) -> Dict[str, Any]:
""" 【推测接口】二维码Base64 识别 (/api/qrcode/recognize) """
print("\n--- 调用二维码 Base64 识别接口(推测: /api/qrcode/recognize---")
endpoint = "/api/qrcode/recognize"
request_data = {"base64": base64_image}
return self._send_request("POST", endpoint, request_data)
def generate_qrcode(self, text: str, options: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
""" 【推测接口】二维码:从文本生成图片 (/api/qrcode/generate) """
print("\n--- 调用二维码生成接口(推测: /api/qrcode/generate---")
endpoint = "/api/qrcode/generate"
request_data = {
"text": text,
"options": options if options is not None else {}
}
return self._send_request("POST", endpoint, request_data)
# ==============================================================================
# 示例用法
# ==============================================================================
if __name__ == '__main__':
# 请根据您的实际情况修改以下参数
TEST_PDF_PATH = r"D:\daima\java\gdyd_zhpb_zgf\第2章 方案建议书.pdf"
# 实例化客户端
client = UmiOcrClient()
# --- 演示文档识别流程 ---
print("\n" + "=" * 50)
print(" 演示:同步文档识别流程 (Step 1-5)")
print("=" * 50)
try:
# 尝试使用 run_doc_ocr_sync 接口
# 假设我们想要中文识别并生成双层PDF和纯文本txt
doc_options = {
"ocr.language": "models/config_chinese.txt",
"pageRangeEnd": 1 # 限制只识别前5页
}
# ⚠️ 注意: 运行此段代码前,请确保您的目录下存在 TEST_PDF_PATH 所指的文件
final_result = client.run_doc_ocr_sync(
file_path=TEST_PDF_PATH,
options=doc_options,
download_file_types=["pdfLayered", "txtPlain"],
interval=3 # 轮询间隔3秒
)
print("\n🎉 整个文档识别任务执行完毕。")
print("--- 总结结果 ---")
print(f"任务ID: {final_result['task_id']}")
print(f"下载文件名: {final_result['download_info']['file_name']}")
print(f"下载链接: {final_result['download_info']['download_url']}")
print(f"部分识别文本 (前100字): \n{final_result['recognized_text']}...")
except Exception as e:
print(f"\n❌ 文档识别流程中断/失败。错误信息: {e}")
print("提示:请确认 Umi-OCR 是否运行在目标 IP/端口,并且文件路径正确。")
print("\n" + "=" * 50)
print(" 演示:图片 Base64 识别")
print("=" * 50)
# 演示图片识别 (与上次封装相同,确认接口可用)
EXAMPLE_BASE64 = "iVBORw0KGgoAAAANSUhEUgAAAC4AAAAXCAIAAAD7ruoFAAAACXBIWXMAABnWAAAZ1gEY0crtAAAAEXRFWHRTb2Z0d2FyZQBTbmlwYXN0ZV0Xzt0AAAHjSURBVEiJ7ZYrcsMwEEBXnR7FLuj0BPIJHJOi0DAZ2qSsMCxEgjYrDQqJdALrBJ2ASndRgeNI8ledutOCLrLl1e7T/mRkjIG/IXe/DWBldRTNEoQSpgNURe5puiiaJehrMuJSXSTgbaby0A1WzLrCCQCmyn0FwoN0V06QONWAt1nUxfnjHYA8p65GjhDKxcjedVH6JOejBPwYh21eE0Wzfe0tqIsEkGXcVcpoMH4CRZ+P0lsQp/pWJ4ripf1XFDFe8GHSHlYcSo9Es31t60RdFlN1RUmrma5oTzTVB8ZUaeeYEC9GmL6kNkDw9BANAQYo3xTNdqUkvHq+rYhDKW0Bj3RSEIpmyWyBaZaMTCrCK+tJ5Jsa07fs3E7esE66HzralRLgJKp0/BD6fJRSxvmDsb6joqkcFXGqMVVFFEHDL2gTxwCAaTabnkFUWhDCHTd9iYrGcAL1ZnqIp5Vpiqh7bCfua7FA4qN0INMcN1+cgCzj+UFxtbmvwdZvGIrI41JiqhZBWhhF8WxorkYPpQwJiWYJeA3rXE4hzcwJ+B96F9zCFHC0FcVegghvFul7oeEE8PvHeJqC0w0AUbbFIT8JnEwGbPKcS2OxU3HMTqD0r4wgEIuiKJ7i4MS16+og8/+bPZRPLa+6Ld2DSzcAAAAASUVORK5CYII="
try:
ocr_result = client.ocr_base64_image(
base64_image=EXAMPLE_BASE64,
options={"data.format": "text"}
)
print("✅ Base64 识别成功。识别结果:", ocr_result.get('data'))
except Exception as e:
print(f"❌ Base64 识别失败。")