import time import json import base64 import hashlib import uuid import requests # 配置信息 API_URL = "http://149.88.90.45:8000/api/v1/files/upload" # API_URL = "http://127.0.0.1:8000/api/v1/files/upload" # 注意:这里的 APP_ID 需要与后端校验逻辑匹配(如果后端开启了严格校验) APP_ID = "your-app-id" # APP_KEY 需要与后端配置文件中的 private-model.appkey 一致 APP_KEY = "test-app-key" # 测试用的文件 URL (这里使用用户提供的示例URL,或者替换为可访问的公共URL) # 注意:如果后端无法访问此内网地址,下载会失败。 # 北京协成致远网络科技有限公司_标段1_标包1_文件.zip FILE_URL = "http://45.192.102.202/%E5%8C%97%E4%BA%AC%E5%8D%8F%E6%88%90%E8%87%B4%E8%BF%9C%E7%BD%91%E7%BB%9C%E7%A7%91%E6%8A%80%E6%9C%89%E9%99%90%E5%85%AC%E5%8F%B8_%E6%A0%87%E6%AE%B51_%E6%A0%87%E5%8C%851_%E6%96%87%E4%BB%B6.zip" # 北京直真科技股份有限公司_标段1_标包1_文件.zip # FILE_URL = "http://45.192.102.202/%E5%8C%97%E4%BA%AC%E7%9B%B4%E7%9C%9F%E7%A7%91%E6%8A%80%E8%82%A1%E4%BB%BD%E6%9C%89%E9%99%90%E5%85%AC%E5%8F%B8_%E6%A0%87%E6%AE%B51_%E6%A0%87%E5%8C%851_%E6%96%87%E4%BB%B6.zip" # 博瑞得科技有限公司_标段1_标包1_文件.zip # FILE_URL = "http://45.192.102.202/%E5%8D%9A%E7%91%9E%E5%BE%97%E7%A7%91%E6%8A%80%E6%9C%89%E9%99%90%E5%85%AC%E5%8F%B8_%E6%A0%87%E6%AE%B51_%E6%A0%87%E5%8C%851_%E6%96%87%E4%BB%B6.zip" JSON_FILE_URL = "http://45.192.102.202/%E9%87%87%E8%B4%AD%E6%96%87%E4%BB%B6json20250717161005.txt" def generate_headers(): """生成符合接口规范的请求头""" cur_time = str(int(time.time())) trace_id = str(uuid.uuid4()) # 1. 构造 X-Server-Param (Base64编码的JSON) param_dict = { "appid": APP_ID, "csid": trace_id } param_json = json.dumps(param_dict) x_server_param = base64.b64encode(param_json.encode('utf-8')).decode('utf-8') # 2. 构造 X-CheckSum (MD5签名) # 公式: MD5(appKey + curTime + serverParam) check_sum_str = APP_KEY + cur_time + x_server_param x_check_sum = hashlib.md5(check_sum_str.encode('utf-8')).hexdigest() headers = { "Content-Type": "application/json", "X-CurTime": cur_time, "X-Server-Param": x_server_param, "X-CheckSum": x_check_sum, "traceId": trace_id } return headers def test_upload(file_type="1"): print(f"=== 开始测试接口: {API_URL} [fileType={file_type}] ===") headers = generate_headers() # 构造请求体 (完全按照用户提供的JSON结构) payload = { "fileIds": [FILE_URL], "fileType": file_type, # 1商务、2技术 "projectId": "project001", "packageId": "project001", "supplierId": "supplier001", "jsonFileId": JSON_FILE_URL, "unitId": "unit001" } print("\n[请求头]:") print(json.dumps(headers, indent=2)) print("\n[请求体]:") print(json.dumps(payload, indent=2, ensure_ascii=False)) try: print("\n正在发送请求...") response = requests.post(API_URL, headers=headers, json=payload) print(f"\n[响应状态码]: {response.status_code}") print("[响应内容]:") try: print(json.dumps(response.json(), indent=2, ensure_ascii=False)) except: print(response.text) except requests.exceptions.ConnectionError: print(f"\n❌ 连接失败: 无法连接到 {API_URL}") print("请确保后端服务已启动并在端口 8000 上监听") except Exception as e: print(f"\n❌ 发生错误: {e}") if __name__ == "__main__": # 检查是否安装了 requests 库 try: import requests # 测试场景1: 商务文件 (fileType="1") - 应该直接返回成功,不下载 print("\n>>> 测试场景1: 商务文件 (fileType='1')") test_upload(file_type="1") print("\n" + "="*50 + "\n") # 测试场景2: 技术文件 (fileType="2") - 应该触发下载和处理 # 注意:如果后端无法访问内网URL,这里可能会在日志中报错,但接口会立即返回成功(因为是异步处理) print(">>> 测试场景2: 技术文件 (fileType='2')") test_upload(file_type="2") except ImportError: print("错误: 需要安装 'requests' 库才能运行此脚本。") print("请运行: pip install requests") # http://149.88.90.45/#/home?projectName=0717%E5%9B%9E%E5%BD%92%E6%B5%8B%E8%AF%95-%E6%9D%A8%E6%B1%B6%E5%AE%87&projectId=project001