借助轻易云平台完成ETL转换并写入金蝶云星空的技术实践

  • 轻易云集成顾问-孙传友

聚水潭盘盈单对接金蝶其他入库单ok

数据集成在企业信息系统协同工作中起着关键作用,特别是在涉及多平台的数据传输时。本文分享一个实际案例:如何通过轻易云数据集成平台,将聚水潭的盘盈单精准对接到金蝶云星空的其他入库单。

为了实现这一目标,我们需要确保几个关键技术点:

  1. 可靠抓取聚水潭接口数据: 使用聚水潭提供的 inventory.count.query API 定时获取最新的盘盈数据信息。此API允许我们查询特定时间段内所有新增或更新过的数据记录,保证不遗漏任何重要数据。

  2. 处理分页和限流问题: 由于 inventory.count.query API 返回的数据可能非常庞大,所以需要进行分页处理,并遵守API调用频率限制,以免触发限流机制。这一步骤至关重要,以确保稳定、高效地获取全部必要数据。

  3. 大量数据快速写入到金蝶云星空: 获取到完整、正确的盘盈记录后,需要通过金蝶云星空提供的 batchSave API 批量写入这些数据。批量操作能够显著提升系统性能与运行效率,但同时也要求我们细致检查每一条记录是否符合输入规范。

  4. 处理数据格式差异: 聚水潭与金蝶云星空之间的数据结构存在差异,我们必须进行详细分析并实施相应转换,使得跨平台传输过程中保持一致性和准确性。例如,对日期格式、数值单位等字段进行标准化转换,是避免潜在错误的重要环节。

  5. 异常处理及重试机制: 在整个对接流程中,不可避免会遇到网络故障、服务器响应超时等异常情况。因此,设计一个健壮的错误检测及重试机制尤为必要,这样可以保证即使部分上传失败,也能自动重新尝试直至成功,同时有效日志记录便于后续审计跟踪和问题排查。

本文将详细讲解这些步骤中的每个技术要点,展示如何利用高效可靠的方法,实现从聚水潭到金蝶云星空完美无缝的数据集成。 如何对接用友BIP接口

调用聚水潭接口inventory.count.query获取并加工数据的技术案例

在数据集成过程中,调用源系统的API接口是至关重要的一步。本文将详细探讨如何通过轻易云数据集成平台调用聚水潭的inventory.count.query接口,获取盘盈单数据并进行初步加工。

接口调用配置

首先,我们需要配置元数据,以便正确调用聚水潭的inventory.count.query接口。以下是元数据配置的详细说明:

{
  "api": "inventory.count.query",
  "method": "POST",
  "number": "io_id",
  "id": "io_id",
  "idCheck": true,
  "condition": [
    [{"field":"items.qty","logic":"gt","value":0}],
    [{"field":"batch.qty","logic":"gt","value":0}]
  ],
  "request": [
    {"field":"page_index","label":"开始页码","type":"string","describe":"第几页,从第一页开始,默认1"},
    {"field":"page_size","label":"每页条数","type":"string","describe":"每页多少条,默认30,最大50"},
    {"field":"modified_begin","label":"修改开始时间","type":"string","describe":"修改起始时间,和结束时间必须同时存在,时间间隔不能超过七天,与线上单号不能同时为空", "value":"{{LAST_SYNC_TIME|datetime}}"},
    {"field":"modified_end","label":"修改结束时间","type":"string","describe":"修改结束时间,和起始时间必须同时存在,时间间隔不能超过七天,与线上单号不能同时为空", "value":"{{CURRENT_TIME|datetime}}"},
    {"field":"io_ids","label":"盘点单号","type":"string","describe":"指定盘点单号,多个用逗号分隔,最多50,和时间段不能同时为空"},
    {"field":"status","label":"单据状态","type":"string","describe":"单据状态,Confirmed=生效,WaitConfirm待审核,Creating=草拟,Archive=归档,Cancelled=作废", "value": "Confirmed"}
  ]
}

请求参数解析

  1. page_index: 开始页码,从第一页开始。
  2. page_size: 每页条数,默认30条记录。
  3. modified_begin: 修改开始时间,用于限定查询范围。
  4. modified_end: 修改结束时间,与修改开始时间共同使用。
  5. io_ids: 指定盘点单号,可以指定多个,用逗号分隔。
  6. status: 单据状态,此处我们只查询已生效的单据(Confirmed)。

数据请求与清洗

在调用接口时,我们需要构建请求体,并根据元数据中的条件对返回的数据进行初步清洗。以下是一个示例请求体:

{
  "page_index": "1",
  "page_size": "30",
  "modified_begin": "{{LAST_SYNC_TIME|datetime}}",
  "modified_end": "{{CURRENT_TIME|datetime}}",
  "status": "Confirmed"
}

通过POST方法发送请求后,我们会收到一组盘盈单数据。为了确保数据质量,我们需要根据条件对数据进行过滤。例如,只保留items.qtybatch.qty大于0的记录。

数据转换与写入

在清洗完毕后,需要将数据转换为目标系统所需的格式,并写入目标系统。在这个案例中,我们假设目标系统为金蝶其他入库单。以下是一个简单的数据转换示例:

def transform_data(data):
    transformed_data = []
    for record in data:
        if record['items']['qty'] > 0 and record['batch']['qty'] > 0:
            transformed_record = {
                'entry_id': record['io_id'],
                'product_code': record['items']['product_code'],
                'quantity': record['items']['qty'],
                'batch_number': record['batch']['batch_no']
            }
            transformed_data.append(transformed_record)
    return transformed_data

通过上述代码,我们将聚水潭的数据转换为金蝶系统所需的格式。

实践中的注意事项

  1. 分页处理:由于每次请求只能返回有限数量的数据,需要实现分页逻辑以获取所有符合条件的数据。
  2. 异常处理:在实际操作中,需要处理可能出现的网络异常、接口错误等情况,以确保数据集成过程的稳定性。
  3. 实时监控:利用轻易云平台提供的实时监控功能,可以及时发现并解决数据流动中的问题,提高整体效率。

通过以上步骤,我们成功地调用了聚水潭的inventory.count.query接口,并对获取的数据进行了初步加工,为后续的数据转换与写入打下了坚实基础。这一过程展示了轻易云平台在异构系统集成中的强大能力和灵活性。 金蝶与外部系统打通接口

使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口的技术案例

在数据集成生命周期的第二步,我们将已经集成的源平台数据进行ETL转换,转为目标平台金蝶云星空API接口所能够接收的格式,并最终写入目标平台。本文将详细探讨这一过程中涉及的技术细节和元数据配置。

元数据配置解析

我们使用的元数据配置如下:

{
  "api": "batchSave",
  "method": "POST",
  "idCheck": true,
  "operation": {
    "rowsKey": "array",
    "rows": 1,
    "method": "batchArraySave"
  },
  "request": [
    {"field":"FBillNo","label":"单据编号","type":"string","describe":"单据编号","value":"{io_id}"},
    {"field":"FBillTypeID","label":"单据类型","type":"string","describe":"单据类型","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"PYQTRKD"},
    {"field":"FStockOrgId","label":"库存组织","type":"string","describe":"组织","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"100"},
    {"field":"FStockDirect","label":"库存方向","type":"string","describe":"下拉列表","value":"GENERAL"},
    {"field":"FDate","label":"日期","type":"string","describe":"日期","value":"{io_date}"},
    {"field":"FOwnerTypeIdHead","label":"货主类型","type":"string","describe":"多类别基础资料列表","value":"BD_OwnerOrg"},
    {"field":"FOwnerIdHead","label":"货主","type":"string","describe":"多类别基础资料","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":
      "_findCollection find F_HZ_OrgId_Fnumber from 0cba013f-c6e7-3826-bf89-bb0b11e996e1 where FNumber={{items.sku_id}}"},
    {"field":"FNOTE","label":"备注","type": "string", "describe": "多行文本", "value": "{remark}"},
    {
      "field": "FEntity",
      "label": "明细信息",
      "type": "array",
      "children": [
        {"field": "FMATERIALID", "label": "物料编码", 
            "type": "string", 
            "describe": "基础资料", 
            "parser":{"name": "ConvertObjectParser", 
            "params": "FNumber"}, 
            "value": "{{items.sku_id}}", 
            "parent": "FEntity"
        },
        {"field": "FSTOCKID", 
            "label": "收货仓库", 
            "type": "string", 
            "describe": "基础资料", 
            "parser":{"name": 
                "ConvertObjectParser", 
                "params":
                    "FNumber"}, 
                    "value":
                     "{wms_co_id}", 
                     parent: "
                     FEntity"
        },
        {"field":
                "FSTOCKSTATUSID",
                label: "
                库存状态",
                type: "
                string",
                value:
                 KCZT01_SYS,
                 parser:{
                     name:
                     ConvertObjectParser,
                     params:
                     FNumber
                 }
        },
        {"field":
                FLOT,
                label:
                 批号,
                 type:
                 string,
                 describe:
                 批次,
                 value:
                 {{items.batch_id}},
                 parent:
                 FEntity,
                 parser:{
                     name:
                     ConvertObjectParser,
                     params:
                     FNumber
                 }
        },
        {"field":
                FPRODUCEDATE,
                label:
                 生产日期,
                 type:
                 string,
                 describe:
                 批次,
                 value:
                 {{items.product_date}},
                 parent:
                 FEntity
        },
        {"field":
                FEXPIRYDATE,
                label:
                 有效期至,
                 type:
                 string,
                 describe:
                 批次,
                 value:
                     {{items.expiration_date}},
                     parent:"
                     FEntity
        },
        {"
            field:FQty,label:实收数量,type:string,describe:数量,value:{{items.qty}},parent:FEntity},
        {"
            field:FPrice,label:成本价,type:string,describe:单价,parent:FEntity},
        {"
            field:FEntryNote,label:备注,type:string,describe:多行文本,parent:FEntity},
        {"
            field:FOWNERTYPEID,label:货主类型,type:string,value:BD_OwnerOrg},
        {"
            field:FOWNERID,label:货主,type:string,value:_findCollection find F_HZ_OrgId_Fnumber from 0cba013f-c6e7-3826-bf89-bb0b11e996e1 where FNumber={{items.sku_id}},parser:{name:ConvertObjectParser,params:FNumber}}
       ],
       value:"items"
     }
   ],
   otherRequest:[
     {"
       field:"FormId,label:"业务对象表单Id,type:"string,describe:"必须填写金蝶的表单ID如:PUR_PurchaseOrder,value:"STK_MISCELLANEOUS},
     {"
       field:IsVerifyBaseDataField,label:"验证基础资料,type:"bool,describe:"是否验证所有的基础资料有效性,布尔类,默认false(非必录),value:true},
     {"
       field:Operation,label:"执行的操作,type:"string,value:"Save},
     {"
       field:IsAutoSubmitAndAudit,label:"提交并审核,type:"bool,value:false}
   ]
}

技术实现步骤

  1. API接口调用配置

    • api字段指定了调用的API接口为batchSave
    • method字段指定了HTTP请求方法为POST
    • idCheck字段设置为true,确保每条记录都有唯一标识。
  2. 操作参数设置

    • operation字段中定义了批量保存操作,使用了batchArraySave方法。
    • rowsKeyrows分别指定了数组键和行数。
  3. 请求参数映射

    • 请求参数通过request数组进行定义,每个字段都包含具体的信息,如字段名称、标签、类型、描述和取值方式等。
    • 特别注意的是部分字段使用了解析器(如:ConvertObjectParser),用于将特定格式的数据转换为目标系统可识别的格式。例如:
      {
      "field": "FBillTypeID",
      label:"单据类型",
      type:"string",
      describe:"单据类型",
      parser:{"name:"ConvertObjectParser",params:"FNumber"},
      value:"PYQTRKD"
      }
  4. 嵌套数组处理

    • 对于复杂的数据结构,如明细信息(FEntity),使用嵌套数组来表示。每个子项也包含类似的字段配置,并且可以继续嵌套其他子项。
  5. 其他请求参数

    • otherRequest数组中定义了一些额外的请求参数,如业务对象表单ID(FormId)、是否验证基础资料(IsVerifyBaseDataField)、执行操作(Operation)以及是否自动提交并审核(IsAutoSubmitAndAudit)。

数据转换与写入示例

假设我们有以下源数据需要转换并写入金蝶云星空:

{
  io_id: 'PI20231001',
  io_date: '2023-10-01',
  remark: '盘盈入库',
  items:[
    {
      sku_id:'SKU12345',
      batch_id:'BATCH001',
      product_date:'2023-09-15',
      expiration_date:'2024-09-15',
      qty:'100'
    }
  ],
  wms_co_id:'WH001'
}

根据上述元数据配置,该源数据将被转换为以下格式:

{
  FormId: 'STK_MISCELLANEOUS',
  IsVerifyBaseDataField:true,
  Operation:'Save',
  IsAutoSubmitAndAudit:false,
  Model:{
    FBillNo:'PI20231001',
    FBillTypeID:{'FNumber':'PYQTRKD'},
    FStockOrgId:{'FNumber':'100'},
    FStockDirect:'GENERAL',
    FDate:'2023-10-01',
    FOwnerTypeIdHead:'BD_OwnerOrg',
    FOwnerIdHead:{'FNumber':'_findCollection find F_HZ_OrgId_Fnumber from 0cba013f-c6e7-3826-bf89-bb0b11e996e1 where FNumber=SKU12345'},
    NOTE:'盘盈入库',
    Entity:[
      {
        FMATERIALID:{'FNumber':'SKU12345'},
        STOCKID:{'FNumber':'WH001'},
        STOCKSTATUSID:{'FNumber':'KCZT01_SYS'},
        LOT:{'FNumber':'BATCH001'},
        PRODUCEDATE:'2023-09-15',
        EXPIRYDATE:'2024-09-15',
        Qty:'100'
      }
    ]
  }
}

通过轻易云的数据集成平台,这些转换后的数据将通过API接口发送到金蝶云星空,实现无缝对接和写入。 企业微信与OA系统接口开发配置