ETL技术实战:从MySQL数据转换到金蝶云星空API写入
### MySQL数据集成到金蝶云星空的技术实现:SHd生产汇报单新增-单工序深圳天一-好
在本文中,我们将探讨如何通过轻易云数据集成平台,将MySQL中的数据高效地写入至金蝶云星空系统。在实际案例“SHd生产汇报单新增-单工序深圳天一-好”中,我们具体使用了MySQL接口select和金蝶云API batchSave,实现了大批量的数据快速、可靠且精确的对接。
为了确保集成过程的无缝进行,以下几个关键技术特性被运用:
1. **高吞吐量的数据写入能力**:这一特性能使大量来自MySQL数据库的数据迅速传输到金蝶云星空,极大提升了整个处理流程的时效性。
2. **实时监控与告警**:利用集中化监控和告警系统,不仅能够保证每个任务节点状态的透明可见,还可以及时发现并处理潜在异常情况。
3. **自定义数据转换逻辑**:此功能允许我们根据业务需求灵活定制不同类型的数据转换规则,从而适配不同结构要求,特别是在两套系统之间存在显著差异时,这一点尤为重要。
具体操作过程中,通过MySQL select接口抓取源数据,并应用必要的数据清洗和转换后,再调用金蝶云batchSave API完成目标系统的数据存储。下面是详细步骤介绍,涵盖从初始配置到最终成功对接所需注意的各项细节。
![钉钉与MES系统接口开发配置](https://pic.qeasy.cloud/D13.png~tplv-syqr462i7n-qeasy.image)
### 调用MySQL接口select获取并加工数据
在轻易云数据集成平台的生命周期中,调用源系统MySQL接口进行数据请求与清洗是至关重要的一步。本文将详细探讨如何通过配置元数据来实现这一过程。
#### 元数据配置解析
元数据配置是实现数据集成的关键。以下是我们使用的元数据配置:
```json
{
"api": "select",
"effect": "QUERY",
"method": "SQL",
"number": "入库单号",
"id": "入库单号",
"name": "name",
"request": [
{
"field": "main_params",
"label": "主参数",
"type": "object",
"describe": "对应主查询语句内的动态参数对象",
"children": [
{
"field": "limit",
"label": "返回的记录数",
"type": "int",
"describe": "你可以使用 LIMIT 属性来设定返回的记录数。",
"value": 100
},
{
"field": "offset",
"label": "数据偏移量",
"type": "int",
"describe": "你可以通过OFFSET指定SELECT语句开始查询的数据偏移量。默认情况下偏移量为0。"
}
]
}
],
...
}
```
#### 主查询语句
主查询语句是整个数据请求与清洗过程的核心部分。以下是具体的SQL查询:
```sql
select
case m.delivery_org
when 'T01.01' then CONCAT('HJ', CAST(hj1.id AS CHAR))
when 'T04' then CONCAT('HJGD', CAST(hj1.id AS CHAR))
else CONCAT('HJ', CAST(hj1.id AS CHAR))
end as 生产订单号,
a.part_no as 成品编号,
c.mode_no as 计划跟踪号,
CONCAT('RKD',CAST(a.id AS CHAR)) as 入库单号,
date(a.update_time) as 日期,
a.confirm_numb as 入库数量,
a.id as sourceid,
0.000001 as 工时,
c.pre_process_code,
c.current_process_code,
c.next_process_code,
m.delivery_org as 供应组织
from wms_instock_confirm_task_detail a
left join wms_instock_purchase_task_detail c on MATTERIAL_TYPE='3' and c.next_process_code is null
left join wms_instock_confirm_main_task_detail b on b.connect_uuid=c.uuid
left join mbs_nuclear_price_task hj on hj.mold_no=c.mode_no and hj.part_no=a.part_no
left join mbs_nuclear_price_info hj1 on hj1.nuclear_price_task_uuid=hj.nuclear_price_task_uuid and hj1.out_type='3'
left join mbs_order_plan_bom l on c.mode_no=l.bom_no
left join mbs_order_bom m on m.bom_uuid=l.bom_uuid
where a.connect_uuid=b.uuid
and a.company_code='TYZN'
and a.update_time>'2023-08-01'
and hj1.create_time>(select config_value from sys_config where config_id=337)
and a.is_success2 !='1' and a.is_success1='1'
limit :limit offset :offset
```
#### 动态参数对象
在上述SQL查询中,`:limit`和`:offset`为动态参数。这些参数通过`main_params`字段进行传递:
```json
{
...
"request":[
{
...
children: [
{
field: 'limit',
label: '返回的记录数',
type: 'int',
describe: '你可以使用 LIMIT 属性来设定返回的记录数。',
value: '100'
},
{
field: 'offset',
label: '数据偏移量',
type: 'int',
describe: '你可以通过OFFSET指定SELECT语句开始查询的数据偏移量。默认情况下偏移量为0。'
}
]
}
],
...
}
```
#### 数据请求与清洗流程
在实际操作中,首先需要调用MySQL接口执行上述SQL查询,并根据业务需求设置合适的`limit`和`offset`值。例如:
```json
{
main_params: {
limit: 100,
offset: 0
}
}
```
执行完查询后,平台会自动对返回的数据进行初步清洗,包括去除重复项、格式化日期等操作。这一步骤确保了后续的数据转换与写入能够顺利进行。
#### 实践案例
假设我们需要从源系统获取最近更新的数据,并且每次只获取100条记录,可以通过如下方式配置和调用API:
```json
{
api: 'select',
method: 'SQL',
main_params: {
limit: 100,
offset: 0
}
}
```
在实际应用中,可以根据需要调整`limit`和`offset`值,以实现分页获取大量数据。
通过上述步骤,我们成功实现了从MySQL数据库中调用接口并获取所需数据,为后续的数据转换与写入打下了坚实基础。这一过程不仅提高了数据处理效率,也确保了业务流程的透明性和可追溯性。
![钉钉与ERP系统接口开发配置](https://pic.qeasy.cloud/S16.png~tplv-syqr462i7n-qeasy.image)
### 使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口的技术案例
在数据集成生命周期的第二阶段,我们需要将已经集成的源平台数据进行ETL转换,转为目标平台金蝶云星空API接口所能够接收的格式,并最终写入目标平台。本文将详细介绍如何利用元数据配置完成这一过程。
#### API接口配置与请求参数
首先,我们需要配置金蝶云星空API接口的相关参数。根据提供的元数据配置,目标API为`batchSave`,使用POST方法进行请求。以下是关键参数的详细说明:
- **FormId**: 业务对象表单Id,必须填写金蝶的表单ID,这里为`PRD_MORPT`。
- **IsAutoSubmitAndAudit**: 提交并审核,设置为`true`。
- **IsVerifyBaseDataField**: 验证基础资料,设置为`true`。
- **Operation**: 执行的操作,这里为`Save`。
#### 请求头部与主体字段映射
在请求主体中,需要将源平台的数据字段映射到金蝶云星空API所需的字段。以下是主要字段及其映射规则:
1. **单据编号 (FBillNo)**:
- 类型:string
- 值:{{入库单号}}
2. **单据类型 (FBillType)**:
- 类型:string
- 值:SCHBD01_SYS
- 解析器:ConvertObjectParser (FNumber)
3. **单据日期 (FDate)**:
- 类型:string
- 值:{{日期}}
4. **生产组织 (FPrdOrgId)**:
- 类型:string
- 值:根据供应组织动态生成,例如:
```sql
case '{{供应组织}}' when 'T01.01' then 'T01.06' when 'T04' then 'T04' else '' end
```
5. **生产车间 (FWorkshipIdH)**:
- 类型:string
- 值:根据供应组织动态生成,例如:
```sql
case '{{供应组织}}' when 'T01.01' then '13051101' when 'T04' then 'TY880000' else '' end
```
6. **备注 (FDescription)**:
- 类型:string
- 值:轻易云对接
#### 明细数据处理
对于明细数据,我们使用数组结构进行处理,其中每个子项代表一个具体的数据条目。以下是主要字段及其映射规则:
1. **源单分录内码 (FSrcEntryId)**:
- 类型:string
- 值:通过查询获取,例如:
```sql
_findCollection find FTreeEntity_FEntryId from 10e0ff3a-25f4-31e0-acbc-6e462ae4fdb8 where FBillNo={{生产订单号}}
```
2. **物料编码 (FMaterialId)**:
- 类型:string
- 值:{{items.成品编号}}
- 解析器:ConvertObjectParser (FNumber)
3. **完成数量 (FFinishQty)**:
- 类型:string
- 值:{{items.入库数量}}
4. **人员实作工时 (FHrWorkTime)**:
- 类型:string
- 值:{{items.工时}}
5. **生产订单内码、分录号等其他关联字段**:
这些字段通过类似于上述查询方式获取,并进行相应填充。
#### 数据转换与写入
在完成上述配置后,通过ETL工具或自定义脚本,将源平台的数据按照上述规则转换为目标格式,并通过HTTP POST请求发送到金蝶云星空API接口。
```json
{
"FormId": "PRD_MORPT",
"IsAutoSubmitAndAudit": true,
"IsVerifyBaseDataField": true,
"Operation": "Save",
"Model": {
"FBillNo": "{{入库单号}}",
"FBillType": {"FNumber": "SCHBD01_SYS"},
"FDate": "{{日期}}",
"FPrdOrgId": {"FNumber": "_function case '{{供应组织}}' when 'T01.01' then 'T01.06' when 'T04' then 'T04' else '' end"},
"FWorkshipIdH": {"FNumber": "_function case '{{供应组织}}' when 'T01.01' then '13051101' when 'T04' then 'TY880000' else '' end"},
"FDescription": "轻易云对接",
"FEntity": [
{
"FSrcEntryId": "_findCollection find FTreeEntity_FEntryId from 10e0ff3a-25f4-31e0-acbc-6e462ae4fdb8 where FBillNo={{生产订单号}}",
...
}
]
}
}
```
通过上述步骤,我们实现了从源平台到金蝶云星空API接口的数据转换和写入。这一过程确保了数据的一致性和准确性,同时大大提高了业务处理效率。
![用友与CRM系统接口开发配置](https://pic.qeasy.cloud/T25.png~tplv-syqr462i7n-qeasy.image)