轻易云平台实现ETL转换及数据写入旺店通的流程

  • 轻易云集成顾问-吴伟
### 物料同步——只取审核日期1月16号之后:金蝶云星空到旺店通·旗舰奇门数据集成案例 在电商和企业管理中,实时且高效的数据同步至关重要。本案例聚焦于如何通过轻易云数据集成平台,将金蝶云星空中的物料数据无缝接入旺店通·旗舰奇门,以实现自动化、精准的业务流程。我们将探讨API接口调用、安全可靠的数据流处理,以及系统对接过程中的具体技术细节。 首先,我们使用的是金蝶云星空提供的`executeBillQuery` API,该接口允许我们按需定制查询范围。在本方案中,我们仅获取审核日期在1月16号之后的物料数据,这是一次限量但关键的信息提取,确保了信息的新鲜度和相关性。 为了保证大量的数据能够快速写入到旺店通·旗舰奇门,采用了批量写入策略,通过调用其API `wdt.goods.goods.push` 实现。这不仅大幅提升了传输效率,还有效避免单条请求可能带来的性能瓶颈。同时,为应对分页与限流问题,在金蝶云星空端采取分页抓取,每次读取固定数量的数据页,并配以适当的重试机制,以覆盖网络或服务暂时不可用等异常情况。 另外,由于两个系统间存在数据格式差异,需要进行格式转换与映射。在这方面,通过自定义脚本实现字段映射,使得从源头导出的原始字段能准确对应目标系统所需格式。例如,从金蝶云星空的"MaterialID"对应到旺店通·旗舰奇门的"item_code",同时注意处理必要的数据类型转换及校验规则。 最后,监控与日志记录是整个流程的重要组成部分。借助轻易云平台内置功能,可以实现对每一步操作状态进行实时监控,并记录详细日志。一旦出现错误,不仅第一时间发出警报,还能通过历史日志快速定位并修复问题,使得整个集成过程更加透明和安全。 后续文章将继续深入分享这些技术细节,包括完整代码示例及实际配置步骤,为读者提供一个可操作、可参考的最佳实践方案。 ![金蝶云星空API接口配置](https://pic.qeasy.cloud/D16.png~tplv-syqr462i7n-qeasy.image) ### 调用源系统金蝶云星空接口executeBillQuery获取并加工数据 在轻易云数据集成平台的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将详细介绍如何通过金蝶云星空接口`executeBillQuery`获取物料数据,并进行初步加工。 #### 接口调用配置 首先,我们需要配置调用金蝶云星空接口的元数据。以下是我们使用的元数据配置: ```json { "api": "executeBillQuery", "method": "POST", "number": "FNumber", "id": "FMATERIALID", "pagination": { "pageSize": 100 }, "request": [ {"field":"FMATERIALID","label":"实体主键","type":"string","value":"FMATERIALID"}, {"field":"FNumber","label":"编码","type":"string","value":"FNumber"}, {"field":"FName","label":"名称","type":"string","value":"FName"}, {"field":"FSpecification","label":"规格型号","type":"string","value":"FSpecification"}, {"field":"FOldNumber","label":"旧物料编码","type":"string","value":"FOldNumber"}, {"field":"FBARCODE","label":"条码","type":"string","value":"FBARCODE"}, {"field":"FDescription","label":"描述","type":"string","value":"FDescription"}, {"field":"FMaterialGroup_FNumber","label":"物料分组","type":"string","value":"FMaterialGroup.FNumber"}, {"field":"FErpClsID","label":"物料属性","type":"string","value":"FErpClsID"}, {"field":"FDocumentStatus","label":"数据状态","type":"string","value":"FDocumentStatus"}, {"field":"FForbidStatus","label":"禁用状态","type":"string","value":"FForbidStatus"}, {"field": "FilterString", "label": "过滤条件", "type": "string", "describe": "示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=", "value": "FApproveDate>='2023-01-16' and (FCategoryID.fnumber = 'CHLB02_SYS' or FCategoryID.fnumber = 'CHLB04_SYS' or FCategoryID.fnumber = 'CHLB05_SYS'or FCategoryID.fnumber = 'CHLB08_SYS') and FUseOrgId.FNumber = '01' and FForbidStatus = 'A'"} ], "otherRequest":[ {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_PAGE_SIZE}"}, {"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"}, {"field": "TopRowCount", "label": "返回总行数", "type": int, describe: 金蝶的查询分页参数}, {"field": FieldKeys, label: 需查询的字段key集合, type: array, describe: 金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber, parser:{name:ArrayToString,params:,}}, { field: FormId, label: 业务对象表单Id, type: string, describe: 必须填写金蝶的表单ID如:PUR_PurchaseOrder,value:BD_MATERIAL} ] } ``` #### 数据请求与清洗 在配置好元数据后,我们可以通过轻易云平台发起HTTP POST请求,调用`executeBillQuery`接口。以下是请求体示例: ```json { "_api_": "/k3cloud/Kingdee.BOS.WebApi.ServicesStub.DynamicFormService.ExecuteBillQuery.common.kdsvc", "_method_": "_POST_", "_args_":{ "_FormId_":"", "_FieldKeys_":"", "_FilterString_":"", "_OrderString_":"", "_TopRowCount_":"", "_StartRow_":"", "_Limit_":"", "_SubSystemId_":"", "_DCTimeStamp_":"", "_LocaleName_":"", "_UserToken_":"", "_NetworkCtrlToken_":"", "_AppKey_":"", "_AppSecretKey_":"", "_ClientType_":"", "_ClientVersion_":"", "_ClientIPAddr_":"", "__SessionId__" : "", "__Locale__" : "", "__UserToken__" : "" } } ``` 在请求体中,我们需要根据实际情况填充相应字段,例如`FormId`、`FieldKeys`、`FilterString`等。特别地,过滤条件(FilterString)用于筛选出审核日期在1月16号之后的数据。 #### 数据转换与写入 获取到原始数据后,需要对其进行清洗和转换,以便后续处理和存储。例如,可以对字段进行重命名、类型转换等操作。以下是一个简单的数据转换示例: ```python def transform_data(raw_data): transformed_data = [] for item in raw_data: transformed_item = { 'material_id': item['FMATERIALID'], 'code': item['FNumber'], 'name': item['FName'], 'specification': item['FSpecification'], 'barcode': item['FBARCODE'], 'description': item['FDescription'], # 添加更多字段转换... } transformed_data.append(transformed_item) return transformed_data ``` 通过上述步骤,我们可以将从金蝶云星空获取的数据进行初步加工,为后续的数据处理和分析打下基础。 #### 实时监控与优化 在整个过程中,实时监控数据流动和处理状态至关重要。轻易云平台提供了全透明可视化的操作界面,使得每个环节都清晰易懂,并能及时发现和解决问题,从而提升业务透明度和效率。 通过以上步骤,我们成功实现了从金蝶云星空接口获取并加工物料数据,为后续的数据集成奠定了坚实基础。在实际应用中,可以根据具体需求进一步优化和扩展这些操作。 ![金蝶与CRM系统接口开发配置](https://pic.qeasy.cloud/S30.png~tplv-syqr462i7n-qeasy.image) ### 轻易云数据集成平台:ETL转换及写入旺店通·旗舰奇门API接口 在数据集成生命周期的第二阶段,重点在于将已经从源平台获取并清洗过的数据进行ETL(Extract, Transform, Load)转换,并最终写入目标平台。在本案例中,我们将探讨如何将源平台的数据转换为旺店通·旗舰奇门API接口所能接收的格式,并进行写入操作。 #### 元数据配置解析 元数据配置是实现数据转换和写入的关键。以下是对元数据配置的详细解析: ```json { "api": "wdt.goods.goods.push", "method": "POST", "idCheck": true, "request": [ { "field": "goodsInfo", "label": "货品信息", "type": "object", "describe": "货品信息", "children": [ {"field": "goods_no", "label": "货品编号", "type": "string", "describe": "货品编号", "value":"{FNumber}", "parent":"goodsInfo"}, {"field": "goods_name", "label": "货品名称", "type": "string", "describe":"货品名称’", "value":"{FName}", "parent":"goodsInfo"}, {"field": "class_name", "label":"分类名称", "type":"string", "describe":"分类名称,不传或为空则默认为’无’","value":"{F_ora_Assistant1}", "parent":"goodsInfo"}, {"field": "brand_name", "label":"品牌名称", "type":"string", "describe":"品牌名称, 不传或为空则默认为’无’","value":"{F_ora_Assistant}", "parent":"goodsInfo"}, {"field": "unit_name", "label":"基本单位名称","type":"string","describe":"基本单位名称, 不传或为空则默认为’无’","value":"{FBaseUnitId_FNumber}","parent":"goodsInfo","mappingDirection":"positive"}, {"field": "aux_unit_name", "label":"辅助单位名称","type":"string","describe":"辅助单位名称, 不传或为空则默认为’无’","parent":"goodsInfo"}, {"field": "flag_name", "label":"货品标记名称","type":"string","describe":"货品标记名称, 不传或为空则默认为’无’","parent":"goodsInfo"}, {"field": "goods_type", "label":"货品类型","type":"string","describe":"默认0, 0:其它1:销售货品2:原材料3:包装物4:周转材料5:虚拟商品6:固定资产8:分装箱","value":"{FCategoryID}","parent": ... ``` #### 数据转换与映射 为了确保数据能够正确地从源平台转换并写入到旺店通·旗舰奇门API接口,我们需要进行以下几步操作: 1. **字段映射**: - `goods_no` 映射到 `{FNumber}` - `goods_name` 映射到 `{FName}` - `class_name` 映射到 `{F_ora_Assistant1}` - `brand_name` 映射到 `{F_ora_Assistant}` - `unit_name` 映射到 `{FBaseUnitId_FNumber}` 2. **默认值处理**: - 对于一些可选字段,如 `class_name` 和 `brand_name`,如果源数据中没有提供,则使用默认值“无”。 3. **复杂字段处理**: - 对于复杂对象,如 `specInfoList`,需要逐一映射其子字段。例如,`wholesale_price` 映射到 `{F_ora_Decimal2}`,`weight` 映射到 `{FNETWEIGHT}`。 4. **数据类型转换**: - 确保所有字段的数据类型与目标平台要求的一致。例如,将数值类型的字段如 `weight`, `height`, `length`, `width` 转换为字符串类型。 #### 实际操作步骤 1. **提取数据**: 从源系统提取符合条件的数据,这里假设已经完成了第一步的数据请求与清洗。 2. **转换数据**: 使用元数据配置中的映射规则,将源数据字段逐一转换为目标平台所需格式。 3. **组装请求**: 根据元数据配置中的结构,将转换后的数据组装成符合旺店通·旗舰奇门API接口要求的JSON对象。 4. **发送请求**: 使用HTTP POST方法,将组装好的JSON对象发送至目标API接口。确保请求头包含必要的认证信息和Content-Type设置为application/json。 ```python import requests import json url = 'https://api.wangdian.cn/openapi2/goods_push.php' headers = { 'Content-Type': 'application/json', 'Authorization': 'Bearer your_token_here' } payload = { 'goodsInfo': { 'goods_no': source_data['FNumber'], 'goods_name': source_data['FName'], 'class_name': source_data.get('F_ora_Assistant1', '无'), 'brand_name': source_data.get('F_ora_Assistant', '无'), ... }, 'specInfoList': [ { 'wholesale_price': source_data['F_ora_Decimal2'], 'weight': str(source_data['FNETWEIGHT']), ... } ] } response = requests.post(url, headers=headers, data=json.dumps(payload)) if response.status_code == 200: print("Data successfully pushed to WangDianTong API") else: print(f"Failed to push data: {response.text}") ``` 通过上述步骤,我们能够高效地将源平台的数据经过ETL转换后,成功推送至旺店通·旗舰奇门API接口,实现不同系统间的数据无缝对接。这不仅提升了业务透明度和效率,也确保了数据的一致性和准确性。 ![用友与SCM系统接口开发配置](https://pic.qeasy.cloud/T8.png~tplv-syqr462i7n-qeasy.image)