轻易云ETL技术解析:从数据提取到批量写入的完整案例

  • 轻易云集成顾问-张妍琪
### 物料查询(Kingdee_YF手工增加旧物料编码)的系统对接集成案例分享 在本技术案例中,我们将详细介绍如何实现金蝶云星空的数据与轻易云集成平台的无缝对接,特别是针对`物料查询(Kingdee_YF手工增加旧物料编码)`这一具体业务需求。通过配置API接口、处理分页和限流问题及实现可靠的数据写入,我们成功地确保了数据集成过程的高效性和准确性。 首先,为了保证金蝶云星空数据能够被完整获取并传递到轻易云集成平台,我们采取了定时抓取策略。利用金蝶云星空提供的`executeBillQuery` API接口,可以按设定的时间间隔自动抓取所需数据,有效避免漏单现象。同时,该接口支持分页查询,大大提升了大量数据读取的效率。 其次,在面对两大平台之间可能存在的数据格式差异时,我们通过轻易云集成平台强大的定制化数据映射功能,将获取到的数据进行相应转换,确保各字段与目标数据库完全匹配。在成功转化后,这些处理好的数据会调用轻易云集成平台提供的批量写入API `batchSave` 接口,实现快速且安全地存储。 为应对异常情况及错误重试机制,我们设计了一套完备的监控与日志记录方案。任何执行失败或异常都会实时记录,并触发重试逻辑。这不仅提高了系统稳定性,也确保每一步操作都能被追溯和验证,从而减少潜在风险。 通过本文案例,希望读者能够深入了解复杂业务场景下如何使用现代化工具将不同系统有机联系起来,提高整体运作效率,并更好地满足企业个性化需求。 ![金蝶与CRM系统接口开发配置](https://pic.qeasy.cloud/D39.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在数据集成的生命周期中,第一步是从源系统获取数据,并对其进行初步加工。本文将详细探讨如何通过轻易云数据集成平台调用金蝶云星空接口`executeBillQuery`来实现这一过程。 #### 接口配置与调用 首先,我们需要配置接口调用的元数据。根据提供的元数据配置,接口调用的基本信息如下: - **API**: `executeBillQuery` - **方法**: `POST` - **表单ID**: `BD_MATERIAL` 请求参数包括分页信息、过滤条件和需要查询的字段集合。以下是一个典型的请求配置示例: ```json { "api": "executeBillQuery", "method": "POST", "number": "FNumber", "id": "FMATERIALID", "pagination": { "pageSize": 100 }, "idCheck": true, "request": [ {"field":"FMATERIALID","label":"实体主键","type":"string","value":"FMATERIALID"}, {"field":"FNumber","label":"编码","type":"string","value":"FNumber"}, {"field":"FName","label":"名称","type":"string","value":"FName"}, {"field":"FSpecification","label":"规格型号","type":"string","value":"FSpecification"}, {"field":"FOldNumber","label":"旧物料编码","type":"string","value":"FOldNumber"}, {"field":"FBARCODE","label":"条码","type":"string","value":"FBARCODE"}, {"field":"FDescription","label":"描述","type":"string","value":"FDescription"}, {"field":"FMaterialGroup_FNumber","label":"物料分组","type":"string","value":"FMaterialGroup.FNumber"}, {"field":"FErpClsID","label":"物料属性","type":"string","value":"FErpClsID"}, {"field":"FDocumentStatus","label":"数据状态","type":"string","value":"FDocumentStatus"}, {"field":"FForbidStatus","label":"禁用状态","type":"string","value":"FForbidStatus"}, {"field": "FilterString", "label": "过滤条件", "type": "string", "describe": "示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=", "value": "FApproveDate>='{{LAST_SYNC_TIME|dateTime}}' and FUseOrgId.FNumber='100'"} ], "otherRequest":[ {"field": "Limit", "label": "最大行数", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_PAGE_SIZE}"}, {"field": "StartRow", "label": "开始行索引", "type": "string", "describe": "金蝶的查询分页参数", "value": "{PAGINATION_START_ROW}"}, {"field": "TopRowCount", "label": "返回总行数", "type": int, describe: 金蝶的查询分页参数}, {"field": FieldKeys, label: 需查询的字段key集合, type: array, describe: 金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber, parser:{name:ArrayToString,params:,}} ] } ``` #### 数据清洗与转换 在获取到原始数据后,需要对其进行清洗和转换,以便后续处理和分析。以下是一些常见的数据清洗和转换操作: 1. **字段映射**:将源系统中的字段映射到目标系统中的字段。例如,将`FMATERIALID`映射为`MaterialID`,将`FOldNumber`映射为`OldMaterialCode`等。 2. **数据类型转换**:确保所有字段的数据类型符合目标系统的要求。例如,将字符串类型的日期转换为标准日期格式。 3. **缺失值处理**:对于缺失值,可以选择填充默认值、删除记录或进行其他处理。 以下是一个简单的数据清洗示例: ```python def clean_data(raw_data): cleaned_data = [] for record in raw_data: cleaned_record = { 'MaterialID': record['FMATERIALID'], 'Code': record['FNumber'], 'Name': record['FName'], 'Specification': record['FSpecification'], 'OldMaterialCode': record['FOldNumber'], 'Barcode': record['FBARCODE'], 'Description': record['FDescription'], 'MaterialGroup': record['FMaterialGroup_FNumber'], 'ErpClassID': record['FErpClsID'], 'DocumentStatus': record['FDocumentStatus'], 'ForbidStatus': record['FForbidStatus'] } cleaned_data.append(cleaned_record) return cleaned_data ``` #### 分页处理 由于一次性获取大量数据可能会导致性能问题,因此通常需要进行分页处理。根据元数据配置中的分页参数,我们可以逐页请求数据并进行处理。 以下是一个简单的分页处理示例: ```python def fetch_all_data(api_client): all_data = [] page_size = 100 start_row = 0 while True: response = api_client.executeBillQuery( Limit=page_size, StartRow=start_row, FilterString="...", FieldKeys="..." ) if not response or len(response) == 0: break all_data.extend(response) start_row += page_size return all_data ``` 通过上述步骤,我们可以高效地从金蝶云星空系统中获取并加工所需的数据,为后续的数据集成和分析打下坚实基础。 ![用友与外部系统接口集成开发](https://pic.qeasy.cloud/S22.png~tplv-syqr462i7n-qeasy.image) ### 轻易云数据集成平台ETL转换与写入技术案例 在数据集成生命周期的第二步,关键任务是将源平台的数据进行ETL(提取、转换、加载)处理,最终写入目标平台。本文将详细探讨如何利用轻易云数据集成平台的API接口,实现物料查询数据的高效转换和写入。 #### 1. 数据请求与清洗 首先,我们需要从源系统(如Kingdee_YF)提取物料信息。这一步主要涉及数据的请求和初步清洗。假设我们已经完成了这一步,并获得了以下样本数据: ```json { "FName": "物料A", "FNumber": "001", "FDescription": "描述A", "FCreateOrgId": "100", "FUseOrgId": "100", "SubHeadEntity": { "FErpClsID": "1", "FBaseUnitId": "Pcs", "FCategoryID": "CHLB01_SYS" }, "SubHeadEntity1": { "FStockId": "WH001" } } ``` #### 2. 数据转换与写入 接下来,我们需要将上述数据转换为轻易云集成平台API接口所能接收的格式,并通过API接口将其写入目标平台。以下是元数据配置的详细解析和应用。 ##### 2.1 元数据配置解析 根据提供的元数据配置,我们需要将源数据映射到目标平台所需的数据结构。以下是元数据配置的关键部分: ```json { "api":"batchSave", "method":"POST", "idCheck":true, ... } ``` - `api`字段指定了要调用的API接口名称。 - `method`字段指定了HTTP请求方法,这里使用POST。 - `idCheck`字段表示是否进行ID校验。 ##### 2.2 请求参数映射 在`request`数组中,定义了各个字段的映射关系和类型。例如: ```json { "field":"FName", "label":"名称", "type":"string" }, { "field":"FNumber", "label":"编码", "type":"string" }, { "field":"SubHeadEntity", ... } ``` 这些配置指明了源数据中的字段如何映射到目标平台的数据结构中。例如,`FName`对应物料名称,类型为字符串。 ##### 2.3 嵌套对象处理 对于嵌套对象,如`SubHeadEntity`和`SubHeadEntity1`,我们需要进一步解析其内部字段: ```json { "field":"SubHeadEntity", ... "children":[ { ... { "field":"FBaseUnitId", ... "parser":{ ... } }, ... } ] } ``` 这里定义了子字段及其解析器(如`ConvertObjectParser`),用于将特定值进行转换。 ##### 2.4 其他请求参数 在`otherRequest`数组中,定义了一些额外的请求参数,例如表单ID、操作类型等: ```json { ... { "field":"FormId", ... "value":"BD_MATERIAL" }, { ... { ... { ... { ... { ... { ... { ... { ... { ... ![如何开发企业微信API接口](https://pic.qeasy.cloud/T29.png~tplv-syqr462i7n-qeasy.image)