ETL转换及数据写入金蝶云星空的最佳实践

  • 轻易云集成顾问-李国敏
### 通过轻易云数据集成:实现易仓到金蝶云星空的高效对接 在实际项目中,我们需要将易仓的数据无缝集成到金蝶云星空,以支持业务中的良品换货其他出库单管理。本文探讨了这一系统对接过程中,一些关键技术点和解决方案,包括如何确保数据不漏单、处理接口分页和限流问题,以及如何应对两者之间的数据格式差异。 #### 确保数据不漏单 首先,为了确保从易仓获取的数据完整性,我们使用了定时可靠地抓取接口数据的策略。采用getDeliveryDetailList API,通过设置合理的时间间隔,定期批量拉取出库单详细信息。这不仅提高了抓取效率,还极大降低因网络波动或系统异常导致的数据丢失风险。同时,实时监控与日志记录功能,使得每一步操作都透明可视,对出现的问题能够及时追溯和修复。 #### 大量数据快速写入 在将获取到的大量出库单数据写入金蝶云星空时,我们利用batchSave API进行批量处理。这种方式不仅能显著提升写入速度,还减少了一次调用所需的资源消耗。在具体实现上,通过分块提交大规模数据,并采取并行执行的方法,进一步优化性能。此外,在面对因为网络或者服务端压力导致的偶发错误时,实现了自动重试机制,从而保证整体任务的稳定性和可靠性。 #### 接口分页与限流问题处理 由于涉及大量订单信息,从易仓拉取数据必然碰到API分页和限流的问题。我们设计了一套智能化分页方案,每次调用getDeliveryDetailList API都会根据返回结果判断是否继续请求下一页,同时严格控制请求频率以避免触发限流限制。当遇到API调用次数受限制时,系统会进入等待状态,并在允许范围内尽快重试,以最大程度保障全程连续、高效运行。 以上是本次案例的一些前置准备工作及关键技术攻关。在后续内容中,我们还将深入探讨如何应对更多细节挑战,如数据映射与格式转换、异常情况处理等,使整个集成过程更加完善稳定。 ![如何开发钉钉API接口](https://pic.qeasy.cloud/D13.png~tplv-syqr462i7n-qeasy.image) ### 调用源系统易仓接口getDeliveryDetailList获取并加工数据 在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将详细探讨如何使用轻易云数据集成平台调用易仓接口`getDeliveryDetailList`,并对获取的数据进行加工处理。 #### 接口调用配置 首先,我们需要配置元数据,以便正确调用`getDeliveryDetailList`接口。以下是该接口的元数据配置: ```json { "api": "getDeliveryDetailList", "effect": "QUERY", "method": "POST", "number": "reference_no", "id": "il_id", "name": "reference_no", "idCheck": true, "request": [ {"field": "dateFor", "label": "统计开始日期", "type": "datetime", "describe": "Y-m-d\n时间格式 2020-01-01 或 2020-01-01 00:00 或 2020-01-01 00:00:00 和结束时间不能超过24H", "value": "{{LAST_SYNC_TIME|datetime}}"}, {"field": "dateTo", "label": "统计截止日期", "type": "datetime", "describe": "Y-m-d\n时间格式 2020-01-01 或 2020-01-01 00:00 或 2020-01-01 00:00:00 和开始时间不能超过24H", "value": "{{CURRENT_TIME|datetime}}"}, {"field": "warehouse_arr", "label": "仓库id数组", "type": "string", "describe": "默认全部"}, {"field": "warehouse_code_arr", "label": "仓库code数组", "type": "string", "describe":"最多1000个"}, {"field":"product_barcode","label":"产品代码","type":"string","describe":"产品代码"}, {"field":"product_barcode_type","label":"是否模糊查询产品代码","type":"int","describe":"1:模糊查询,0:精确查询"}, {"field":"operationUserType","label":"产品负责人","type":"string","describe":"buyer_id:采购负责人,seller_responsible_id:销售负责人,develop_responsible_id:开发负责人"}, {"field":"person","label":"负责人用户id","type":"string","describe":"负责人用户id"}, {"field":"category","label":"产品品类","type":"string","describe":"产品品类"}, {"field":"cu_type","label":"出库类型","type":"string","describe":"0借用,1领用,2不良品,3盘亏,5退货,6良品换货,7次品换良品,8良品转次品,9其他,10线下销售,11组装,12拆分,15按批次拆分,13良品还款,14不良品还款\n可组合查询,若查询多个状态需要用数组形式","value":"6"}, {"field":"page","label":"当前页","type":"string","describe":"当前页"}, {"field":"pageSize","label":"每一页条数","type":"string","describe":"最大1000","value":"200"}, {"field":"supplier_code","label":"供应商Code","type":"string","describe":"供应商Code"}, {"field":"il_id","label":"上一次分页的返回值","type":"string","describe":"利用索引的有序性,提高查询效率"} ], ... } ``` #### 请求参数详解 1. **dateFor**和**dateTo**:用于指定统计的时间范围。通过模板变量`{{LAST_SYNC_TIME|datetime}}`和`{{CURRENT_TIME|datetime}}`动态填充。 2. **warehouse_arr**和**warehouse_code_arr**:分别为仓库ID数组和仓库代码数组,用于指定特定仓库的数据。 3. **product_barcode**和**product_barcode_type**:用于根据产品代码进行查询,可以选择模糊或精确查询。 4. **operationUserType**和**person**:用于指定产品负责人的类型及其用户ID。 5. **category**:用于指定产品的类别。 6. **cu_type**:出库类型,此处固定为“6”,表示良品换货。 7. **page**和**pageSize**:用于分页请求,每页最大条数为1000。 8. **supplier_code**:用于指定供应商代码。 9. **il_id**:上一次分页返回值,用于提高查询效率。 #### 数据请求与清洗 在完成上述参数配置后,通过POST方法向易仓API发送请求。接收到响应数据后,需要对数据进行清洗和预处理,以确保其符合目标系统的要求。 例如,对于日期字段,可以使用以下Python代码进行格式转换: ```python from datetime import datetime def convert_date_format(date_str): try: return datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S').strftime('%Y-%m-%d') except ValueError: return date_str # 示例数据清洗 data = [{"dateFor": '2023-10-05 12:30:45', ...}] for record in data: record['dateFor'] = convert_date_format(record['dateFor']) ``` #### 数据转换与写入 在完成数据清洗后,将其转换为目标系统所需的格式,并写入目标数据库。例如,可以使用SQLAlchemy将数据写入关系型数据库: ```python from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker # 创建数据库连接 engine = create_engine('mysql+pymysql://user:password@host/dbname') Session = sessionmaker(bind=engine) session = Session() # 定义目标表结构(示例) class DeliveryDetail(Base): __tablename__ = 'delivery_detail' id = Column(Integer, primary_key=True) reference_no = Column(String(50)) ... # 插入数据 for record in data: new_record = DeliveryDetail(**record) session.add(new_record) session.commit() ``` 通过上述步骤,我们可以高效地调用易仓接口获取数据,并对其进行清洗、转换和写入,实现不同系统间的数据无缝对接。这不仅提升了业务透明度,还显著提高了工作效率。 ![用友与外部系统接口集成开发](https://pic.qeasy.cloud/S4.png~tplv-syqr462i7n-qeasy.image) ### 使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口 在数据集成的生命周期中,ETL(Extract, Transform, Load)转换是一个关键步骤。本文将详细探讨如何通过轻易云数据集成平台,将已经集成的源平台数据进行ETL转换,转为金蝶云星空API接口所能够接收的格式,并最终写入目标平台。 #### 数据请求与清洗 在进行数据转换之前,我们首先需要确保数据的完整性和准确性。这包括从源系统请求数据并进行必要的清洗操作,以确保数据符合目标系统的要求。此过程通常涉及对原始数据进行去重、格式化和标准化处理。 #### 数据转换与写入 在轻易云数据集成平台中,元数据配置是实现ETL转换的核心。以下是一个详细的元数据配置示例,用于将源平台的数据转换为金蝶云星空API接口所能接收的格式,并最终写入目标平台。 ```json { "api": "batchSave", "method": "POST", "idCheck": true, "operation": { "method": "merge", "field": "reference_no", "bodyName": "details", "bodySum": [], "header": ["reference_no", "cu_type", "warehouse_code", "add_time", "cu_note"], "body": ["product_barcode", "quantity", "warehouse_code"] }, "request": [ {"field":"FBillNo","label":"单据编号","type":"string","describe":"单据编号","value":"{reference_no}"}, {"field":"FBillTypeID","label":"单据类型","type":"string","describe":"单据类型","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"QTCKD08_SYS"}, {"field":"FStockOrgId","label":"库存组织","type":"string","describe":"组织","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"{warehouse_code}","mapping":{"target":"63688a45a23a2e0fa5271b19","direction":"positive"}}, {"field":"FPickOrgId","label":"领用组织","type":"string","describe":"组织","parser":{"name":"ConvertObjectParser","params":"FNumber"}}, {"field":"FStockDirect","label":"库存方向","type":"string","describe":"下拉列表"}, {"field":"FDate","label":"日期","type":"string","describe":"日期","value":"{add_time}"}, {"field":"FCustId","label":"客户","type":"string","describe":"基础资料","parser":{"name":"ConvertObjectParser","params":"FNumber"}}, {"field":"FDeptId","label":"领料部门","type":"string","describe":"基础资料","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":""BM000015""}, {"field":""FPickerId" ","label":"领料人","type":"string","describe":"基础资料","parser":{"name":"ConvertObjectParser","params":"FNumber"}}, {"field":"FBizType","label":"业务类型","type":"string","describe":"下拉列表"}, {"field":"FOwnerTypeIdHead","label":"货主类型","type":"string","describe":"多类别基础资料列表","value"":""BD_OwnerOrg""}, {"field"":""FOwnerIdHead"",""label"":""货主"",""type"":""string"",""describe"":""多类别基础资料"", ""parser"":{ ""name"": ""ConvertObjectParser"", ""params"": ""FNumber"}},"value":"","warehouse_code":"","mapping":{"target":"","63688a45a23a2e0fa5271b19":"","direction":"","positive"}}, {"field":"","FNote":"","label":"","备注":"","type":"","string":"","describe":"","多行文本","","value":"","cu_note"}, { "" field ":" FEntity "," label ":" 明细信息 "," type ":" array "," children ": [ { "" field ":" FMaterialId "," label ":" 物料编码 "," type ":" string "," describe ":" 基础资料 "," parser ": { "" name ":" ConvertObjectParser "," params ":" FNumber "}," value ":" _findCollection find FNumber from 32df639a-9c45-3823-8a92-1e2ceb30649e where FOldNumber = {product_barcode} "," parent ":" FEntity "}, { "" field ":" FCMKBarCode "," label ":" 零售条形码 "," type ":" string "," describe ":" 文本 "," parent ":" FEntity "}, { "" field ":" FQty "," label ":" 实发数量 "," type ":" string "," describe ":数量,值:"{quantity}",父级:"{quantity}" ,父级:"{quantity}" ,父级:"{quantity}" ,父级:"{quantity}" ,父级:"{quantity}" ,父级:"{quantity}" ,父级:"{quantity}" ,父级:"{quantity}" ,父级:"{quantity}" ,父级:"{quantity}" ,父级:"{quantity}" ,父级:"{quantity}" ,父级: { "" field :“FAmount”,“标签”:总成本,“类型”:字符串,“描述”:金额,“家长”: “实体” }, { “字段”:“价格”,“标签”:成本价,“类型”:字符串,“描述”:单位价格,“家长”: “实体” }, { “字段”:“FSrcBillTypeId”,“标签”:源单类型,“类型”:字符串,“描述”:源单类型,“家长”: “实体” }, { “字段”:“FSrcBillNo”,“标签”:源单编号,“类型”:字符串,“描述”:源单编号,“家长”: “实体” }, { “字段”:“FOwnerTypeId”,“标签”:货主类型,“类型”:字符串,“描述”:多类别基础资料列表”,值:“BD_OwnerOrg”,“parent”:“FEntity”}, {“字段”:“FOwnerId”,“标签”:“货主”,“类型”:“字符串”,“描述”:“多类别基础资料”,解析器:{“名称”:“ConvertObjectParser”,“参数”:“数字”},值: "{仓库代码}", 父母: "{仓库代码}", 父母: "{仓库代码}", 父母: "{仓库代码}", 父母: "{仓库代码}", 父母: "{仓库代码}", 父母: "{仓库代码}", { “” 字段 ” : ” FEntryNote ” , ” 标签 ” : ” 备注 ” , ” 类型 ” : ” 字符串 ” , ” 描述 ” : 多行文本 , 父母 : 实体 } ] , 值 : 详情 } ], 其他请求:[ { 字段 : 表单 ID , 标签 : 表单 ID , 类型 : 字符串 , 描述 : 必须填写金蝶表单 ID 如 : PUR_PurchaseOrder , 值 : STK_MisDelivery } , { 字段 : 是否自动提交和审核 , 标签 : 提交和审核 , 类型 : 布尔值 , 值 : true } , { 字段 : 验证基础资料 , 标签 : 验证基础资料 , 类型 : 布尔值 , 描述 : 是否验证所有基础资料有效性,布尔类,默认false(非必录),值:true } , { 字段 : 操作 , 标签 : 执行操作 , 类型 : 字符串 , 值 : 保存 } , { 字段 : 交互标志 , 标签 : 允许负库存 , 类型 : 字符串 , 值 : STK_InvCheckResult } ] } ``` #### 核心技术点解析 1. **API调用配置**: - `api`字段指定了目标API接口为`batchSave`。 - `method`字段指定HTTP请求方法为`POST`。 2. **操作定义**: - `operation`部分定义了如何合并和映射数据字段,包括头部(header)和明细(body)信息。 3. **请求参数映射**: - 每个字段通过`request`数组中的配置项进行映射。例如,`FBillNo`映射到源数据中的`reference_no`。 4. **解析器使用**: - 使用`ConvertObjectParser`解析器将某些字段值转换为金蝶系统所需的格式,例如将物料编码从条形码转换为内部编码。 5. **其他请求参数**: - `otherRequest`部分包含一些额外的配置,如表单ID、是否自动提交审核等。 通过以上配置,我们可以实现从源系统到金蝶云星空API接口的数据无缝对接。在实际应用中,根据具体业务需求,还可以进一步调整和优化这些配置,以确保数据能够正确、高效地传输和处理。 ![打通企业微信数据接口](https://pic.qeasy.cloud/T15.png~tplv-syqr462i7n-qeasy.image)