实现ETL转换及批量写入:轻易云平台对接金蝶云API指南

  • 轻易云集成顾问-姚缘
### MySQL数据集成到金蝶云星空:GD生产订单新增-好 在现代企业的业务流程中,数据集成与管理的重要性不言而喻。本文将分享一个具体的系统对接案例:如何高效地将MySQL数据库中的生产订单数据集成到金蝶云星空平台,以支持企业的资源计划和运营管理。特别是通过轻易云数据集成平台,我们实现了这一复杂任务的一站式解决。 首先,为了确保MySQL数据能够顺利进入金蝶云星空,我们需要考虑以下几个核心技术点: 1. **高吞吐量的数据写入**: 由于生产订单的数据量通常较大,高吞吐量的数据写入能力显得尤为关键。在本次方案中,我们使用`batchSave` API接口批量传输数据,使得大量订单信息可以快速被同步到金蝶云星空。这不仅提升了处理时效性,还有效避免了因单条插入导致的性能瓶颈。 2. **实时监控与告警系统**: 集中的监控与告警功能帮助我们全程追踪每个数据集成任务的状态和性能。一旦出现异常情况,比如网络波动或API调用失败,系统会即时发出告警并自动触发重试机制,从而保证整个流程稳定可靠。 3. **自定义转换逻辑**: 为适应特定业务需求和不同的数据结构,我们在轻易云平台上配置了自定义的数据转换逻辑。通过可视化工具直观设计映射关系,把MySQL中的原始生产订单字段精确映射为符合金蝶云要求的新格式。这一步骤极大优化了后续操作,并且减少人工干预所带来的误差风险。 4. **分页和限流处理**: 在从MySQL获取大规模库存记录时,为防止一次性读取过多数据引发服务端压力或者超时问题,我们采用分页查询模式,并配合限流策略。例如,通过执行类似于 `select * from orders limit 1000 offset n*1000` 的命令,可以分批、规范有序地抓取数据信息。这种方法既保障读取效率,又兼顾系统负载均衡。 5. **异常处理及错误重试机制**: 数据对接过程中不可避免会遇到各种异常情形,如网络断开、服务端挂起等。因此,建立完善的错误重试机制尤为重要。在本案实施中,一旦检测到传输失败,将立即进行详细日志记录,并依据设定规则尝试重新发送请求,直到任务成功完成或达到最大重试次数。同时,这些环节上的细致管理措施也确保没有一张单据遗漏,所有内容准确无误 ![系统集成平台API接口配置](https://pic.qeasy.cloud/D5.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台调用MySQL接口获取并加工数据 在数据集成过程中,调用源系统接口获取数据是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用MySQL接口`select`获取并加工数据,并结合具体的元数据配置示例进行说明。 #### 元数据配置解析 在本案例中,我们使用了以下元数据配置: ```json { "api": "select", "effect": "QUERY", "method": "POST", "number": "生产订单号", "id": "生产订单号", "idCheck": true, "request": [ { "field": "main_params", "label": "main_params", "type": "object", "describe": "111", "value": "1", "children": [ { "field": "limit", "label": "limit", "type": "string", "value": "100" }, { "field": "offset", "label": "offset", "type": "string" } ] } ], "otherRequest": [ { "field": "main_sql", "label": "main_sql", "type": "string", ... } ], ... } ``` #### 数据请求与清洗 在数据请求阶段,我们需要通过MySQL接口`select`语句从源系统中获取所需的数据。以下是具体的SQL查询语句: ```sql select (select dispatch from mes_dispatch_record_process c where c.to_be_dispatch_uuid=a.to_be_dispatch_uuid LIMIT 1) as 生产订单号, (select if(weekday(dispatch_time)=6,date_sub(dispatch_time, interval 1 day),dispatch_time) from mes_dispatch_record_process c where c.to_be_dispatch_uuid=a.to_be_dispatch_uuid LIMIT 1) as 计划开工时间, (select process_end_time from mes_dispatch_record_process c where c.to_be_dispatch_uuid=a.to_be_dispatch_uuid order by process_end_time desc LIMIT 1) as 计划完工时间, a.prod_part_no as 成品编号, a.dispatched_num as 生产数量, b.bom_no as 计划跟踪号, a.id as sourceid, case when b.mac_choose_depart='1' then 'GDTY-001' when b.mac_choose_depart='2' then 'GDTY-002' when b.mac_choose_depart='3' then 'GDTY-003' when b.mac_choose_depart='4' then 'GDTY-004' else '' end as 计划组 from mes_to_be_dispatched a left join oms_order_bom b on a.bom_uuid=b.bom_uuid left join oms_order e on b.order_uuid=e.order_uuid where a.company_code='GDTY' and a.status='3' and a.is_success !='1' and b.material_source='2' and b.is_close='1' and b.kingdee_fwl='0' limit :limit offset :offset ``` 该查询语句从多个表中提取相关字段,并通过子查询和条件判断对数据进行初步清洗和加工。例如,通过子查询获取`生产订单号`、`计划开工时间`和`计划完工时间`,并根据不同条件设置相应的`计划组`。 #### 数据转换与写入 在获取并清洗数据后,下一步是将其转换为目标格式并写入目标系统。在轻易云平台上,这一步通常涉及到配置转换规则和目标接口。由于本文重点聚焦于调用源系统接口,因此不再详细展开转换与写入过程。 #### API接口调用示例 为了实现上述功能,我们需要通过API接口进行实际调用。以下是一个示例请求: ```json { "_api_":"select", "_effect_":"QUERY", "_method_":"POST", "_number_":"生产订单号", "_id_":"生产订单号", "_idCheck_":"true", "_request_":[{"field":"main_params","label":"main_params","type":"object","describe":"111","value":"1","children":[{"field":"limit","label":"limit","type":"string","value":"100"},{"field":"offset","label":"offset","type":"string"}]}], "_otherRequest_":[{"field":"main_sql","label":"main_sql","type":"string","describe":"111","value":"select \n(select dispatch from mes_dispatch_record_process c \nwhere c.to_be_dispatch_uuid=a.to_be_dispatch_uuid LIMIT 1) as ..."}] } ``` 该请求包含了必要的参数和SQL查询语句,通过POST方法发送到指定的API端点,以获取所需的数据。 #### 小结 通过上述步骤,我们可以利用轻易云数据集成平台高效地调用MySQL接口获取并加工数据。这一过程不仅简化了复杂的数据处理任务,还提高了业务流程的透明度和效率。在实际应用中,可以根据具体需求进一步调整和优化元数据配置,以满足不同场景下的数据集成需求。 ![用友BIP接口开发配置](https://pic.qeasy.cloud/S11.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台实现ETL转换并写入金蝶云星空API接口 在使用轻易云数据集成平台进行数据集成的过程中,第二步是将已经集成的源平台数据进行ETL(提取、转换、加载)转换,并将其转为目标平台金蝶云星空API接口所能够接收的格式,最终写入目标平台。本文将详细探讨这一过程中的技术细节和具体实现方法。 #### API接口配置与元数据解析 首先,我们需要了解金蝶云星空API接口的配置要求。根据提供的元数据配置,可以看到该接口主要通过`batchSave`方法进行批量保存操作,采用POST请求方式。 ```json { "api": "batchSave", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ {"field":"FBillNo","label":"单据编号","type":"string","describe":"单据编号","value":"{{生产订单号}}"}, {"field":"FBillType","label":"单据类型","type":"string","describe":"单据类型","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"SCDD09_SYS"}, {"field":"Fdate","label":"单据日期","type":"string","describe":"单据日期","value":"{{计划开工时间}}"}, {"field":"FPrdOrgId","label":"生产组织","type":"string","describe":"生产组织","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"T04"}, {"field":"FWorkGroupId","label":"计划组","type":"string","value":"{{计划组}}","parser":{"name":"ConvertObjectParser","params":"FNumber"}}, {"field":"FTreeEntity","label":"明细","type":"array", "describe": "明细", "children": [ {"field": "FMaterialId", "label": "物料编码", "type": "string", "describe": "子项物料编码", "parser": {"name": "ConvertObjectParser", "params": "FNumber"}, "value": "{{成品编号}}"}, {"field": "Fqty", "label": "数量", "type": "string", "describe": "数量", "value": "{{生产数量}}"}, {"field": "FProductType", "label": "产品类型", "type": "string", "describe": "产品类型", "value": 1}, {"field": "FPlanFinishDate", "label": “计划结束时间”, “type”: “string”, “describe”: “计划结束时间”, “value”: “{{计划完工时间}}”}, {"field": “FPlanStartDate”, “label”: “计划开工时间”, “type”: “string”, “describe”: “计划开工时间”, “value”: “{{计划开工时间}}”}, {"field”: “FISBACKFLUSH”, “label”: “倒冲领料”, ”type”: ”string”, ”describe”: ”倒冲领料”}, {"field”: ”FMTONO”, ”label”: ”计划跟踪号”, ”type”: ”string”, ”describe”: ”计划跟踪号”,”value”:”{{计划跟踪号}}”}, {"field”:”FWorkShopID”,”label”:”生产车间”,”type”:”string”,”describe”:”生产车间”,”parser”:{“name”:”ConvertObjectParser”,”params”:”FNumber“},”value“:"_function case '{{计划组}}' when 'GDTY-002' then 'TY840301A' else 'TY840301' end"} ]} ], ... } ``` #### 数据字段映射与转换 在上述配置中,每个字段都有其特定的属性和描述。例如,`FBillNo`表示单据编号,其值来源于源数据中的`生产订单号`。类似地,`FBillType`、`Fdate`等字段也有对应的源数据字段。 特别需要注意的是一些字段的值需要经过转换,例如: - `FBillType`和`FPrdOrgId`等字段使用了`ConvertObjectParser`进行对象转换。 - `FTreeEntity`中的子项物料编码、数量、产品类型等字段也需要根据源数据进行相应的映射和转换。 #### 实现批量保存操作 为了实现批量保存操作,需要构建一个符合金蝶云星空API要求的数据结构。以下是一个示例代码片段,展示了如何将源数据映射到目标API请求体中: ```python import requests import json # 构建请求体 payload = { 'FormId': 'PRD_MO', 'Operation': 'BatchSave', 'IsAutoSubmitAndAudit': False, 'IsVerifyBaseDataField': True, 'Model': { 'FBillNo': source_data['生产订单号'], 'FBillType': {'FNumber': 'SCDD09_SYS'}, 'Fdate': source_data['计划开工时间'], 'FPrdOrgId': {'FNumber': 'T04'}, 'FWorkGroupId': {'FNumber': source_data['计划组']}, 'FTreeEntity': [ { 'FMaterialId': {'FNumber': item['成品编号']}, 'Fqty': item['生产数量'], 'FProductType': 1, 'FPlanFinishDate': item['计划完工时间'], 'FPlanStartDate': item['计划开工时间'], 'FMTONO': item['计划跟踪号'], 'FWorkShopID': {'_function case ': f"'{source_data['计划组']}' when 'GDTY-002' then 'TY840301A' else 'TY840301'"} } for item in source_data['明细'] ] } } # 发起POST请求 response = requests.post('https://api.kingdee.com/k3cloud/batchSave', headers=headers, data=json.dumps(payload)) # 检查响应状态 if response.status_code == 200: print("Data saved successfully.") else: print(f"Failed to save data: {response.text}") ``` #### 数据验证与错误处理 在实际操作中,还需要对数据进行验证,以确保所有必填字段都有正确的数据,并且格式符合目标平台要求。例如,在发送请求前,可以增加一些验证逻辑: ```python def validate_data(data): required_fields = ['生产订单号', '计划开工时间', ...] for field in required_fields: if field not in data or not data[field]: raise ValueError(f"Missing required field: {field}") try: validate_data(source_data) except ValueError as e: print(f"Data validation error: {e}") ``` 此外,还可以在捕获异常时记录详细日志,以便后续排查问题: ```python try: response = requests.post('https://api.kingdee.com/k3cloud/batchSave', headers=headers, data=json.dumps(payload)) response.raise_for_status() except requests.exceptions.RequestException as e: print(f"HTTP request failed: {e}") ``` 通过上述步骤,我们可以有效地将源平台的数据经过ETL转换后写入到金蝶云星空API接口,实现不同系统间的数据无缝对接。这不仅提高了数据处理的效率,也确保了业务流程的透明度和可追溯性。 ![系统集成平台API接口配置](https://pic.qeasy.cloud/T12.png~tplv-syqr462i7n-qeasy.image)