ETL转换实战:从旺店通到轻易云集成平台的数据处理

  • 轻易云集成顾问-陈洁琳

查询销售退货单(旺店通):金蝶云星空数据集成到轻易云集成平台

在本案例中,我们将重点探讨如何通过轻易云数据集成平台,将金蝶云星空中的销售退货单(旺店通)数据高效对接并集成。为了确保整个系统对接过程的顺利进行,我们采用了多个技术手段,涵盖数据抓取、分页处理、限流控制以及异常处理与错误重试机制等方面。

首先,通过调用金蝶云星空提供的executeBillQuery接口获取销售退货单的数据。为了防止漏单和提升整体性能,我们设计了一套定时可靠的数据抓取方案,对接口进行周期性轮询,并使用批量请求优化接口访问频率。

其次,大量数据写入到轻易云集成平台是一个关键环节。为此,我们利用轻易云自身强大的批量数据处理能力,实现快速且稳定的数据写入操作。此外,为解决两者之间可能存在的数据格式差异问题,制定了一系列定制化的数据映射规则,从而保证数据转换准确无误。

与此同时,在实际应用过程中,不可避免地会遇到API限流和分页需求。我们特别关注这一部分,通过设置合理的分页策略和限流管理措施,以平衡系统负载,确保整体流程顺畅运行。在出现异常状况时,我们还引入了一套完整的异常捕获和错误重试机制,从而提高系统的健壮性与容错能力。

最后,对于整个集成过程中的实时监控与日志记录功能,也在方案中被一一实现,通过透明化操作界面,使得每个环节都清晰可见,为后续故障排查提供极大便利。 如何开发金蝶云星空API接口

调用金蝶云星空接口executeBillQuery获取并加工数据

在数据集成的生命周期中,调用源系统接口获取数据是至关重要的一步。本文将详细介绍如何通过轻易云数据集成平台调用金蝶云星空的executeBillQuery接口,获取销售退货单数据并进行初步加工。

元数据配置解析

首先,我们需要理解元数据配置中的各个字段及其作用。以下是元数据配置的详细解析:

{
  "api": "executeBillQuery",
  "method": "POST",
  "number": "FBillNo",
  "id": "FEntity_FENTRYID",
  "pagination": {
    "pageSize": 100
  },
  "request": [
    {"field":"FID","label":"实体主键","type":"string","value":"FID"},
    {"field":"FBillNo","label":"单据编号","type":"string","value":"FBillNo"},
    {"field":"FSaleOrgId","label":"销售组织","type":"string","value":"FSaleOrgId"},
    {"field":"FStockOrgId","label":"库存组织","type":"string","value":"FStockOrgId"},
    {"field":"FSettleOrgId","label":"结算组织","type":"string","value":"FSettleOrgId"},
    {"field":"FApproveDate","label":"日期","type":"string","value":"FApproveDate"},
    {"field":"FRetcustId","label":"客户","type":"string","value":"FRetcustId.FNumber"},
    {"field":"FMaterialId_FNumber","label":"物料","type":"string","value":"FMaterialId.FNumber"},
    {"field":"FOwnerIdHead","label":"货主","type":"string","value":"FOwnerIdHead"},
    {"field":"FRealQty","label":"实发数量","type":"string","value":"FRealQty"},
    {"field":"FStockId_FNumber","label":"仓库id","type":"string","value":"FStockId.FNumber"},
    {"field": "FTaxPrice", "label": "含税单价", "type": "string", "value": "FTaxPrice"},
    {"field": "FRetcustId_FNumber", "label": "投诉对应客户", "type": "string", "value": "FRetcustId.FNumber"},
    {"field": "FSrcBillNo", "label": "源单编号", "type": "string", "value": "FSrcBillNo"},
    {"field": "FSalesManId_FName", "label": "销售员", "type": "string", "value": "FSalesManId.FName"},
    {"field": "FEntity_FENTRYID", "label": "", type: string, value: FEntity_FENTRYID},
    {"field" : FPrice, label: 单价, type: string, value: FPrice},
    {"field" : FBillTypeID, label: 单据类型, type: string, value: FBillTypeID},
    {"field" : FDate, label: 日期, type: string, value: FDate},
    {"field" : FOrderNo, label: 订单编号, type: string, value: FOrderNo},
    {"field" : F_PBLH_order_id, label: 纷享销客订单内码, type: string, value: F_PBLH_order_id},
    { field : F_PBLH_order_Entryid , label : 纷享销客订单分录内码 , type : string , value : F_PBLH_order_Entryid },
     { field : FRetcustId_F_PBLH_FXID , label : 客户分享id , type : string , value : FRetcustId.F_PBLH_FXID },
     { field : FInvoicedQty , label : 关联应收数量 , type : string , value : FInvoicedQty },
     { field : FSOEntryId , label : 销售订单EntryId , type : string , value : FSOEntryId }
   ],
   otherRequest:[
      { field:"Limit", label:"最大行数", type:"string", describe:"金蝶的查询分页参数", value:"{PAGINATION_PAGE_SIZE}"},
      { field:"StartRow", label:"开始行索引", type:"string", describe:"金蝶的查询分页参数", value:"{PAGINATION_START_ROW}"},
      { field:"TopRowCount", label:"返回总行数", type:"int", describe:"金蝶的查询分页参数"}
      { field :"FilterString" , label :"过滤条件" , type :"string" , describe :"示例写法 FSupplierId.FNumber = 'VEN00010' and FApproveDate>=" , value :"F_PRSH_FROM = '旺店通' and FRealQty <> FInvoicedQty and FDocumentStatus='C'"}
      { field :"FieldKeys" , label :"需查询的字段key集合" , type :"array" , describe :"金蝶分录主键ID格式:FPOOrderEntry_FEntryId,其它格式 FPurchaseOrgId.FNumber" ,"parser":{"name ":"ArrayToString ","params ": ","}}
      { field :"FormId" , label :"业务对象表单 Id ", type :"string ", describe :"必须填写金蝶的表单 ID 如:PUR_PurchaseOrder ", value :"SAL_RETURNSTOCK"}
   ],
   buildModel:true
}

调用API接口

为了调用executeBillQuery接口,我们需要构建一个POST请求。以下是一个示例请求体:

{
  "_FormData_":{
     "_FormMetaData_":{
        "_FormMetaData_":[
           {
              "_FieldKey_":"",
              "_FieldLabel_":"",
              "_FieldType_":"",
              "_FieldValue_":[]
           }
        ]
     },
     "_FormData_":[
        {
           "_FieldKey_":"",
           "_FieldLabel_":"",
           "_FieldType_":"",
           "_FieldValue_":[]
        }
     ]
  }
}

在实际操作中,我们需要根据元数据配置填充请求体中的字段。例如:

{
  "_FormData_":{
     "_FormMetaData_":{
        "_FormMetaData_":[
           {
              "_FieldKey_":"",
              "_FieldLabel_":"",
              "_FieldType_":"",
              "_FieldValue_":[]
           }
        ]
     },
     "_FormData_":[
        {
           "_FieldKey_":"",
           "_FieldLabel_":"",
           "_FieldType_":"",
           "_FieldValue_":[]
        }
     ]
  }
}

数据清洗与转换

在获取到原始数据后,下一步是对数据进行清洗和转换。这一步骤包括但不限于:

  1. 去除冗余字段:只保留业务所需字段。
  2. 格式转换:将日期、金额等字段转换为标准格式。
  3. 数据校验:确保每个字段的数据符合预期,例如非空校验、数值范围校验等。

例如,对于日期字段FApproveDate,我们可以将其转换为标准的ISO日期格式:

from datetime import datetime

def convert_date(date_str):
   return datetime.strptime(date_str, '%Y-%m-%d').isoformat()

data['FApproveDate'] = convert_date(data['FApproveDate'])

数据写入目标系统

经过清洗和转换的数据可以通过轻易云平台写入到目标系统。在此过程中,可以利用平台提供的数据映射功能,将源系统的数据字段映射到目标系统的相应字段。

例如,将清洗后的销售退货单数据写入到ERP系统中:

{
   ...
   // 映射后的目标系统字段和值
   ...
}

通过上述步骤,我们实现了从调用金蝶云星空接口获取销售退货单数据,到对数据进行清洗和转换,再到最终将处理后的数据写入目标系统的完整流程。这一过程不仅提高了业务透明度和效率,还确保了不同系统间的数据一致性和准确性。 如何对接用友BIP接口

数据集成生命周期中的ETL转换:从旺店通到轻易云

在数据集成的生命周期中,ETL(Extract, Transform, Load)转换是一个至关重要的环节。本文将详细探讨如何将从旺店通查询到的销售退货单数据进行ETL转换,并通过轻易云集成平台API接口写入目标平台。

1. 数据提取与清洗

首先,从源系统旺店通中提取销售退货单数据。假设我们已经通过API获取了这些数据,并进行了初步的清洗工作。这些数据可能包括订单ID、退货原因、退货数量等信息。在这一阶段,我们确保数据的完整性和准确性,为后续的转换步骤打下基础。

2. 数据转换

接下来,我们需要将清洗后的数据转换为目标平台能够接收的格式。轻易云集成平台API接口要求的数据格式如下:

{
    "api": "写入空操作",
    "effect": "EXECUTE",
    "method": "POST",
    "idCheck": true
}

在实际操作中,我们需要将旺店通的数据映射到上述格式中。例如,假设我们从旺店通获取的数据结构如下:

{
    "order_id": "12345",
    "return_reason": "商品损坏",
    "return_quantity": 2
}

我们需要编写一个转换脚本,将这些字段映射到轻易云集成平台API所需的格式。以下是一个示例Python脚本,用于完成这一转换:

import json

# 假设这是从旺店通获取的数据
source_data = {
    "order_id": "12345",
    "return_reason": "商品损坏",
    "return_quantity": 2
}

# 转换为目标平台所需的格式
target_data = {
    "api": "写入空操作",
    "effect": "EXECUTE",
    "method": "POST",
    "idCheck": True,
    # 添加具体业务相关的数据字段
    "data": {
        "orderID": source_data["order_id"],
        "reason": source_data["return_reason"],
        "quantity": source_data["return_quantity"]
    }
}

# 转换为JSON字符串以便发送请求
target_json = json.dumps(target_data)
print(target_json)

3. 数据写入

完成数据转换后,下一步是通过轻易云集成平台API接口将数据写入目标平台。根据元数据配置,我们使用POST方法发送请求,并确保进行ID检查。

以下是一个示例Python代码,用于通过HTTP请求将转换后的数据写入目标平台:

import requests

# 定义目标URL(假设为示例URL)
url = 'https://api.qingyiyun.com/data/write'

# 设置请求头部信息,通常包括认证信息和内容类型
headers = {
    'Content-Type': 'application/json',
    'Authorization': 'Bearer YOUR_ACCESS_TOKEN'
}

# 发送POST请求,将转换后的JSON数据写入目标平台
response = requests.post(url, headers=headers, data=target_json)

# 检查响应状态码和内容,以确保请求成功
if response.status_code == 200:
    print("Data written successfully.")
else:
    print(f"Failed to write data: {response.status_code}, {response.text}")

4. 实时监控与错误处理

在整个ETL过程中,实时监控和错误处理同样重要。轻易云集成平台提供了实时监控功能,可以帮助我们追踪每个环节的数据流动和处理状态。一旦发现异常情况,如网络问题或数据格式错误,应及时记录并采取相应措施。

例如,可以在发送请求之前添加更多的错误检查逻辑:

try:
    response = requests.post(url, headers=headers, data=target_json)
    response.raise_for_status()  # 如果响应状态码不是200,会抛出HTTPError异常
except requests.exceptions.RequestException as e:
    print(f"An error occurred: {e}")

通过上述步骤,我们可以高效地完成从旺店通到轻易云集成平台的数据ETL转换和写入过程。这不仅提升了业务透明度和效率,还确保了不同系统间的数据无缝对接。 用友与CRM系统接口开发配置