使用轻易云平台进行ETL转换并写入SQLServer的最佳实践

  • 轻易云集成顾问-黄宏棵

汤臣倍健营销云数据集成到SQL Server:采购入库同步案例分享

在系统对接和数据集成过程中,如何确保数据的准确性与实时性一直是技术人员面临的重大挑战。本文将详细介绍一个实践案例,即如何通过轻易云平台高效、可靠地将汤臣倍健营销云的数据集成到SQL Server,实现采购入库同步。这一方案的成功运行不仅提升了业务效率,还保障了数据的一致性与完整性。

为了实现这一目标,我们主要使用了两个关键API接口:从汤臣倍健营销云获取数据的/erp/api/order/query/purInWarehsOrder接口,以及向SQL Server写入数据的insert接口。以下内容将聚焦于几个核心技术点:

  1. 定时可靠的数据抓取: 为保证及时获取最新的采购入库订单信息,我们设置了定时任务,通过调用汤臣倍健营销云提供的API,按计划周期抓取更新的数据。这个过程需要处理分页和限流问题,以防止大量数据请求导致系统异常。

  2. 批量快速写入: 由于涉及大量订单的信息同步,我们采用批量提交方式,将抓取到的新订单一次性插入至SQL Server数据库中。在此过程中,需要特别关注网络延迟、事务控制以及错误重试机制,以确保每次操作都能成功执行,并且不会遗漏任何记录。

  3. 处理格式差异: 针对来自不同源的数据,其字段格式和命名可能存在显著差异,因此,进行适当的数据映射非常重要。在该案例中,我们编写了自定义转换脚本,将汤臣倍健营销云返回的数据格式转换为符合SQL Server标准规范的结构,从而避免因格式不匹配导致的数据存储失败。

  4. 异常处理与日志监控: 实现稳定运行的不二法门是完备而有效的异常处理机制。当遇到任何非预期情况,如API响应超时、数据库连接失败等状况时,系统会自动触发重试逻辑,同时详细记录相关日志供运维人员进行故障排查。同时,通过轻易云平台内置实时监控功能,可以直观了解整个集成流程中的各个环节状态,有助于快速定位并解决潜在问题。

综上所述,这个邀请式采购-四川绪泰方案通过合理设计定时抓取、大规模批量写入、自定义数据映射和完善异常防护措施,实现了高效、高质量的数据集成。不仅有力支撑企业运营,也为后续类似系统对接项目积累宝贵经验。 打通钉钉数据接口

调用源系统汤臣倍健营销云接口获取并加工数据

在数据集成生命周期的第一步,我们需要从源系统获取数据,并进行初步的清洗和加工。本文将详细探讨如何调用汤臣倍健营销云接口/erp/api/order/query/purInWarehsOrder,并对返回的数据进行处理。

接口调用配置

首先,我们需要配置API调用的元数据。根据提供的元数据配置,我们可以看到该接口使用POST方法,主要参数包括经销商ID、订单号、订单状态、时间范围等。以下是具体的请求参数配置:

{
  "api": "/erp/api/order/query/purInWarehsOrder",
  "method": "POST",
  "number": "number",
  "id": "id",
  "pagination": {
    "pageSize": 30
  },
  "condition": [
    [
      {
        "field": "itemList.materialNumber",
        "logic": "notlike",
        "value": "F.A"
      }
    ]
  ],
  "idCheck": true,
  "request": [
    {
      "field": "tenantId",
      "label": "经销商id",
      "type": "string",
      "describe": "经销商id(必填,营销云id)如:34cc4109705e4c058b7b3b0352e57d31",
      "value": "{{TENANT_ID}}"
    },
    {
      "field": "yxyNumber",
      "label": "营销云销售订单号",
      "type": "string",
      "describe": "如:YD1215710122031701,传此参数时,其他时间状态等条件无效"
    },
    {
      "field": "number",
      "label": "系统订单号",
      "type": "string",
      "describe": "如:XOUT0000000293,传此参数时,其他时间状态等条件无效"
    },
    {
      "field": "status",
      "label": "订单状态",
      "type": "string",
      "",
describe":"0:未审核 1:已审核(已出库)","value":"1"
},
{
"field":"beginTime","label":"开始时间","type":"string","describe":"timeType为空或者0,基于创建时间查询,timeType为1,基于更新时间查询,格式:0000-00-00或0000-00-00 00:00:00,如果不传单号此字段必填","value":"{{LAST_SYNC_TIME|datetime}}"
},
{
"field":"endTime","label":"结束时间","type":"string","describe":"timeType为空或者0,基于创建时间查询,timeType为1,基于更新时间查询,格式:0000-00-00或0000-00-00 00:00:00,如果不传单号此字段必填","value":"{{CURRENT_TIME|datetime}}"
},
{
"field":"pageNo","label":"页码","type":"string","describe":"默认1","value":"1"
},
{
"field":"pageSize","label":"每页条数","type":"string","describe":"默认30","value":"30"
},
{
"field":"timeType","label":"时间段标志","type":"string","describe":"查询时间段标识,0:创建时间(默认),1:最后更新时间","value":"1"
}
]
}

数据请求与清洗

在发起API请求后,我们将获得一批采购入库订单数据。为了确保数据质量和一致性,需要对返回的数据进行初步清洗和过滤。

数据过滤

根据元数据配置中的条件,我们需要排除物料编号包含“F.A”的项。这可以通过以下逻辑实现:

def filter_data(data):
    return [item for item in data if 'F.A' not in item['itemList']['materialNumber']]
数据转换

在清洗完数据后,需要对其进行必要的转换,以便后续处理。例如,将日期字符串转换为标准的日期对象:

from datetime import datetime

def convert_dates(data):
    for item in data:
        item['beginTime'] = datetime.strptime(item['beginTime'], '%Y-%m-%d %H:%M:%S')
        item['endTime'] = datetime.strptime(item['endTime'], '%Y-%m-%d %H:%M:%S')
    return data

数据写入准备

经过清洗和转换后的数据,需要准备写入目标系统。在这个阶段,可以根据业务需求对数据进行进一步的处理,例如聚合、计算等。

def prepare_for_write(data):
    # 示例:计算每个订单的总金额
    for item in data:
        item['totalAmount'] = sum([detail['price'] * detail['quantity'] for detail in item['itemList']])
    return data

通过以上步骤,我们完成了从汤臣倍健营销云获取采购入库订单数据,并进行了初步的清洗和转换,为后续的数据写入做好了准备。这是轻易云数据集成平台生命周期中至关重要的一环,它确保了从源系统获取的数据质量,为业务决策提供可靠的数据支持。 如何对接用友BIP接口

使用轻易云数据集成平台进行ETL转换并写入SQL Server

在数据集成的生命周期中,ETL(提取、转换、加载)是一个关键步骤。本文将详细探讨如何使用轻易云数据集成平台将已经集成的源平台数据进行ETL转换,最终写入目标平台SQL Server。

数据请求与清洗

首先,我们需要从源系统中提取原始数据,并进行必要的清洗和预处理。这个过程通常包括去重、格式化和校验等操作。在此阶段,我们假设已经完成了数据的清洗,并且数据已经准备好进行下一步的转换和写入。

数据转换与写入

接下来,我们进入数据生命周期的第二步:将清洗后的数据进行ETL转换,并写入目标平台SQL Server。以下是具体的实现步骤:

  1. 定义API接口元数据配置

    通过元数据配置,我们可以定义API接口的参数及其映射关系。以下是一个典型的元数据配置示例:

    {
       "api": "insert",
       "method": "POST",
       "idCheck": true,
       "request": [
           {
               "label": "主表参数",
               "field": "main_params",
               "type": "object",
               "children": [
                   {"parent": "main_params", "label": "单号编号", "field": "djbh", "type": "string", "value": "{number}"},
                   {"parent": "main_params", "label": "采购入库传CGC  销售退回传XHH", "field": "djlx", "type": "string", "value":"CGC"},
                   {"parent": "main_params", "label": "日期", "field":"rq","type":"string","value":"{{auditTime|date}}"},
                   {"parent": "main_params", "label":"时间","field":"ontime","type":"string","value":"{{auditTime|time}}"},
                   {"parent":"main_params","label":"单位内码","field":"wldwid","type":"string","value":"WLD00000002"},
                   {"parent":"main_params","label":"含税金额","field":"hsje","type":"string","value":"{{itemList.taxlastmoney}}"},
                   {"parent":"main_params","label":"收货人","field":"shouhr","type":"string","value":"{recvContact}"},
                   {"parent":"main_params","label":"地址","field":"shhdz","type":"string","value":"{recvAddr}"},
                   {"parent":"main_params","label":"联系电话","field":"lxdh","type":"string","value":"{recvTel}"},
                   {"parent":"main_params","label":"备注","field":"beizhu","type":"string","value":"订单备注:{remark}"},
                   {"parent":"main_params","label":"营销云单号","field":"webdjbh","type":"string","value":" {yxyNumber}"}
               ]
           },
           {
               “label”: “扩展表参数”,
               “field”: “extend_params_1”,
               “type”: “array”,
               “value”: “itemList”,
               “children”: [
                   {“parent”: “extend_params_1”, “label”: “单号”, “field”: “djbh”, “type”: “string”, “value”: “{number}”},
                   {“parent”: “extend_params_1”, “label”:”序号”,”field”:”dj_sn”,”type”:”string”,”value”:”{oSn}”},
                   {“parent”:”extend_params_1”,”label”:”商品内码”,”field”:”spid”,”type”:”string”,”value”:"_findCollection find spid from d76b64f9-f0e0-3436-a2d9-14c5579faa1b where spbh2={extMaterialNo}"}
                   {“parent”:”extend_params_1”,”label”:仓库编号”,“field”:ckid”,“type”:字符串”,“值”:{{itemList.depotNo}}}
                    {“父母”:扩展参数_1”,“标签”:批号”,“字段”:pihao”,“类型:字符串”,“值:{{itemList._Flot}}}
                    {“父母”:扩展参数_1”,“标签:效期”,“字段:sxrq”,“类型:字符串”,“值:{{itemList._Fexp}}}
                    {“父母”:扩展参数_1”,“标签:生产日期”,“字段:baozhiqi”,“类型:字符串”,“值:{{itemList._Fmfg}}}
                    {“父母”:扩展参数_1”,“标签:数量 数量大于0,“字段:shl”,“类型:字符串”,“值:{{itemList.opernumber}}}
                    {“父母:扩展参数_1","标签:含税价","字段:hshj","类型:字符串","值:_function {{itemList.taxlastmoney}}\/{{itemList.opernumber}}" }
                    {“父母:扩展参数_1","标签:含税金额","字段:hsje","类型:字符串","值:{{itemList.taxlastmoney}}" }
                    {“父母:扩展参数_1","标签:相关单号,如果退回,采购退 等原来的单号","字段:xgdjbh","类型:字符串","值:{yxyNumber}" }
                    {“父母:扩展参数_1","标签:相关序号 如果退回,采购退 等对应明细的那个序号","字段:recnum","类型:字符串","值:{oSn}" }
                    {“父母:扩展参数_1","标签:组织ID","字段:hzid","类型:字符串","值:{orgId}" }
                    {“父母:扩展参数_1”,标签:“仓库名称”,字段:“ckname”,类型:“字符串”,值:“{tenantName}”
                 ]
             }
         ],
         otherRequest:[{
             标签:“主SQL语句”,
             字段:“主sql”,
             类型:“字符串”,
             值:“插入到gxkphz (djbh,djlx,rq,ontime,wldwid,hsje,shouhr,shhdz,lxdh,beizhu,webdjbh) 值 ( :djbh,:djlx,:rq,:ontime,:wldwid,:hsje,:shouhr,:shhdz,:lxdh,:beizhu,:webdjbh)"
         },{
             标签:“扩展SQL语句1”
             字段:“扩展sql_1”
             类型:“字符串”
             值:“插入到gxkpmx (djbh,dj_sn,spid,ckid,pihao,sxrq,baozhiqi,shl,hshj,hsje,xgdjbh,recnum,hzid,ckname) 值 ( :djbh,:dj_sn,:spid,:ckid,:pihao,:sxrq,:baozhiqi,:shl,:hshj,:hsje,:xgdjbh,:recnum,:hzid,:ckname)"
         }]
     }
  2. 映射与转换

    在元数据配置中,我们定义了主表和扩展表的数据映射关系。例如,主表中的单号编号映射为{number},而日期则通过模板引擎从原始数据中提取并格式化为指定格式。

  3. 生成SQL语句

    根据配置生成相应的SQL插入语句。例如,主表的插入语句如下:

    INSERT INTO gxkphz (djbh,djlx,rq,ontime,wldwid,hsje,shouhr,shhdz,lxdh,beizhu,webdjbh) 
    VALUES (:djbh, :djlx, :rq, :ontime, :wldwid, :hsje, :shouhr, :shhdz, :lxdh, :beizhu , :webdjbh)
  4. 执行写入操作

    最后,通过API接口将生成的SQL语句发送到目标平台SQL Server,执行插入操作。以下是一个简化的HTTP请求示例:

    POST /api/insert HTTP/1.1
    Host: target-sql-server.com
    Content-Type: application/json
    
    {
       "_sql_main_query": "...",
       "_sql_extend_query_1": "...",
       ...
    }

关键技术点

  • 模板引擎:使用模板引擎对原始数据进行格式化和转换,如日期和时间格式化。
  • 动态SQL生成:根据元数据配置动态生成SQL插入语句,提高灵活性。
  • 异步处理:利用轻易云的数据集成平台实现全异步处理,提高系统性能和响应速度。

通过以上步骤,我们成功地将清洗后的源平台数据进行了ETL转换,并写入到了目标平台SQL Server。这种方法不仅提高了数据处理效率,还保证了系统间的数据一致性和完整性。 系统集成平台API接口配置