ETL转换MySQL数据集成:从金蝶云星空到目标数据库

  • 轻易云集成顾问-蔡威
### 金蝶云星空到MySQL的数据集成:组织间结算价目表V2-mom案例分享 在本次技术案例中,我们专注于将金蝶云星空系统中的数据无缝集成到MySQL数据库。具体而言,我们配置并实施了“组织间结算价目表V2-mom”方案,旨在确保数据的高效传输和可靠处理。 首先介绍下我们所使用的两个关键API接口:从金蝶云星空获取数据时调用的`executeBillQuery`,以及将数据写入MySQL时则调用`execute`。这两个API分别负责截取源系统中的实时业务数据,并确保这些数据能够被精准地导入目标数据库。 为了应对大规模的数据流动,我们特别利用了以下几个平台特性: 1. **高吞吐量的数据写入能力**:通过优化批量处理机制,有效提高了大量业务数据快速写入MySQL的效率。 2. **实时监控与告警系统**:整个集成过程由集中监控和告警系统全程护航,确保每一步都能透明可视,同时提供及时的问题反馈和解决途径。 3. **自定义数据转换逻辑**:针对不同业务需求和多样化的数据结构,我们设计并应用了一系列自定义转换规则,以便更好地适应企业现有的信息架构。 此外,为了解决常见的数据质量问题,还特别引入了异常检测及错误重试机制。这不仅帮助发现潜在的数据问题,也提升了整体任务执行的可靠性。在实际操作过程中,各种分页与限流策略也得到了有效应用,从而保证不会因单次请求过多而导致资源超载或失败。 此后的内容将进一步展示如何详尽实现以上各项功能模块,以及详细论述其中涉及的重要技术细节,包括但不限于批次调度、接口调用、数据信息管理等方面。 ![系统集成平台API接口配置](https://pic.qeasy.cloud/D3.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在轻易云数据集成平台中,调用源系统接口是数据集成生命周期的第一步。本文将详细探讨如何通过调用金蝶云星空的`executeBillQuery`接口来获取并加工数据。 #### 接口配置与请求参数 根据元数据配置,我们需要通过POST方法调用`executeBillQuery`接口。以下是请求参数的详细配置: - **api**: `executeBillQuery` - **method**: `POST` - **FormId**: `IOS_PriceList` - **FieldKeys**: 需要查询的字段集合,使用逗号分隔 - **FilterString**: 过滤条件,例如:`FUseOrgId.FNumber='T02' and FDocumentStatus='C' and FApproveDate>='{{LAST_SYNC_TIME|dateTime}}'` - **Limit**: 最大行数,默认值为2000 - **StartRow**: 开始行索引,用于分页 - **TopRowCount**: 返回总行数 #### 请求字段说明 请求字段包括实体主键、状态、名称、编码等多个字段。以下是部分关键字段及其含义: - `FEntryID`: 实体行主键 - `FID`: 实体主键 - `FDOC_STATUS`: 状态 - `FFORBID_STATUS`: 失效状态 - `FNAME`: 名称 - `FNUMBER`: 编码 - `FCREATE_ORG_ID`: 核算组织 - `FUSE_ORG_ID`: 使用组织 这些字段将用于构建请求体,以便从金蝶云星空获取所需的数据。 #### 构建请求体 根据元数据配置,我们可以构建如下请求体: ```json { "FormId": "IOS_PriceList", "FieldKeys": "FEntryID,FID,FDOC_STATUS,FFORBID_STATUS,FNAME,FNUMBER,FCREATE_ORG_ID,FUSE_ORG_ID", "FilterString": "FUseOrgId.FNumber='T02' and FDocumentStatus='C' and FApproveDate>='{{LAST_SYNC_TIME|dateTime}}'", "Limit": 2000, "StartRow": "{PAGINATION_START_ROW}", "TopRowCount": true } ``` #### 调用接口与处理响应 在轻易云数据集成平台中,通过HTTP POST方法调用上述接口,并处理返回的JSON响应。以下是示例代码: ```python import requests url = "https://api.kingdee.com/executeBillQuery" headers = { 'Content-Type': 'application/json' } payload = { "FormId": "IOS_PriceList", "FieldKeys": "FEntryID,FID,FDOC_STATUS,FFORBID_STATUS,FNAME,FNUMBER,FCREATE_ORG_ID,FUSE_ORG_ID", "FilterString": "FUseOrgId.FNumber='T02' and FDocumentStatus='C' and FApproveDate>='2023-01-01'", "Limit": 2000, "StartRow": 0, "TopRowCount": True } response = requests.post(url, headers=headers, json=payload) data = response.json() # 数据处理逻辑,例如清洗和转换 processed_data = [] for entry in data['Result']: processed_entry = { 'EntryID': entry['FEntryID'], 'Name': entry['FNAME'], 'Number': entry['FNUMBER'], # 添加更多字段处理逻辑... } processed_data.append(processed_entry) # 输出或存储处理后的数据 print(processed_data) ``` #### 数据清洗与转换 在获取到原始数据后,需要进行清洗和转换,以满足业务需求。例如,可以对日期格式进行标准化,对编码进行校验等。 ```python from datetime import datetime def clean_and_transform(data): for entry in data: # 日期格式标准化 if 'CreateDate' in entry: entry['CreateDate'] = datetime.strptime(entry['CreateDate'], '%Y-%m-%d').strftime('%Y%m%d') # 编码校验和转换 if 'Number' in entry: entry['Number'] = entry['Number'].strip().upper() # 更多清洗和转换逻辑... clean_and_transform(processed_data) ``` 通过上述步骤,我们可以高效地从金蝶云星空获取并加工所需的数据,为后续的数据写入和业务分析奠定基础。 ![打通金蝶云星空数据接口](https://pic.qeasy.cloud/S22.png~tplv-syqr462i7n-qeasy.image) ### 数据集成生命周期中的ETL转换与写入MySQLAPI接口 在数据集成生命周期的第二阶段,我们需要将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,最终转为目标平台 MySQL API 接口所能够接收的格式,并写入目标平台。本文将详细探讨如何通过轻易云数据集成平台实现这一过程。 #### 元数据配置解析 我们使用以下元数据配置来指导整个ETL过程: ```json { "api": "execute", "effect": "EXECUTE", "method": "POST", "idCheck": true, "request": [ { "field": "main_params", "label": "main_params", "type": "object", "describe": "111", "children": [ {"field":"FEntryID","label":"实体行主键","type":"string","value":"{FEntryID}"}, {"field":"FID","label":"实体主键","type":"string","value":"{FID}"}, {"field":"FDOC_STATUS","label":"状态","type":"string","value":"{FDOC_STATUS}"}, {"field":"FFORBID_STATUS","label":"失效状态","type":"string","value":"{FFORBID_STATUS}"}, {"field":"FNAME","label":"名称","type":"string","value":"{FNAME}"}, {"field":"FNUMBER","label":"编码","type":"string","value":"{FNUMBER}"}, {"field":"FDESCRIPTION","label":"备注","type":"string","value":"{FDESCRIPTION}"}, {"field":"FCREATE_ORG_ID","label":"核算组织","type":"string","value":"{FCREATE_ORG_ID}"}, {"field":"FUSE_ORG_ID","label":"使用组织","type":"string","value":"{FUSE_ORG_ID}"}, {"field":"FCREATOR_ID","label":"创建人","type":"string","value":"{FCREATOR_ID}"}, {"field":"FMODIFIER_ID","label":"最后修改人","type":"string","value":"{FMODIFIER_ID}"}, {"field":"FCREATE_DATE","label":"创建日期","type":"","value":{"$dateFormat":{"formatString":"'yyyy-MM-dd HH:mm:ss'"},"sourceFieldName":{"$sourceFieldName":["createDate"]}}}, {"field":"","label":"","type":"","value":{"$dateFormat":{"formatString":"'yyyy-MM-dd HH:mm:ss'"},"sourceFieldName":{"$sourceFieldName":["modifyDate"]}}}, {"field":"","label":"","type":"","value":{"$dateFormat":{"formatString":"'yyyy-MM-dd HH:mm:ss'"},"sourceFieldName":{"$sourceFieldName":["approveDate"]}}}, {"field":"","label":"","type":"","value":{"$dateFormat":{"formatString":"'yyyy-MM-dd HH:mm:ss'"},"sourceFieldName":{"$sourceFieldName":["forbidDate"]}}}, {"field":"","label":"","type":"","value":{"$dateFormat":{"formatString":"'yyyy-MM-dd HH:mm:ss'"},"sourceFieldName":{"$sourceFieldName":["entryEffectiveDate"]}}}, {"field":"","label":"","type":"","value":{"$dateFormat":{"formatString":"'yyyy-MM-dd HH:mm:ss'"},"sourceFieldName":{"$sourceFieldName":["entryExpiryDate"]}}} ] } ], "otherRequest": [ { "field": "main_sql", "label": "", "type": "", "describe":"", "value": "" } ] } ``` #### 数据请求与清洗 在数据请求阶段,我们从源系统中提取原始数据。通过配置 `request` 字段中的子字段,我们定义了需要提取的数据字段及其类型和描述。这些字段包括 `FEntryID`, `FID`, `FDOC_STATUS`, `FFORBID_STATUS` 等,确保我们获取到所有必要的信息。 ```json { "request":[ { "field": "main_params", ... } ] } ``` #### 数据转换 在数据转换阶段,我们需要将提取的数据转换为目标平台 MySQL API 所能接收的格式。这里,我们利用元数据配置中的 `children` 字段来映射每个字段的值。例如: ```json {"field": "FEntryID", "label": "", type: "", value: "{FEntryID}"} ``` 其中,`"{FEntryID}"` 表示从源数据中提取的 `FEntryID` 字段值。 此外,我们还需要对日期字段进行格式化处理,以确保符合目标平台要求。例如: ```json { "$dateFormat":{ formatString: "'yyyy-MM-dd HH:mm:ss'", sourceFieldName: ["createDate"] } } ``` 这种配置确保了日期字段被正确地转换为目标平台所需的格式。 #### 数据写入 最后,在数据写入阶段,我们使用 SQL 插入语句将转换后的数据写入 MySQL 数据库。元数据配置中的 `otherRequest` 字段定义了插入语句: ```sql INSERT INTO ty_aps.wms_interorg_settle_price (FEntryID,FID,FDOC_STATUS,FFORBID_STATUS,FNAME,FNUMBER,FDESCRIPTION,FCREATE_ORG_ID,FUSE_ORG_ID,FCREATOR_ID,FMODIFIER_ID,FCREATE_DATE,FMODIFIER_DATE,FPRICETYPE,FCURRENCYID,FEXPIRYDATE, FAPPROVE_DATE,FFORBID_DATE,FFORBIDDER_ID,FAPPROVER_ID,FIS_INCLUDED_TAX,FMATERIALID,FMATERIAL_NAME,FMATERIAL_MODEL,FAUXPROP_ID,FTRADETYPE,FBUSINESSTYPE,FPRICEUNITID,FPRICEBASE, FPRICE,FENTRY_EFFECTIVE_DATE,FENTRY_EXPRIY_DATE,FFORBID_STATUS_EN,FFORBIDDER_ID_EN,FFORBID_DATE_EN,FMATERIAL_TYPE_ID,FROW_AUDIT_STATUS,FTAX_PRICE,FTAX_RATE,FIS_FREE_GIFT,FBOM_ID, FLOT,FBASE_UNIT_ID,FBASE_UNIT_PRICE,FMTONO,FMATERIAL_GROUP_ID,FMATERIAL_GROUP_NAME,FENTITY_ACC_ID,FIS_ENABLE,FACCT_SYS_ID,FACCT_SYS_NAME) VALUES (:FEntryID,:FID,:FDOC_STATUS,:FFORBID_STATUS,:FNAME,:FNUMBER,:FDESCRIPTION,:FCREATE_ORG_ID,:FUSE_ORG_ID,:FCREATOR_ID,:FMODIFIER_ID,:FCREATE_DATE,:FMODIFIER_DATE,:FPRICETYPE,:FCURRENCYID,:FEXPIRYDATE, :FAPPROVE_DATE,:FFORBID_DATE,:FFORBIDDER_ID,:FAPPROVER_ID,:FIS_INCLUDED_TAX,:FMATERIALID,:FMATERIAL_NAME,:FMATERIAL_MODEL,:FAUXPROP_ID,:FTRADETYPE,:FBUSINESSTYPE,:FPRICEUNITID,:FPRICEBASE, :FPRICE,:FENTRY_EFFECTIVE_DATE,:FENTRY_EXPRIY_DATE,:FFORBID_STATUS_EN,:FFORBIDER_EN) ``` 通过这种方式,我们可以确保所有必要的数据字段都被正确地插入到 MySQL 数据库中。 #### 实践案例 假设我们从源系统中提取到如下数据: ```json { FEntryID: '12345', FID: '67890', FDOC_STATUS: 'Active', FFORBID_STATUS: 'Inactive', FNAME: 'Sample Name', FNUMBER: '001', FDESCRIPTION: 'Sample Description', ... } ``` 经过 ETL 转换后,生成的 SQL 插入语句如下: ```sql INSERT INTO ty_aps.wms_interorg_settle_price (FEntryID,FID,...) VALUES ('12345','67890',...) ``` 这样,通过轻易云数据集成平台的全异步、支持多种异构系统集成能力,实现了不同系统间的数据无缝对接,并最终将转换后的数据成功写入到目标 MySQL 平台中。 ![泛微OA与ERP系统接口开发配置](https://pic.qeasy.cloud/T25.png~tplv-syqr462i7n-qeasy.image)