ETL技术:从MySQL到金蝶云星空的数据转换与写入

  • 轻易云集成顾问-曹润

MySQL数据集成到金蝶云星空的实战案例分享

在当今复杂多变的数据环境中,如何高效地实现各系统之间的数据无缝对接,是每个技术团队面临的重要挑战。本文将集中分享一个具体案例:MySQL数据库中的生产汇报单数据(方案名称:MOM-SCHB-生产汇报单新增-XSL)如何通过轻易云数据集成平台,高效集成至金蝶云星空。

系统架构概述

本次集成过程中,我们充分利用了轻易云的平台特性,如高吞吐量的数据写入能力和实时监控等功能。首先,通过MySQL提供的select接口定期抓取生产汇报单的新增数据,并借助其强大的批处理机制,确保大量数据能够快速且稳定地传输至金蝶云星空。另外,由于两者存储结构和格式存在一定差异,我们自定义了适应业务需求的数据转换逻辑,以满足最终映射要求。

数据流设计与监控

使用轻易云提供的可视化数据流设计工具,我们打造了一条清晰直观且易于管理的数据流。不仅如此,该平台还具备强大的集中监控和告警系统,使我们可以实时跟踪整个任务执行过程中的状态及性能。一旦发现任何异常情况,系统会立刻发出告警,有助于及时调整策略以保障数据传输的连续性与准确性。

关键技术要点解析

  1. API调用与分页处理

    • MySQL select API: 用于从源数据库中提取增量更新所需的信息。
    • 分页查询与限流控制:为了优化性能并避免网络堵塞,对大规模记录进行分批拉取处理。
  2. 批量写入至金蝶云星空

    • 使用batchSave API,将转换后的大量数据迅速推送到目标系统。这不仅提升了整体效率,同时也有效减少了接口请求次数,大幅节省资源消耗。
  3. 解决格式差异与异常重试机制配置

    • 针对不同系统间的数据格式不一致问题,通过配置灵活转换规则使得各字段顺利映射。
    • 在遇到失败或错误时,可自动触发重试机制,有效提高任务成功率并保障业务连续性。

透过以上一系列步骤,本项目成功实现了MySQL数据库向金蝶云星空的大规模、高可靠性的自动化集成。在实际操作过程中,不同阶段出现的问题都能依托完备的平台工具体系得到及时解决,从而保证整个流程顺畅运行。后续文章将详细展开每个环节中的具体实施细节,以及相关 如何对接用友BIP接口

调用MySQL接口select获取并加工数据的技术案例

在数据集成生命周期的第一步中,调用源系统MySQL接口select获取并加工数据是至关重要的。本文将深入探讨如何通过轻易云数据集成平台配置元数据来实现这一过程。

元数据配置解析

首先,我们需要理解元数据配置中的各个字段及其作用。以下是关键字段的详细解析:

  • api: 指定API类型,这里为select,表示进行查询操作。
  • effect: 操作类型,这里为QUERY,表示查询。
  • method: HTTP请求方法,这里为POST
  • id: 数据源标识符,这里为sourceid
  • request: 请求参数列表,包含主参数(main_params),用于SQL语句中的动态字段绑定。
  • otherRequest: 其他请求参数列表,包含主SQL语句(main_sql)。

主参数配置

主参数(main_params)包含两个子参数:限制结果集返回的行数(limit)和偏移量(offset)。这些参数用于分页查询,以提高查询效率和可控性。

{
  "field": "limit",
  "label": "限制结果集返回的行数",
  "type": "int",
  "describe": "必要的参数!LIMIT 子句用于限制查询结果返回的行数。",
  "value": "{PAGINATION_PAGE_SIZE}"
},
{
  "field": "offset",
  "label": "偏移量",
  "type": "int",
  "describe": "OFFSET 子句用于指定查询结果的起始位置或偏移量。",
  "value": "{PAGINATION_START_ROW}"
}

主SQL语句配置

主SQL语句(main_sql)使用动态字段绑定,以确保字段与请求参数一一对应。以下是具体的SQL语句示例:

select
t1.header_id as sourceid,
CONCAT('MSCHB', DATE_FORMAT(t1.transaction_date, '%Y%m%d'), t1.header_id) as '单据编号',
ifnull((select aft_quiet_time from ty_mes.mt_account_period where t1.transaction_date < aft_quiet_time order by account_period_id desc limit 1), t1.transaction_date) as '单据日期',
t2.attribute2 as '生产订单主键',
t2.attribute3 as '生产订单明细主键',
SUBSTRING_INDEX(t1.wo_number, '_', 1) as '生产订单号',
SUBSTRING_INDEX(t1.wo_number, '_', -1) as '生产订单行号',
t1.completed_qty as '完成数量',
t1.ok_qty as '合格数量',
t1.start_date as '开始时间',
t1.end_date as '结束时间',
t1.manual_time as '工时',
t1.time_uom as '时间单位',
t2.material_code as '物料编码',
ifnull(t3.value, '15040501') as '生产车间',
t2.manufacturing_site_code as '生产组织',
t4.value as '生产汇报类型'
from ty_mes.hme_wo_report_process_itf t1
left join ty_aps.hps_make_order_iface t2 on t1.tenant_id = t2.tenant_id and t1.wo_number = t2.make_order_num
left join hzero_platform.hpfm_lov_value t3 on t2.department = t3.meaning and t3.lov_code = 'TY.KINGDEE.WORKSHOP' and t2.manufacturing_site_code = t3.tag and t3.enabled_flag = 1
left join hzero_platform.hpfm_lov_value t4 on t2.manufacturing_site_code = t4.meaning and t4.lov_code = 'MOM_KINGDEE_SCHB_TYPE' and t4.enabled_flag = 1
where t1.tenant_id = 7
and t1.iface_sequence = 4
and t1.`STATUS` in ('N', 'E')
limit :limit offset :offset

参数绑定与执行

在执行查询之前,需要将请求参数与SQL语句中的占位符进行绑定。通过这种方式,可以提高查询语句的可读性和维护性,并确保动态字段与请求参数正确对应。

例如,将:limit替换为实际值:

limit {PAGINATION_PAGE_SIZE} offset {PAGINATION_START_ROW}

实际应用案例

假设我们需要从MySQL数据库中获取某个租户ID为7、接口序列为4且状态为'N'或'E'的数据,并分页显示每页10条记录,从第20条记录开始。我们可以这样配置:

{
  "api": "select",
  "effect": "QUERY",
  "method": "POST",
  "id": "sourceid",
  "request": [
    {
      "field": "main_params",
      "label": "主参数",
      ...
      {
        ...
        {
          ...
          {
            ...
            {
              ...
              {
                ...
                {
                  ...
                  {
                    ...
                    {
                      ...
                      {
                        ...
                        {
                          ...
                          {
                            ...
                            {
                              ...
                              {
                                ...
                                {
                                  ...
                                  {
                                    ...
                                    {
                                      ...
                                      {
                                        ...
                                        {
                                          ...
                                          }
                                        }
                                      }
                                    }
                                  }
                                }
                              }
                            }
                          }
                        }
                      }
                    }
                  }
                }
              }
            }
          }
        },
        {"field":"offset","label":"偏移量","type":"int","describe":"OFFSET 子句用于指定查询结果的起始位置或偏移量。","value":"20"}
      ]
    },
    {"field":"main_sql","label":"主SQL语句","type":"string","describe":"...","value":"..."}
  ],
}

通过上述配置,我们可以高效地从MySQL数据库中获取并加工所需的数据,实现不同系统间的数据无缝对接。这一步骤不仅是数据集成生命周期的重要环节,也是确保数据准确性和完整性的基础。 数据集成平台可视化配置API接口

使用轻易云数据集成平台进行ETL转换并写入金蝶云星空API接口的技术案例

在数据集成生命周期的第二步中,我们将已经集成的源平台数据进行ETL(提取、转换、加载)转换,以适配目标平台金蝶云星空API接口的格式,并最终将数据写入目标平台。以下是详细的技术实现过程。

1. 配置元数据

根据提供的元数据配置,我们需要将源数据转换为金蝶云星空API接口所能接收的格式。以下是关键字段及其配置:

  • api: "batchSave"
  • method: "POST"
  • number: "FBillNo"
  • id: "FID"
  • name: "FBillNo"
  • idCheck: true

请求字段配置如下:

[
    {"field":"FBillNo","label":"单据编号","type":"string","describe":"单据编号","value":"{{单据编号}}"},
    {"field":"FBillType","label":"单据类型","type":"string","describe":"单据类型","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"SCHBD01_SYS"},
    {"field":"FDate","label":"单据日期","type":"string","describe":"单据日期","value":"{{单据日期}}"},
    {"field":"FPrdOrgId","label":"生产组织","type":"string","describe":"生产组织","parser":{"name":"ConvertObjectParser","params":"FNumber"},"value":"{{生产组织}}"},
    {"field":"FWorkshipIdH","label":"生产车间","type":"string","describe":"生产车间","parser":{"name":"ConvertObjectParser","params":"FNumber"}},
    {"field":"FDescription","label":"备注","type": "string", "describe": "备注", "value": "轻易云"}
]

2. 数据转换与写入

在进行ETL转换时,需要特别注意以下几点:

  1. 字段映射和解析:某些字段需要通过ConvertObjectParser进行解析,例如FBillTypeFPrdOrgId等。这些字段通常需要将源系统中的值转换为目标系统可识别的编码。

  2. 数组和嵌套结构处理:例如,FEntity字段是一个数组,包含多个子字段。每个子字段也可能包含复杂的数据结构,需要逐一处理。

  3. 关联关系表处理:例如,FEntity_Link字段用于描述与源单的关联关系,需要确保每个子字段都正确映射和填充。

以下是一个示例请求体,展示了如何将源数据转换为金蝶云星空API所需的格式:

{
    "FormId": "PRD_MORPT",
    "IsAutoSubmitAndAudit": true,
    "IsVerifyBaseDataField": true,
    "Operation": "Save",
    "Model": {
        "FBillNo": "{{单据编号}}",
        "FBillType": {
            "FNumber": "SCHBD01_SYS"
        },
        "FDate": "{{单据日期}}",
        "FPrdOrgId": {
            "FNumber": "{{生产组织}}"
        },
        ...
        "FEntity": [
            {
                "FSrcEntryId": "{{生产订单明细主键}}",
                ...
                "FMOMAINENTRYID": "{{生产订单明细主键}}",
                ...
                "FEntity_Link": [
                    {
                        "FEntity_Link_FRuleId": {
                            "Value": "PRD_MO2MORPT"
                        },
                        ...
                    }
                ]
            }
        ]
    }
}

3. API调用与错误处理

使用POST方法调用金蝶云星空API,将转换后的数据发送到目标平台。在实际应用中,还需要考虑错误处理机制,例如:

  • 验证返回结果:检查API返回结果中的状态码和消息,确保操作成功。
  • 重试机制:在网络异常或服务器错误时,设置重试机制以提高可靠性。
  • 日志记录:记录每次API调用的请求和响应,以便后续排查问题。
import requests
import json

url = 'https://api.kingdee.com/batchSave'
headers = {'Content-Type': 'application/json'}
data = {
    # 上述示例请求体内容
}

response = requests.post(url, headers=headers, data=json.dumps(data))

if response.status_code == 200:
    print("Data successfully saved to Kingdee Cloud.")
else:
    print(f"Failed to save data: {response.text}")

通过上述步骤,我们可以实现从源系统到金蝶云星空平台的数据无缝对接,并确保数据准确性和一致性。这种方法不仅提高了业务流程自动化程度,还显著提升了整体效率。 金蝶与CRM系统接口开发配置

更多系统对接方案