从金蝶云到MySQL:详细解析数据集成与提高效率的技术手段

  • 轻易云集成顾问-胡秀丛

金蝶云星空数据集成到MySQL:实际案例分享

在现代企业环境中,如何高效、可靠地进行系统对接和数据集成一直是一个重要课题。本文将聚焦于一项具体的技术实施案例,即通过轻易云数据集成平台,将金蝶云星空的销售出库审核中数据拉取并存储到MySQL数据库中。本次操作方案名称为MOM-XSCK-T02.01销售出库审核中数据拉取-表头存储过程——样本勿删。

首先,我们需要从金蝶云星空获取销售出库审核中的相关业务数据。这一步骤通过调用API接口executeBillQuery来完成。在实现这一目标时,必须考虑以下几个关键点:

  1. 定时可靠的数据抓取: 为确保销售出库审核中的实时性和完整性,我们设计了一套定时任务机制。该机制能够准时触发API调用,从而按预设频率抓取最新的业务数据。

  2. 分页与限流处理: 由于接口存在分页及流量限制,我们采用了分批次、多轮请求的方法来获取全量的数据。这不仅能避免因单次请求量过大导致的API超载问题,还能提高整体抓取效率。

其次,当我们成功获取到金蝶云星空的数据后,需要将其写入到MySQL数据库内。此环节使用了专门定义的API getbackexecute进行批量写入操作,在这过程中要特别关注以下几点:

  1. 自定义转换逻辑: 金蝶云星空的数据结构可能与MySQL有所差异。因此,在写入之前,需要进行必要的数据转换,以适配特定的业务需求和数据库结构。

  2. 快速高效的大量写入支持: MySQL 数据库存储能力较强,但在面对大量插入操作时依然可能出现性能瓶颈。因此,本方案优化了大规模并行插入策略,使得海量业务数据能够在短时间内高效完成存储任务。

  3. 异常处理与错误重试机制: 在实际生产环境中,不可避免会遇到网络波动或偶发性的系统故障,为此,我们构建了一套完善的异常捕获及自动重试机制,用以保障整个流程平稳运行,并最大程度减少因意外情况导致的数据丢失风险。

除了上述步骤,本案例还集成了集中监控和告警系统,以实时跟踪每一次任务执行状态及性能表现。一旦检测到潜在异常,该系统能够迅速发送告警通知,方便运营人员及时排查解决问题。

总之,通过针对性的技术手段和周密配置,此方案实现了对金蝶云 如何开发金蝶云星空API接口

调用金蝶云星空接口executeBillQuery获取并加工数据

在数据集成的生命周期中,第一步是从源系统获取数据。本文将深入探讨如何使用轻易云数据集成平台调用金蝶云星空的executeBillQuery接口来获取销售出库审核中的数据,并对其进行初步加工。

接口配置与请求参数

我们使用的接口是executeBillQuery,其主要功能是查询销售出库单的相关信息。该接口采用POST方法进行调用,以下是具体的元数据配置:

{
  "api": "executeBillQuery",
  "effect": "QUERY",
  "method": "POST",
  "number": "fbillno",
  "id": "fid",
  "request": [
    {"field": "fid", "label": "主键", "type": "string", "value": "fid"},
    {"field": "fbillno", "label": "销售出库单号", "type": "string", "value": "fbillno"},
    {"field": "FDocumentStatus", "label": "状态", "type": "string", 
        "describe":"B审核中C已审核D重新审核","value":"FDocumentStatus"},
    {"field": "FSaleOrgId", "label": "销售组织", 
        "type":"string","value":"FSaleOrgId.fnumber"},
    {"field":"fdate","label":"日期","type":"string","value":"fdate"},
    {"field":"FStockOrgId","label":"发货组织","type":"string","value":"FStockOrgId.fnumber"},
    {"field":"FCustomerID","label":"客户代码","type":"string","value":"FCustomerID.fnumber"},
    {"field":"FCustomerID_name","label":"客户名称","type":"string","value":"FCustomerID.fname"},
    {"field":"FCarrierID","label":"承运商","type":"string","value":"FCarrierID.fnumber"},
    {"field":"FCarriageNO","label":"运输单号","type":"string","value":"FCarriageNO"},
    {"field":"FSalesManID","label":"销售员","type":"string","value":"FSalesManID.fname"},
    {"field":"F_MNote","label":"备注","type":"string","value":"FMNote"},
    {"field":"FConsignee","label":"收货人","type":"string","value":"FConsignee"},
    {"field":"FCreatorId_fname","label":"创建人","type":"","value":
        "FCreatorId.fname"},
    {"field":
        "FCreateDate",
        "label":
        "创建日期",
        "type":
        "string",
        "value":
        "FCreateDate"
    },
    {
      ...

数据请求与清洗

在调用接口时,我们需要传递一系列参数,这些参数定义了我们希望获取的数据字段及其过滤条件。例如:

  • FormId: 表单ID,必须填写金蝶的表单ID如:SAL_OUTSTOCK
  • FilterString: 用于筛选数据的条件,例如:FStockOrgId.fnumber in ('T02.01', 'T02') and FCreatorId.fname <> 'MomUser' and FDocumentStatus='B' and F_FSYNCMOM=0

通过这些参数,我们可以精确地控制查询结果,确保只获取到符合业务需求的数据。

数据转换与写入

在获取到原始数据后,我们需要对其进行转换和清洗,以便后续处理。例如,将日期格式统一、去除无效字符、合并重复记录等。这一步骤可以通过轻易云平台提供的可视化工具来完成,大大简化了操作复杂度。

以下是一个示例请求体:

{
  ...
  {
      ...
      {
        ...
        {
          ...
          {
            ...
            {
              ...
              {
                ...
                {
                  ...
                  {
                    ...
                    {
                      ...
                      {
                        ...
                        {
                          ...
                          {
                            ...
                            {
                              ...
                              {
                                ...
                                {
                                  ...
![打通用友BIP数据接口](https://pic.qeasy.cloud/S7.png~tplv-syqr462i7n-qeasy.image)
### 使用轻易云数据集成平台进行ETL转换并写入MySQLAPI接口

在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL转换,转为目标平台 MySQL API 接口所能够接收的格式,最终写入目标平台。以下是一个详细的技术案例,展示如何使用轻易云数据集成平台完成这一过程。

#### 元数据配置解析

首先,我们来看一下元数据配置。该配置定义了如何将数据从源系统转换并写入目标 MySQL 平台。以下是关键字段的解析:

- `api`: 指定调用的API名称,这里是`getbackexecute`。
- `effect`: 表示执行操作类型,这里是`EXECUTE`。
- `method`: HTTP请求方法,这里是`POST`。
- `idCheck`: 是否进行ID校验,这里为`true`。
- `request`: 包含具体的数据字段和其映射关系。

#### 数据字段映射

在`request`部分,我们定义了多个字段及其对应的值。每个字段都有以下属性:

- `field`: 字段名。
- `label`: 字段标签。
- `type`: 字段类型,如字符串、对象等。
- `value`: 该字段的值,可以是静态值或动态值(如从源系统获取)。

以下是几个关键字段及其配置:

```json
{
  "field": "INSTRUCTION_DOC_ID",
  "label": "INSTRUCTION_DOC_ID",
  "type": "string",
  "value": "{fid}"
}

这个字段表示指令文档ID,其值来自源系统中的fid

{
  "field": "SITE_ID",
  "label": "站点ID",
  "type": "string",
  "value": "_function case '{FStockOrgId}' when 'T02.01' then '11001' when 'T02' then '8001' else '' end"
}

这个字段表示站点ID,其值通过一个函数根据不同条件进行转换。

SQL存储过程调用

在元数据配置中,还定义了一个存储过程调用:

{
  "field": "main_sql",
  "label": "main_sql",
  "type": "string",
  "describe": "111",
  "value": "call ty_mes.xsck(:INSTRUCTION_DOC_ID,:TENANT_ID,:INSTRUCTION_DOC_NUM,:INSTRUCTION_DOC_TYPE,:INSTRUCTION_DOC_STATUS,:SITE_ID,:SUPPLIER_ID,:SUPPLIER_SITE_ID,:CUSTOMER_ID,:CUSTOMER_SITE_ID,:SOURCE_ORDER_TYPE,:SOURCE_ORDER_ID,:DEMAND_TIME,:COST_CENTER_ID,:PERSON_ID,:IDENTIFICATION,:REMARK,:REASON,:SOURCE_SYSTEM,:LATEST_HIS_ID,:CID,:CREATION_DATE,:LAST_UPDATE_DATE,:CONTACT_PERSON,:CREATED_BY,:CREATE_DATE,:PO_NUMBER,:SO_NUMBER,:SO_FID,:is_successx)"
}

这个存储过程ty_mes.xsck会被调用,并传递多个参数,这些参数对应于前面定义的字段。

实现步骤

  1. 数据提取:从源系统提取原始数据,确保所有必要字段都已获取。
  2. 数据清洗与转换:根据元数据配置,对提取的数据进行清洗和转换。例如,将源系统中的日期格式转换为目标系统所需的格式。
  3. 构建请求:根据元数据配置构建HTTP POST请求,将清洗后的数据填充到请求体中。
  4. 执行存储过程:通过API调用执行存储过程,将处理后的数据写入MySQL数据库。

示例代码

以下是一个简化版的Python代码示例,展示如何实现上述步骤:

import requests
import json

# 定义API URL和头信息
api_url = 'http://example.com/api/getbackexecute'
headers = {'Content-Type': 'application/json'}

# 构建请求体
payload = {
    'main_params': {
        'INSTRUCTION_DOC_ID': source_data['fid'],
        'TENANT_ID': '7',
        'INSTRUCTION_DOC_NUM': source_data['fbillno'],
        # ...其他字段...
    },
    'main_sql': 'call ty_mes.xsck(:INSTRUCTION_DOC_ID, :TENANT_ID, :INSTRUCTION_DOC_NUM, ...)'
}

# 发起POST请求
response = requests.post(api_url, headers=headers, data=json.dumps(payload))

# 检查响应状态
if response.status_code == 200:
    print('Data successfully written to MySQL API')
else:
    print('Failed to write data:', response.text)

通过上述步骤和代码示例,我们可以高效地将源平台的数据经过ETL转换后写入目标MySQL平台,实现不同系统间的数据无缝对接。 打通企业微信数据接口