ETL数据集成:将源数据转换为金蝶云星空格式

  • 轻易云集成顾问-孙传友
### 案例分享:聚水潭数据集成到金蝶云星空 在一次系统对接项目中,我们使用轻易云数据集成平台实现了聚水潭数据向金蝶云星空的无缝同步。具体的方案名称为“[自动]-其他出库单同步”,其中涵盖多个技术要点,确保数据安全、快速且准确地传递。 #### 接口调用与分页处理 首先,通过调用聚水潭接口 `/open/other/inout/query` 获取需要同步的数据。这一过程中,我们尤其注意了分页和限流问题,以防止大批量的数据请求时导致接口性能下降或被限制访问。例如,在每次请求中加入适当的 `page_num` 和 `page_size` 参数,以确保获取所有可能的数据条目,并遵循最大并发限制策略,从而避免频繁碰撞影响整体效率。 ```json { "method": "POST", "url": "/open/other/inout/query", "data": { "page_num": 1, "page_size": 100 } } ``` #### 数据格式转换与映射 在处理两者之间的数据格式差异时,通过定制化映射规则将聚水潭中的原始数据转换为符合金蝶云星空要求的结构体。这里我们利用了轻易云的平台特性,通过图形化界面定义字段对应关系,并设置必要的规则以适应双方系统间不同的数据模型。 例如,将聚水潭返回的出库单信息映射至金蝶相应字段,这样不仅简化开发工作,还有效减少由于手动操作带来的错误风险。 ```json { // 示例代码展示如何进行简单字段映射: "src_field_1": "dest_field_1", ... } ``` #### 快速写入与异常重试机制 为了确保大量集成数据能够稳定快速地写入至金蝶,我们使用其批量保存API `/batchSave`,结合平台提供的多线程并发功能,将大量出库单信息高效提交。另外,为应对网络抖动或者临时性故障等情况,每次提交失败后我们引入了智能重试机制,使得任务具备更高可靠性和容错能力。一旦发现异常,会记录日志信息并触发告警程序,及时通知维护人员排查问题根源。 这些实践保证了整个过程不仅透明,可监控,同时也极大提升了业务运作效率。在随后内容中将深入介绍各环节实现细节及相关配置。 ![金蝶与WMS系统接口开发配置](https://pic.qeasy.cloud/D12.png~tplv-syqr462i7n-qeasy.image) ### 调用聚水潭接口获取并加工数据的技术案例 在轻易云数据集成平台中,调用源系统接口是数据处理生命周期的第一步。本文将详细探讨如何通过调用聚水潭接口`/open/other/inout/query`来获取并加工数据。 #### 接口调用配置 首先,我们需要配置元数据以便正确调用聚水潭接口。以下是元数据配置的关键部分: ```json { "api": "/open/other/inout/query", "method": "POST", "number": "io_id", "id": "io_id", "pagination": { "pageSize": 50 }, "idCheck": true, "request": [ { "field": "modified_begin", "label": "修改起始时间", "type": "string", "value": "{{LAST_SYNC_TIME|datetime}}" }, { "field": "modified_end", "label": "修改结束时间", "type": "string", "value": "{{CURRENT_TIME|datetime}}" }, { "field": "types", "label": "单据类型", "type": "string", "describe": "单据类型 :其它退货,其它出库,其它进仓", "value": "其它出库", "parser": { "name": "StringToArray", "params": "," } }, { "field": "status", "label": "单据状态", "type": "string", "describe":"单据状态,Confirmed=生效,WaitConfirm待审核,Archive=归档,Cancelled=取消", "value":"Confirmed" }, { "field":"page_index", "label":"开始页码", 'type':"string", 'value':"1" }, { 'field':'page_size', 'label':'每页行数', 'type':'string', 'value':'{PAGINATION_PAGE_SIZE}' } ], 'condition_bk':[[{'field':'f_status','logic':'eqv2','value':'Confirmed'}],[]], 'condition':[[{'field':'f_status','logic':'eqv2','value':'Confirmed'}],[]] } ``` #### 请求参数详解 1. **modified_begin** 和 **modified_end**: 用于指定查询的时间范围。`{{LAST_SYNC_TIME|datetime}}`和`{{CURRENT_TIME|datetime}}`分别表示上次同步时间和当前时间。 2. **types**: 指定单据类型为“其它出库”。使用了`StringToArray`解析器将字符串转换为数组。 3. **status**: 固定为“Confirmed”,表示只查询已生效的单据。 4. **page_index** 和 **page_size**: 用于分页查询,每次请求50条记录。 #### 数据请求与清洗 在实际操作中,我们需要确保请求的数据符合业务需求,并进行必要的数据清洗。以下是一个示例代码片段,用于发起请求并处理返回的数据: ```python import requests import datetime # 设置请求参数 payload = { 'modified_begin': (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y-%m-%d %H:%M:%S'), 'modified_end': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'types': ['其它出库'], 'status': 'Confirmed', 'page_index': 1, 'page_size': 50 } # 发起POST请求 response = requests.post('https://api.jushuitan.com/open/other/inout/query', json=payload) # 检查响应状态 if response.status_code == 200: data = response.json() # 数据清洗与转换 cleaned_data = [] for item in data['data']: cleaned_item = { 'io_id': item['io_id'], 'status': item['status'], # 添加其他需要的字段 } cleaned_data.append(cleaned_item) else: print(f"Error: {response.status_code}") ``` #### 数据转换与写入 在获取并清洗数据后,需要将其转换为目标系统所需的格式,并写入到目标数据库或系统中。这一步通常涉及到对字段进行映射和格式化。 ```python def transform_and_write(data): transformed_data = [] for item in data: transformed_item = { 'ID': item['io_id'], 'Status': item['status'], # 添加其他字段映射 } transformed_data.append(transformed_item) # 写入目标系统(例如数据库) write_to_database(transformed_data) def write_to_database(data): # 假设使用SQLAlchemy进行数据库操作 from sqlalchemy import create_engine, Table, MetaData engine = create_engine('mysql+pymysql://user:password@host/dbname') metadata = MetaData(bind=engine) table = Table('target_table', metadata, autoload=True) with engine.connect() as conn: conn.execute(table.insert(), data) # 调用转换与写入函数 transform_and_write(cleaned_data) ``` 通过以上步骤,我们实现了从聚水潭接口获取数据、清洗、转换并写入目标系统的完整流程。这不仅确保了数据的一致性和准确性,还提高了业务处理的效率。 ![钉钉与MES系统接口开发配置](https://pic.qeasy.cloud/S29.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台实现金蝶云星空API接口的数据转换与写入 在数据集成过程中,ETL(提取、转换、加载)是关键的一环。本文将详细探讨如何使用轻易云数据集成平台,将源平台的数据进行ETL转换,并最终写入到金蝶云星空API接口所能接收的格式。 #### 元数据配置解析 首先,我们需要理解元数据配置中的各个字段及其作用。以下是我们将要处理的主要字段: 1. **FBillTypeID**:单据类型,固定值为`QTCKD01_SYS`。 2. **FBillNo**:单据编号,对应源平台中的`{io_id}`。 3. **FStockOrgId**:库存组织,通过条件判断`{wms_co_id}`来确定具体值。 4. **FPickOrgId**:领用组织,与库存组织类似,通过条件判断确定。 5. **FStockDirect**:库存方向,固定值为`GENERAL`。 6. **FDate**:日期,对应源平台中的`{io_date}`。 7. **FOwnerTypeIdHead**:货主类型,固定值为`BD_OwnerOrg`。 8. **FOwnerIdHead**:货主,通过条件判断`{wms_co_id}`来确定具体值。 9. **FNote**:备注,对应源平台中的`{remark}`。 10. **FDeptId**:领料部门,通过SKU编码前缀判断来确定具体值。 11. **F_TLWD_Assistant**:出入库类型,对应源平台中的`{drop_co_name}`。 此外,还有一个重要的字段是**FEntity**,它包含了明细信息,如物料编码、实发数量、发货仓库等。 #### 数据转换逻辑 在进行数据转换时,我们需要特别注意以下几点: 1. **条件判断与映射** - 库存组织(FStockOrgId)和领用组织(FPickOrgId)的值是通过对`{wms_co_id}`进行条件判断来确定的。例如: ```sql _function case '{wms_co_id}' when '13328244' then '101' when '10404759' then '101' when '12612085' then '100' else '101' end ``` - 领料部门(FDeptId)的值则是通过对SKU编码前缀进行条件判断来确定的。例如: ```sql _function case when '{{items.sku_id}}' like 'A%' or '{{items.sku_id}}' like 'M%' or '{{items.sku_id}}' like 'X%' or '{{items.sku_id}}' like 'D%' or '{{items.sku_id}}' like 'T%' then 'LJ006' else 'LJ011' end ``` 2. **基础资料解析** - 多个字段需要将原始值通过基础资料解析器(ConvertObjectParser)转换为金蝶云星空系统可识别的编码。例如: ```json {"name":"ConvertObjectParser","params":"FNumber"} ``` 3. **数组处理** - 明细信息(FEntity)字段是一个数组,需要逐项处理每个子项,包括物料编码、实发数量、发货仓库等。 #### API请求构建 根据元数据配置,我们需要构建一个符合金蝶云星空API要求的请求体。以下是一个示例请求体: ```json { "FormId": "STK_MisDelivery", "IsAutoSubmitAndAudit": true, "IsVerifyBaseDataField": false, "Operation": "Save", "InterationFlags": "STK_InvCheckResult", "Model": { "FBillTypeID": {"FNumber": "QTCKD01_SYS"}, "FBillNo": "{io_id}", "FStockOrgId": {"FNumber": "_function case '{wms_co_id}' ..."}, "FPickOrgId": {"FNumber": "_function case '{wms_co_id}' ..."}, "FStockDirect": "GENERAL", "FDate": "{io_date}", ... "FEntity": [ { "FMaterialId": {"FNumber": "{{items.sku_id}}"}, "FQty": "{{items.qty}}", ... } ] } } ``` #### 实际操作步骤 1. **提取数据** - 从源平台提取原始数据,并根据元数据配置进行初步清洗和格式化。 2. **转换数据** - 根据元数据配置中的规则,将清洗后的数据进行转换,包括条件判断、基础资料解析等。 3. **构建请求体** - 将转换后的数据按照金蝶云星空API要求的格式构建请求体。 4. **发送请求** - 使用HTTP POST方法,将构建好的请求体发送至金蝶云星空API接口。 #### 示例代码 以下是一个简单的Python示例代码,用于展示上述过程: ```python import requests import json # 构建请求体 request_body = { "FormId": "STK_MisDelivery", ... } # 设置请求头 headers = { 'Content-Type': 'application/json' } # 发送POST请求 response = requests.post('https://api.kingdee.com/batchSave', headers=headers, data=json.dumps(request_body)) # 检查响应状态 if response.status_code == 200: print("Data successfully written to Kingdee Cloud.") else: print("Failed to write data:", response.text) ``` 通过以上步骤,我们可以高效地将源平台的数据经过ETL转换后写入到金蝶云星空系统,实现不同系统间的数据无缝对接。 ![如何开发用友BIP接口](https://pic.qeasy.cloud/T25.png~tplv-syqr462i7n-qeasy.image)