轻易云平台上的ETL转换与数据集成实践

  • 轻易云集成顾问-姚缘
### 案例分享:管易云·奇门数据集成到轻易云集成平台——查询管易退货单 在本篇技术案例中,我们将深入探讨如何通过轻易云数据集成平台高效地进行系统对接,从而实现从管易云·奇门系统中查询和处理退货单数据。具体来说,本次方案聚焦于gy.erp.trade.return.get接口的调用,将其返回的数据快速、高效地写入到轻易云集成平台,以确保业务流程的顺畅运行。 首先,考虑到需要处理大量的退货单记录,选择轻易云高吞吐量的数据写入能力至关重要。这一特性使得我们能够迅速且可靠地将大批量数据导入至目标系统,不仅提高了时效性,还保证了每条记录准确无误。 为了更直观和可控地管理这一过程,我们利用了轻易云提供的可视化数据流设计工具,通过拖拽式操作搭建整个数据流,让复杂的逻辑更加清晰明了。此外,在实际实施过程中,为了解决API调用中的分页与限流问题,我们设计了一套定时抓取机制,通过多线程并发请求有效应对这些限制,并采用错误重试策略来提升整体任务执行鲁棒性。 其次,对于API资产管理功能的使用,同样是本次项目的重要组成部分。通过统一视图全面掌握API资产状况,有助于及时发现并优化配置资源,从而进一步增强整个集成方案的效率。 总之,本次案例中的各个环节都体现出了精细化的数据质量监控及异常检测措施,有利于尽早发现潜在的问题并及时修正,做到真正意义上的实时监控,同时也为未来类似项目积累了宝贵经验。在下文中,我们将详细介绍具体实施步骤以及相关配置参数,希望能够为有类似需求的企业提供参考。 ![钉钉与WMS系统接口开发配置](https://pic.qeasy.cloud/D28.png~tplv-syqr462i7n-qeasy.image) ### 调用管易云·奇门接口gy.erp.trade.return.get获取并加工数据 在数据集成生命周期的第一步,我们需要从源系统获取数据并进行初步加工。本文将详细探讨如何通过轻易云数据集成平台调用管易云·奇门接口`gy.erp.trade.return.get`来查询退货单,并对返回的数据进行处理。 #### 接口调用配置 首先,我们需要配置接口调用的元数据。以下是我们使用的元数据配置: ```json { "api": "gy.erp.trade.return.get", "method": "POST", "number": "code", "id": "code", "pagination": { "pageSize": 100 }, "idCheck": true, "formatResponse": [ { "old": "receive_date", "new": "receive_date_new", "format": "date" } ], "request": [ {"field":"code","label":"单据编号","type":"string"}, {"field":"start_create","label":"创建时间开始段","type":"datetime"}, {"field":"end_create","label":"创建时间结束段","type":"datetime"}, {"field":"in_begin_time","label":"入库时间开始段","type":"datetime"}, {"field":"in_end_time","label":"入库时间结束段","type":"datetime"}, {"field":"shop_code","label":"店铺代码","type":"string"}, {"field":"drp_tenant_name","label":"分销商名称","type":"string"}, {"field":"platform_code","label":"平台单号","type":"string"}, {"field":"return_type","label":"退货类型代码","type":"string"}, {"field":"express_no","label":"快递单号","type":"string"}, {"field":"vip_name","label":"会员名称","type":"string"}, {"field":"agree","label":"同意状态","type":"string"}, {"field":"receive","label":"入库状态","type":"string", "value": "1"}, {"field":"cancel","label":"作废状态","type": "string"}, {"field": "no_parcel", "label": "是否三无包裹", "type": "string"}, {"field": "receiver_name", "label": "退货人姓名", "type": "string"}, {"field": "receiver_phone", "label": "退货人手机", "type": "string"}, {"field": "warehousein_code", "label": "退入仓库代码", type: string}, {"field": warehouseout_code, label: 退出仓库代码, type: string}, { field: modify_start_date, label: 修改时间开始段, type: string, value: {{LAST_SYNC_TIME|datetime}} }, { field: modify_end_date, label: 修改时间结束段, type: string, value: {{CURRENT_TIME|datetime}} } ], otherRequest: [ { field: page_no, label: 页码, type: string, describe: 默认为1, value: {PAGINATION_START_PAGE} }, { field: page_size, label: 每页大小, type: string, describe: 默认为10, value:{PAGINATION_PAGE_SIZE}} ], autoFillResponse:true, condition_bk:[ [{ field:"approve", logic:"eqv2", value:"1"}] ], condition:[ [{ field:"approve", logic:"eqv2", value:"1"}, { field:"shop_code", logic:"neqv2", value:"SXF001"}, { field:"shop_code", logic:"neqv2", value:"CB0066"}, { field:"platform_code", logic:"notlike", value:"XXXSDD"}, { field:"platform_code", logic:"notlike", value:"CKSQ"}] ] } ``` #### 请求参数说明 在请求参数部分,我们定义了多个字段来过滤和控制查询结果: - `code`: 单据编号,用于唯一标识一个退货单。 - `start_create` 和 `end_create`: 创建时间的起止范围,用于筛选特定时间段内的退货单。 - `in_begin_time` 和 `in_end_time`: 入库时间的起止范围。 - `shop_code`: 店铺代码,过滤特定店铺的退货单。 - `drp_tenant_name`: 分销商名称。 - `platform_code`: 平台单号。 - `return_type`: 退货类型代码。 - `express_no`: 快递单号。 - `vip_name`: 会员名称。 - `agree`, `receive`, `cancel`, `no_parcel`: 各种状态字段,用于进一步细化筛选条件。 - `receiver_name` 和 `receiver_phone`: 退货人信息。 - `warehousein_code` 和 `warehouseout_code`: 仓库代码,用于指定退入和退出仓库。 此外,我们还设置了分页参数`page_no`和`page_size`,确保可以分批次获取大量数据。 #### 数据格式化与转换 在获取到原始数据后,我们需要对其进行一定的格式化和转换。例如,将返回的数据中的`receive_date`字段重新命名为`receive_date_new`,并将其格式化为日期类型。这一步骤可以通过配置中的`formatResponse`字段来实现: ```json { formatResponse:[ { old: receive_date, new: receive_date_new, format: date } ] } ``` #### 条件过滤 为了确保我们获取的数据符合业务需求,可以在元数据中定义条件过滤器。例如,以下条件确保只获取审核通过且不属于特定店铺和平台的退货单: ```json { condition:[ [{ field:"approve", logic:"eqv2", value:"1"}, { field:"shop_code", logic:"neqv2", value:"SXF001"}, { field:"shop_code", logic:"neqv2", value:"CB0066"}, { field:"platform_code", logic:"notlike", value:"XXXSDD"}, { field:"platform_code", logic:"notlike", value:"CKSQ"}] ] } ``` #### 自动填充响应 为了简化后续的数据处理步骤,可以启用自动填充响应功能,即将接口返回的数据直接映射到目标字段中: ```json { autoFillResponse:true } ``` 通过以上配置,我们可以高效地调用管易云·奇门接口获取所需的退货单数据,并对其进行初步加工,为后续的数据转换与写入做好准备。 ![轻易云数据集成平台金蝶集成接口配置](https://pic.qeasy.cloud/S20.png~tplv-syqr462i7n-qeasy.image) ### 数据集成与ETL转换:将源平台数据写入目标平台 在数据集成的过程中,ETL(Extract, Transform, Load)转换是至关重要的一步。本文将详细探讨如何将已经集成的源平台数据进行ETL转换,并通过轻易云集成平台API接口,将其转为目标平台所能够接收的格式,最终写入目标平台。 #### 数据请求与清洗 首先,我们需要从源平台获取原始数据。在这个过程中,通常会涉及到对数据进行清洗,以确保其质量和一致性。例如,在处理管易退货单时,我们需要确保每条退货记录都包含完整且正确的信息,如订单号、退货原因、退货数量等。 #### 数据转换 接下来,我们进入数据转换阶段。此阶段的主要任务是将清洗后的数据转化为目标平台所能接受的格式。假设我们已经获得了以下管易退货单的数据: ```json [ { "order_id": "12345", "return_reason": "商品损坏", "return_quantity": 2 }, { "order_id": "67890", "return_reason": "不满意", "return_quantity": 1 } ] ``` 在这个例子中,我们需要将这些数据转化为轻易云集成平台API接口所能接受的格式。根据元数据配置,目标API接口要求如下: ```json { "api": "写入空操作", "effect": "EXECUTE", "method": "POST", "idCheck": true } ``` 我们需要构建一个符合上述配置的POST请求体。假设目标平台要求的数据格式如下: ```json { "operation": "return_order", "data": [ { "order_id": "12345", "reason": "商品损坏", "quantity": 2 }, { "order_id": "67890", "reason": "不满意", "quantity": 1 } ] } ``` 为了实现这一点,我们可以编写一个简单的Python脚本来进行转换: ```python import json # 原始数据 source_data = [ {"order_id": "12345", "return_reason": "商品损坏", "return_quantity": 2}, {"order_id": "67890", "return_reason": "不满意", "return_quantity": 1} ] # 转换后的数据结构 transformed_data = { 'operation': 'return_order', 'data': [] } for record in source_data: transformed_record = { 'order_id': record['order_id'], 'reason': record['return_reason'], 'quantity': record['return_quantity'] } transformed_data['data'].append(transformed_record) # 转换后的JSON字符串 transformed_json = json.dumps(transformed_data, ensure_ascii=False) print(transformed_json) ``` 运行上述脚本后,我们得到符合目标平台要求的数据格式: ```json { "operation": "return_order", "data": [ { "order_id": "12345", "reason": "商品损坏", "quantity": 2 }, { "order_id": "67890", "reason": "不满意", "quantity": 1 } ] } ``` #### 数据写入 最后一步是将转换后的数据通过API接口写入目标平台。根据元数据配置,我们需要发送一个POST请求。以下是使用Python的`requests`库来实现这一过程的示例代码: ```python import requests # API URL和请求头部信息 api_url = 'https://api.targetplatform.com/execute' headers = {'Content-Type': 'application/json'} # 发送POST请求并检查响应状态码 response = requests.post(api_url, headers=headers, data=transformed_json) if response.status_code == 200: print("Data successfully written to the target platform.") else: print(f"Failed to write data. Status code: {response.status_code}") ``` 通过上述步骤,我们成功地将源平台的数据进行了ETL转换,并通过轻易云集成平台API接口,将其写入了目标平台。 这种方法不仅确保了数据在不同系统间的无缝对接,还提高了整个数据处理过程的透明度和效率。在实际应用中,根据具体业务需求和技术环境,可能还需要进行更多细节上的调整和优化。 ![系统集成平台API接口配置](https://pic.qeasy.cloud/T4.png~tplv-syqr462i7n-qeasy.image)