轻易云数据集成平台实现ETL与API对接的技术方案

  • 轻易云集成顾问-钟家寿
### 聚水潭数据集成到金蝶云星辰V2:其他入库单对接方案 在系统对接的实际应用中,将聚水潭的其他入库单数据无缝集成到金蝶云星辰V2是一项技术性很强、需要细致设计和精心实施的任务。本案例分析集中探讨如何通过轻易云数据集成平台,运用有效的方法与策略,确保两个系统间的数据流动顺畅、高效且准确。 #### 聚水潭API调用信号获取与处理 首要任务是从聚水潭接口`/open/other/inout/query`抓取数据。为了保证不漏单,并处理分页和限流问题,我们实现了一个定时可靠的数据抓取机制。每隔固定时间间隔,通过业务调度器触发API请求,将返回的数据分页存储,以应对可能存在的大量记录。同时,为了避免因限制频次导致的抖动及超额请求错误,我们在代码层面上加入限流控制逻辑。 ```python # 示例代码片段:定时抓取并处理分页 def fetch_data_from_jushuitan(page_num): response = requests.get(f'https://api.jushuitan.com/open/other/inout/query?page={page_num}') if response.status_code == 200: data = response.json() return data['results'], data['total_pages'] else: handle_api_error(response) # 定时调度器调用示例 schedule.every(10).minutes.do(lambda: paginated_fetch(fetch_data_from_jushuitan)) def paginated_fetch(fetch_function): page_num = 1 while True: results, total_pages = fetch_function(page_num) process_results(results) # 对结果进行批量处理与加工 if page_num >= total_pages: break page_num += 1 ``` #### 数据格式转换与定制化映射 由于聚水潭和金蝶云星辰V2在不同系统环境下可能使用不同的数据结构,因此,在将获取的数据推送至金蝶云之前,需要进行格式转换和必要的字段映射。这一步骤不仅涉及基础信息如字段名对应,更需关注诸如日期格式、货币类型等细节,这些都可以通过自定义映射配置完成,从而确保提交到金蝶云星辰V2的数据完全符合其要求。 ```python # 格式转换示例函数: def transform_jushuitan_to_kingdee(data): transformed_data = { "billNo": data["order_no"], "date": format_date(data["create_time"]), "items": [{ "materialCode": item["sku"], "quantity": item["qty"], ... } for item in data["items"]], ![泛微OA与ERP系统接口开发配置](https://pic.qeasy.cloud/D22.png~tplv-syqr462i7n-qeasy.image) ### 调用聚水潭接口/open/other/inout/query获取并加工数据的技术案例 在数据集成过程中,调用源系统的API接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用聚水潭的`/open/other/inout/query`接口,并对返回的数据进行加工处理。 #### 接口配置与调用 聚水潭提供的`/open/other/inout/query`接口用于查询其他入库单信息。该接口采用POST方法,支持多种查询参数。以下是元数据配置中的关键字段: - `modified_begin` 和 `modified_end`: 用于指定查询时间范围,分别表示修改起始时间和结束时间。 - `so_ids`: 指定线上订单号。 - `types`: 单据类型,包括“其它退货”、“其它出库”、“其它进仓”。 - `status`: 单据状态,如“Confirmed”(生效)、“WaitConfirm”(待审核)、“Archive”(归档)、“Cancelled”(取消)。 - `page_index` 和 `page_size`: 分页参数,用于控制查询结果的分页。 #### 请求参数配置 在实际调用中,我们需要根据业务需求设置请求参数。以下是一个示例请求配置: ```json { "modified_begin": "{{LAST_SYNC_TIME|datetime}}", "modified_end": "{{CURRENT_TIME|datetime}}", "so_ids": "", "types": "其它退货、其它进仓", "status": "Confirmed", "page_index": "1", "page_size": "30" } ``` 其中,`{{LAST_SYNC_TIME|datetime}}`和`{{CURRENT_TIME|datetime}}`是动态变量,分别表示上次同步时间和当前时间。这些变量可以通过轻易云平台的内置函数自动填充。 #### 数据清洗与转换 在获取到原始数据后,需要对数据进行清洗和转换,以满足目标系统的要求。例如,将字符串类型的单据类型字段转换为数组: ```json { "types": { "parser": { "name": "StringToArray", "params": "、" } } } ``` 这个配置使用了一个名为`StringToArray`的解析器,将以顿号分隔的字符串转换为数组格式,从而便于后续处理。 #### 自动填充与补救机制 轻易云平台提供了自动填充响应和遗漏补救机制,以确保数据完整性。在元数据配置中,我们启用了自动填充响应: ```json "autoFillResponse": true ``` 此外,还配置了定时任务(crontab)来处理可能遗漏的数据请求: ```json "omissionRemedy": { "crontab": "0 0 * * *", "takeOverRequest": [ { "field": "modified_begin", "value": "_function FROM_UNIXTIME( unix_timestamp() -604800 , '%Y-%m-%d %H:%i:%s' )", "type": "string" } ] } ``` 这个定时任务每天零点运行一次,将起始时间设置为当前时间前一周,以确保没有遗漏的数据被忽略。 #### 实际应用案例 假设我们需要从聚水潭系统中获取过去一天内所有已生效的“其它退货”和“其它进仓”单据,并将其写入星辰系统。我们可以按照以下步骤进行操作: 1. **配置请求参数**:设置`modified_begin`为前一天零点,`modified_end`为当前时间,单据类型选择“其它退货”和“其它进仓”,状态选择“Confirmed”。 2. **调用接口**:通过轻易云平台发起POST请求,获取符合条件的数据。 3. **数据清洗与转换**:将返回的数据进行必要的清洗和格式转换,例如将字符串类型的字段转换为数组或其他目标系统所需格式。 4. **写入目标系统**:将处理后的数据写入星辰系统,完成数据集成过程。 通过上述步骤,我们可以高效地实现不同系统间的数据无缝对接,并确保数据的一致性和完整性。这不仅提升了业务透明度,还极大地提高了工作效率。 ![用友与WMS系统接口开发配置](https://pic.qeasy.cloud/S14.png~tplv-syqr462i7n-qeasy.image) ### 轻易云数据集成平台:ETL转换与金蝶云星辰V2API接口对接技术案例 在数据集成过程中,将源平台的数据转换为目标平台所能接受的格式是关键的一步。本文将详细探讨如何使用轻易云数据集成平台,将聚水潭其他入库单的数据通过ETL转换,写入金蝶云星辰V2API接口。 #### API接口配置 在本案例中,我们需要将聚水潭的其他入库单数据转化为金蝶云星辰V2API接口能够接收的格式。以下是我们使用的元数据配置: ```json { "api": "/jdy/v2/scm/inv_other_in", "effect": "EXECUTE", "method": "POST", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ {"field":"bill_date","label":"单据日期","type":"string","describe":"单据日期","value":"{io_date}"}, {"field":"bill_no","label":"单据编码","type":"string","describe":"单据编码","value":"{io_id}"}, {"field":"trans_type_id","label":"业务类型id","type":"string","describe":"业务类型id","value":"12"}, {"field":"operation_key","label":"操作类型","type":"string","describe":"操作类型,审核audit、提交submit","value":"audit"}, {"field":"remark","label":"备注","type":"string","value":"{remark}"}, { "field":"material_entity", "label":"商品分录", "type":"array", "describe":"商品分录", "value":"items", "children":[ {"field":"material_id","label":"商品","type":"string","describe":"商品","value":"_findCollection find id from b4a428a0-beb9-3d69-b96c-97c34a217550 where number={{items.sku_id}}"}, {"field":"qty","label":"数量","type":"string","describe":"数量","value":"{{items.qty}}"}, {"field":"unit_id","label":"单位","type":"string","describe":"单位","value":"_findCollection find base_unit_id from b4a428a0-beb9-3d69-b96c-97c34a217550 where number={{items.sku_id}}"}, {"field":"stock_id","label":"仓库","type":"string","describe":"仓库", "value": "_findCollection find id from 07c0fe43-86f2-30da-9fe5-d7459ecb961d where number={wms_co_id}-{wh_id}"}, {"field": "cost", "label": "入库成本", "type": "string", "value": "{{items.sale_amount}}"}, {"field": "unit_cost", "label": "出库成本单位", "type": "string", "value": "{{items.sale_price}}"} ] } ] } ``` #### 数据请求与清洗 首先,我们从聚水潭获取其他入库单的数据。假设我们已经完成了数据请求和初步清洗工作,接下来需要将这些数据进行ETL转换。 #### 数据转换与写入 1. **字段映射**:将聚水潭的数据字段映射到金蝶云星辰V2API接口所需的字段。例如,将聚水潭的`io_date`映射到`bill_date`,`io_id`映射到`bill_no`。 2. **静态值设置**:某些字段需要设置为固定值,例如`trans_type_id`设置为"12",表示业务类型ID。 3. **动态值处理**:对于一些动态生成的值,例如商品分录中的各个字段,需要通过查找表或计算得到。例如: - `material_id`通过SKU ID在指定表中查找得到。 - `unit_id`通过SKU ID在指定表中查找得到。 - `stock_id`通过仓库ID和公司ID组合查找得到。 4. **数组处理**:对于复杂结构的数据,如商品分录,需要遍历每个商品条目,并进行相应的字段转换和查找。 ```json { "bill_date": "{io_date}", "bill_no": "{io_id}", "trans_type_id": "12", "operation_key": "{audit}", ... } ``` 5. **发送请求**:将转换后的数据通过POST方法发送到金蝶云星辰V2API接口。确保请求格式和内容符合API要求,以保证数据能够正确写入目标系统。 ```python import requests url = 'https://api.kingdee.com/jdy/v2/scm/inv_other_in' headers = {'Content-Type': 'application/json'} data = { 'bill_date': '2023-10-01', 'bill_no': 'IN20231001', 'trans_type_id': '12', 'operation_key': 'audit', ... } response = requests.post(url, headers=headers, json=data) if response.status_code == 200: print('Data successfully written to Kingdee Cloud Star V2') else: print('Failed to write data:', response.text) ``` #### 实时监控与日志记录 在整个ETL过程中,实时监控和日志记录至关重要。轻易云平台提供了实时监控功能,可以查看每一步的数据流动和处理状态,及时发现并解决问题。同时,通过日志记录,可以追溯每一笔数据的处理过程,为后续优化提供依据。 以上就是使用轻易云数据集成平台,将聚水潭其他入库单的数据转化并写入金蝶云星辰V2API接口的技术案例。通过合理配置元数据,并结合实际业务需求进行动态值处理,实现了不同系统间的数据无缝对接。 ![如何对接企业微信API接口](https://pic.qeasy.cloud/T8.png~tplv-syqr462i7n-qeasy.image)