ETL技术实战:从MySQL数据转换到金蝶云星空API写入

  • 轻易云集成顾问-曹润
### MySQL数据集成到金蝶云星空的技术实现:SHd生产汇报单新增-单工序深圳天一-好 在本文中,我们将探讨如何通过轻易云数据集成平台,将MySQL中的数据高效地写入至金蝶云星空系统。在实际案例“SHd生产汇报单新增-单工序深圳天一-好”中,我们具体使用了MySQL接口select和金蝶云API batchSave,实现了大批量的数据快速、可靠且精确的对接。 为了确保集成过程的无缝进行,以下几个关键技术特性被运用: 1. **高吞吐量的数据写入能力**:这一特性能使大量来自MySQL数据库的数据迅速传输到金蝶云星空,极大提升了整个处理流程的时效性。 2. **实时监控与告警**:利用集中化监控和告警系统,不仅能够保证每个任务节点状态的透明可见,还可以及时发现并处理潜在异常情况。 3. **自定义数据转换逻辑**:此功能允许我们根据业务需求灵活定制不同类型的数据转换规则,从而适配不同结构要求,特别是在两套系统之间存在显著差异时,这一点尤为重要。 具体操作过程中,通过MySQL select接口抓取源数据,并应用必要的数据清洗和转换后,再调用金蝶云batchSave API完成目标系统的数据存储。下面是详细步骤介绍,涵盖从初始配置到最终成功对接所需注意的各项细节。 ![钉钉与MES系统接口开发配置](https://pic.qeasy.cloud/D13.png~tplv-syqr462i7n-qeasy.image) ### 调用MySQL接口select获取并加工数据 在轻易云数据集成平台的生命周期中,调用源系统MySQL接口进行数据请求与清洗是至关重要的一步。本文将详细探讨如何通过配置元数据来实现这一过程。 #### 元数据配置解析 元数据配置是实现数据集成的关键。以下是我们使用的元数据配置: ```json { "api": "select", "effect": "QUERY", "method": "SQL", "number": "入库单号", "id": "入库单号", "name": "name", "request": [ { "field": "main_params", "label": "主参数", "type": "object", "describe": "对应主查询语句内的动态参数对象", "children": [ { "field": "limit", "label": "返回的记录数", "type": "int", "describe": "你可以使用 LIMIT 属性来设定返回的记录数。", "value": 100 }, { "field": "offset", "label": "数据偏移量", "type": "int", "describe": "你可以通过OFFSET指定SELECT语句开始查询的数据偏移量。默认情况下偏移量为0。" } ] } ], ... } ``` #### 主查询语句 主查询语句是整个数据请求与清洗过程的核心部分。以下是具体的SQL查询: ```sql select case m.delivery_org when 'T01.01' then CONCAT('HJ', CAST(hj1.id AS CHAR)) when 'T04' then CONCAT('HJGD', CAST(hj1.id AS CHAR)) else CONCAT('HJ', CAST(hj1.id AS CHAR)) end as 生产订单号, a.part_no as 成品编号, c.mode_no as 计划跟踪号, CONCAT('RKD',CAST(a.id AS CHAR)) as 入库单号, date(a.update_time) as 日期, a.confirm_numb as 入库数量, a.id as sourceid, 0.000001 as 工时, c.pre_process_code, c.current_process_code, c.next_process_code, m.delivery_org as 供应组织 from wms_instock_confirm_task_detail a left join wms_instock_purchase_task_detail c on MATTERIAL_TYPE='3' and c.next_process_code is null left join wms_instock_confirm_main_task_detail b on b.connect_uuid=c.uuid left join mbs_nuclear_price_task hj on hj.mold_no=c.mode_no and hj.part_no=a.part_no left join mbs_nuclear_price_info hj1 on hj1.nuclear_price_task_uuid=hj.nuclear_price_task_uuid and hj1.out_type='3' left join mbs_order_plan_bom l on c.mode_no=l.bom_no left join mbs_order_bom m on m.bom_uuid=l.bom_uuid where a.connect_uuid=b.uuid and a.company_code='TYZN' and a.update_time>'2023-08-01' and hj1.create_time>(select config_value from sys_config where config_id=337) and a.is_success2 !='1' and a.is_success1='1' limit :limit offset :offset ``` #### 动态参数对象 在上述SQL查询中,`:limit`和`:offset`为动态参数。这些参数通过`main_params`字段进行传递: ```json { ... "request":[ { ... children: [ { field: 'limit', label: '返回的记录数', type: 'int', describe: '你可以使用 LIMIT 属性来设定返回的记录数。', value: '100' }, { field: 'offset', label: '数据偏移量', type: 'int', describe: '你可以通过OFFSET指定SELECT语句开始查询的数据偏移量。默认情况下偏移量为0。' } ] } ], ... } ``` #### 数据请求与清洗流程 在实际操作中,首先需要调用MySQL接口执行上述SQL查询,并根据业务需求设置合适的`limit`和`offset`值。例如: ```json { main_params: { limit: 100, offset: 0 } } ``` 执行完查询后,平台会自动对返回的数据进行初步清洗,包括去除重复项、格式化日期等操作。这一步骤确保了后续的数据转换与写入能够顺利进行。 #### 实践案例 假设我们需要从源系统获取最近更新的数据,并且每次只获取100条记录,可以通过如下方式配置和调用API: ```json { api: 'select', method: 'SQL', main_params: { limit: 100, offset: 0 } } ``` 在实际应用中,可以根据需要调整`limit`和`offset`值,以实现分页获取大量数据。 通过上述步骤,我们成功实现了从MySQL数据库中调用接口并获取所需数据,为后续的数据转换与写入打下了坚实基础。这一过程不仅提高了数据处理效率,也确保了业务流程的透明性和可追溯性。 ![钉钉与ERP系统接口开发配置](https://pic.qeasy.cloud/S16.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口的技术案例 在数据集成生命周期的第二阶段,我们需要将已经集成的源平台数据进行ETL转换,转为目标平台金蝶云星空API接口所能够接收的格式,并最终写入目标平台。本文将详细介绍如何利用元数据配置完成这一过程。 #### API接口配置与请求参数 首先,我们需要配置金蝶云星空API接口的相关参数。根据提供的元数据配置,目标API为`batchSave`,使用POST方法进行请求。以下是关键参数的详细说明: - **FormId**: 业务对象表单Id,必须填写金蝶的表单ID,这里为`PRD_MORPT`。 - **IsAutoSubmitAndAudit**: 提交并审核,设置为`true`。 - **IsVerifyBaseDataField**: 验证基础资料,设置为`true`。 - **Operation**: 执行的操作,这里为`Save`。 #### 请求头部与主体字段映射 在请求主体中,需要将源平台的数据字段映射到金蝶云星空API所需的字段。以下是主要字段及其映射规则: 1. **单据编号 (FBillNo)**: - 类型:string - 值:{{入库单号}} 2. **单据类型 (FBillType)**: - 类型:string - 值:SCHBD01_SYS - 解析器:ConvertObjectParser (FNumber) 3. **单据日期 (FDate)**: - 类型:string - 值:{{日期}} 4. **生产组织 (FPrdOrgId)**: - 类型:string - 值:根据供应组织动态生成,例如: ```sql case '{{供应组织}}' when 'T01.01' then 'T01.06' when 'T04' then 'T04' else '' end ``` 5. **生产车间 (FWorkshipIdH)**: - 类型:string - 值:根据供应组织动态生成,例如: ```sql case '{{供应组织}}' when 'T01.01' then '13051101' when 'T04' then 'TY880000' else '' end ``` 6. **备注 (FDescription)**: - 类型:string - 值:轻易云对接 #### 明细数据处理 对于明细数据,我们使用数组结构进行处理,其中每个子项代表一个具体的数据条目。以下是主要字段及其映射规则: 1. **源单分录内码 (FSrcEntryId)**: - 类型:string - 值:通过查询获取,例如: ```sql _findCollection find FTreeEntity_FEntryId from 10e0ff3a-25f4-31e0-acbc-6e462ae4fdb8 where FBillNo={{生产订单号}} ``` 2. **物料编码 (FMaterialId)**: - 类型:string - 值:{{items.成品编号}} - 解析器:ConvertObjectParser (FNumber) 3. **完成数量 (FFinishQty)**: - 类型:string - 值:{{items.入库数量}} 4. **人员实作工时 (FHrWorkTime)**: - 类型:string - 值:{{items.工时}} 5. **生产订单内码、分录号等其他关联字段**: 这些字段通过类似于上述查询方式获取,并进行相应填充。 #### 数据转换与写入 在完成上述配置后,通过ETL工具或自定义脚本,将源平台的数据按照上述规则转换为目标格式,并通过HTTP POST请求发送到金蝶云星空API接口。 ```json { "FormId": "PRD_MORPT", "IsAutoSubmitAndAudit": true, "IsVerifyBaseDataField": true, "Operation": "Save", "Model": { "FBillNo": "{{入库单号}}", "FBillType": {"FNumber": "SCHBD01_SYS"}, "FDate": "{{日期}}", "FPrdOrgId": {"FNumber": "_function case '{{供应组织}}' when 'T01.01' then 'T01.06' when 'T04' then 'T04' else '' end"}, "FWorkshipIdH": {"FNumber": "_function case '{{供应组织}}' when 'T01.01' then '13051101' when 'T04' then 'TY880000' else '' end"}, "FDescription": "轻易云对接", "FEntity": [ { "FSrcEntryId": "_findCollection find FTreeEntity_FEntryId from 10e0ff3a-25f4-31e0-acbc-6e462ae4fdb8 where FBillNo={{生产订单号}}", ... } ] } } ``` 通过上述步骤,我们实现了从源平台到金蝶云星空API接口的数据转换和写入。这一过程确保了数据的一致性和准确性,同时大大提高了业务处理效率。 ![用友与CRM系统接口开发配置](https://pic.qeasy.cloud/T25.png~tplv-syqr462i7n-qeasy.image)