ETL转换在数据集成中的应用:供应商档案同步案例解析

  • 轻易云集成顾问-潘裕
### 案例分享:金蝶云星空与旺店通·企业奇门供应商档案同步 在本篇技术案例中,我们将详细探讨如何通过轻易云数据集成平台,实现金蝶云星空系统中的供应商档案信息高效、顺畅地同步到旺店通·企业奇门系统。这一任务的执行不仅需要处理接口的数据交互,还需要确保数据格式的一致性和流转过程中的可靠性。 首先,需要调用金蝶云星空提供的`executeBillQuery` API接口,批量提取供应商档案数据。该接口支持定时任务调度,可按照业务需求设置抓取频率,以确保数据信息的及时更新。为应对大数据量带来的挑战,利用平台强大的高吞吐量能力,可以快速获取并处理这些数据。 然而,从金蝶云星空获得的数据通常伴随着分页和限流问题。因此,在实际操作中,需要实现对API返回结果进行逐页抓取,并利用异常重试机制来解决由于限流导致的数据丢失或未成功的问题。同时,还必须进行数据质量监控,实时检测异常并记录日志,这样可以有效追溯和纠正任何潜在错误。 接下来,将获得的供应商档案数据经过必要的转换逻辑处理,使之适配于旺店通·企业奇门所接受的数据结构。在此过程中,自定义转换逻辑尤为重要,因为这能帮助我们应对各种复杂、多变的业务需求。 最终,通过调用旺店通·企业奇门提供的`wdt.purchase.provider.create`写入接口, 将整理后的供应商档案信息批量导入系统。为了保证操作能够无缝完成,中间每一个步骤都需严格把控,对错漏单元实时告警及补救措施至关重要。此外,对于出现接口响应延迟等情况,也需配置相应超时重传机制,以提升整体流程稳定性和效率。 通过这样一种严谨且全面的方法论,我们能够在实现系统之间无缝集成同时,有效提升了数据信息管理水平,为后续进一步优化运营奠定坚实基础。 ![用友BIP接口开发配置](https://pic.qeasy.cloud/D3.png~tplv-syqr462i7n-qeasy.image) ### 供应商档案同步:调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空的`executeBillQuery`接口来获取供应商档案数据,并对其进行初步加工。 #### 接口配置与请求参数 首先,我们需要配置元数据,以便正确调用金蝶云星空的API接口。以下是具体的元数据配置: ```json { "api": "executeBillQuery", "method": "POST", "number": "FNumber", "id": "FSupplierId", "pagination": { "pageSize": 100 }, "idCheck": true, "request": [ {"field":"FSupplierId","label":"FSupplierId","type":"string","value":"FSupplierId"}, {"field":"FNumber","label":"编码","type":"string","value":"FNumber"}, {"field":"FName","label":"名称","type":"string","value":"FName"}, {"field":"FCreateOrgId_FNumber","label":"创建组织","type":"string","value":"FCreateOrgId.FNumber"}, {"field":"FUseOrgId_FNumber","label":"使用组织","type":"string","value":"FUseOrgId.FNumber"}, {"field":"FDescription","label":"描述","type":"string","value":"FDescription"}, {"field":"FShortName","label":"简称","type":"string","value":"FShortName"}, {"field":"FMinPOValue","label":"最小订单量","type":"string","describe":"小数","value":"FMinPOValue"}, {"field":"FCountry_FNumber","label":"国家","type":"string","describe":"单选辅助资料列表","value":"FCountry.FNumber"}, {"field":"FBusinessStatus","label":"业务状态","type":"string","describe":"单据状态","value":"FBusinessStatus"}, {"field": "FilterString", "label": "过滤条件", "type": "string", "describe": "示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=", "value": "FAuditDate>='{{LAST_SYNC_TIME|dateTime}}' and FCreateOrgId.FNumber = '100' and FDOCUMENTSTATUS = 'C'"}, {"field": "FieldKeys", "label": "需查询的字段key集合", "type": "array", "describe": "金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber", "parser":{"name": "ArrayToString", "params": ","}}, {"field": "FormId", "label": "业务对象表单Id", "type": "string", "describe": "必须填写金蝶的表单ID如:PUR_PurchaseOrder", "value": "BD_Supplier"} ], ... } ``` #### 请求示例 在配置好元数据后,我们可以构建请求体并发送请求。以下是一个示例请求体: ```json { "_api_": "/k3cloud/Kingdee.BOS.WebApi.ServicesStub.DynamicFormService.ExecuteBillQuery.common.kdsvc", "_method_": "_POST_", "_args_":{ "_FormId_ ": “BD_Supplier”, "_FieldKeys_ ": “FSupplierId,FNumber,FName,FCreateOrgId.FNumber,FUseOrgId.FNumber”, "_FilterString_ ": “FAuditDate>='2023-01-01' and FCreateOrgId.FNumber = '100' and FDOCUMENTSTATUS = 'C'”, "_Limit_ ": “100” } } ``` #### 数据清洗与加工 在获取到原始数据后,下一步是对数据进行清洗和初步加工。这一步骤非常关键,因为它确保了后续的数据转换和写入能够顺利进行。 1. **字段映射**:根据元数据中的`FieldKeys`,将返回的数据字段映射到目标系统所需的字段。 2. **数据类型转换**:确保每个字段的数据类型符合目标系统的要求。例如,将字符串类型的日期转换为标准日期格式。 3. **过滤无效数据**:移除不符合业务规则或无效的数据记录。 以下是一个简单的数据清洗示例: ```python def clean_data(raw_data): cleaned_data = [] for record in raw_data: cleaned_record = { '供应商ID': record['FSupplierId'], '编码': record['FNumber'], '名称': record['FName'], '创建组织': record['FCreateOrgId.FNumber'], '使用组织': record['FUseOrgId.FNumber'] } # 数据类型转换 cleaned_record['最小订单量'] = float(record.get('FMinPOValue', 0)) # 添加更多清洗逻辑... cleaned_data.append(cleaned_record) return cleaned_data ``` #### 实时监控与日志记录 在整个过程中,实时监控和日志记录也是不可或缺的一部分。通过轻易云平台提供的可视化界面,可以实时监控每个步骤的数据流动和处理状态,确保任何异常情况都能及时发现和处理。 ```json { "_monitoring_":{ "_status_ ": “running”, "_progress_ ": “50%”, "_logs_ ": [ { "_timestamp_ ": “2023-10-01T12:00:00Z”, "_message_ ": “开始调用executeBillQuery接口” }, { "_timestamp_ ": “2023-10-01T12:00:05Z”, "_message_ ": “成功获取供应商档案数据” } ] } } ``` 通过以上步骤,我们可以高效地从金蝶云星空获取供应商档案数据,并对其进行初步加工,为后续的数据转换和写入打下坚实基础。这一过程不仅提升了业务透明度,还极大提高了整体效率。 ![用友与MES系统接口开发配置](https://pic.qeasy.cloud/S28.png~tplv-syqr462i7n-qeasy.image) ### 数据集成生命周期中的ETL转换:供应商档案同步到旺店通·企业奇门API接口 在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL转换,转为目标平台旺店通·企业奇门API接口所能够接收的格式,并最终写入目标平台。本文将深入探讨这一过程,特别是如何利用元数据配置实现这一目标。 #### 1. API接口及请求方法 在本次案例中,我们使用的API接口为`wdt.purchase.provider.create`,请求方法为`POST`。这个接口用于创建供应商数据信息,确保供应商档案能够在旺店通系统中被正确识别和管理。 #### 2. 元数据配置解析 元数据配置是实现ETL转换的关键。以下是我们需要映射和转换的字段: - **provider_no(供应商编号)**: 代表供应商所有属性的唯一编码,用于供应商区分。在ERP系统中,这个字段通常对应于`FNumber`。 - **provider_name(供应商名称)**: 对应于ERP系统中的`FName`。 - **min_purchase_num(最小采购量)**: 默认值为1。 - **purchase_cycle_days(采购周期)**: 默认值为1。 - **arrive_cycle_days(到货周期)**: 默认值为1。 - **contact(联系人)**: 需从源数据中提取。 - **telno(座机)**: 需从源数据中提取。 - **mobile(移动电话)**: 需从源数据中提取。 - **fax(传真)**: 需从源数据中提取。 - **zip(邮编)**: 需从源数据中提取。 - **email(邮箱)**: 需从源数据中提取。 - **qq(qq)**: 需从源数据中提取。 - **wangwang(旺旺)**: 需从源数据中提取。 - **address(地址)**: 包含省、市、区(县)、地址详情,需要进行拼接或格式化处理。 - **website(网址)**: 需从源数据中提取。 - **last_purchase_time(最后采购日期)**: 格式要求为yyyy-MM-dd HH:mm:ss,如果不传则默认创建时间。 - **is_disabled(停用)**: 表示是否停用该供应商,需要根据业务逻辑设置。 - **charge_cycle_days(结算周期)**: 单位为天,不传默认0。 #### 3. 数据清洗与转换 在进行ETL转换时,我们需要对上述字段进行清洗和转换,以确保符合目标平台的要求。例如: ```json { "provider_no": "{FNumber}", "provider_name": "{FName}", "min_purchase_num": "1", "purchase_cycle_days": "1", "arrive_cycle_days": "1", "contact": "{Contact}", "telno": "{TelNo}", "mobile": "{Mobile}", "fax": "{Fax}", "zip": "{Zip}", "email": "{Email}", "qq": "{QQ}", "wangwang": "{WangWang}", "address": "{Province}{City}{District}{DetailAddress}", "website": "{Website}", "last_purchase_time": "{LastPurchaseTime}", "is_disabled": "{IsDisabled}", "charge_cycle_days": "{ChargeCycleDays}" } ``` 其中,`{FNumber}`、`{FName}`等占位符表示需要从源平台的数据字段中获取相应的信息。 #### 4. 数据写入 完成清洗和转换后,我们使用POST方法将处理后的JSON对象发送到目标平台的API接口: ```python import requests url = 'https://api.wangdian.cn/openapi2/wdt.purchase.provider.create' headers = {'Content-Type': 'application/json'} data = { # 填充上面处理后的JSON对象 } response = requests.post(url, headers=headers, json=data) if response.status_code == 200: print("供应商档案同步成功") else: print("同步失败:", response.text) ``` 通过上述步骤,我们实现了将源平台的数据经过ETL转换后,成功写入到旺店通·企业奇门API接口。这一过程不仅确保了数据的一致性和准确性,还提升了业务流程的自动化程度。 ![如何对接钉钉API接口](https://pic.qeasy.cloud/T21.png~tplv-syqr462i7n-qeasy.image)