数据转换与写入:高效实现钉钉数据至MySQL

  • 轻易云集成顾问-陈洁琳

钉钉数据集成到MySQL:对账系统--货品价格(产品调价)

在企业信息化系统中,数据的高效传递和实时处理是关键环节。本文将分享通过轻易云数据集成平台实现钉钉与MySQL的数据对接案例,该方案聚焦于针对"对账系统--货品价格(产品调价)"的具体实施。

为确保任务顺利进行,我们需要应对以下技术挑战:

  1. 从钉钉接口定时采集数据

    • 利用v1.0/yida/processes/instances API,从钉钉获取最新的货品价格及其变动记录。
  2. 大规模数据写入 MySQL

    • 通过 execute API,实现大量数据快速、高效地批量写入 MySQL 数据库。
  3. 解决分页与限流问题

    • 因为API请求次数受限,需要合理设计分页逻辑和限流策略,以稳定和连续地抓取所有必要的数据避免遗漏。
  4. 自定义转换逻辑适应业务需求

    • 在传输过程中,对来自钉钉的数据进行必要转换,使其符合MySQL数据库结构要求,包括字段映射及格式调整等操作。
  5. 监控与告警机制保障任务执行

    • 配置集中监控和实时告警功能,随时跟踪各个API调用过程中的状态变化、性能指标以及潜在异常情况,实现高透明度管理。
  6. 异常处理及重试机制的实现

    • 当API调用或数据库操作遇到错误时,通过完善的重试机制及时恢复正常流程,减少人工干预。
  7. 保证整体解决方案简洁直观,可视化管理整个流程进程,提高操作效率并降低维护难度。

上述方案不仅囊括了对接过程中的核心技术点,还辅以实际运作细节提升可靠性。在下一步内容中,我们将详细探讨每一项技术要点如何具象落地,并分享一些实操经验技巧。如如何优化API接口性能、加强网络安全通信,以及处理各种突发状况下保持服务可用性等实际问题。 钉钉与WMS系统接口开发配置

调用钉钉接口获取并加工数据

在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将深入探讨如何使用轻易云数据集成平台调用钉钉接口v1.0/yida/processes/instances,并对获取的数据进行初步加工。

接口配置与请求参数

首先,我们需要配置API接口及其请求参数。以下是元数据配置中的关键部分:

{
  "api": "v1.0/yida/processes/instances",
  "method": "POST",
  "number": "title",
  "id": "processInstanceId",
  "pagination": {"pageSize": 50},
  "idCheck": true,
  "request": [
    {"field": "pageSize", "label": "分页大小", "type": "string", "describe": "分页大小", "value": "50"},
    {"field": "pageNumber", "label": "分页页码", "type": "string", "describe": "分页页码", "value": "1"},
    {"field": "appType", "label": "应用ID", "type": "string", "describe": "应用ID", 
        "value":"APP_UYN987QNZ82Q4QK409VT"},
    {"field":"systemToken","label":"应用秘钥","type":"string","describe":"应用秘钥",
        "value":"DR766X813F8925E1F57YN8U6ZQFR26RQKCJFL04"},
    {"field":"userId","label":"用户的userid","type":"string","describe":"用户的userid",
        "value":"16000443318138909"},
    {"field":"language","label":"语言","type":"string","describe":"语言,取值:zh_CN:中文(默认值)en_US:英文"},
    {"field":"formUuid","label":"表单ID","type":"string","describe":"表单ID",
        "value":"FORM-5Q966D91RDWAYU08B9LR84QB6FHN3I9Q9ZTHL5"},
    {"field":"searchFieldJson","label":"条件","type":"object","children":[
      {"field":"selectField_llkks8u6","label":"调整类型","type":"string",
        "value":"产品调价(产品中心)"},
      {"parent":"searchFieldJson","label":"品牌","field":"selectField_lmlugdfx",
        "type":"string"},
      {"parent":"searchFieldJson","label":"三级分类","field":
        "selectField_lnfs8m04","type":
        "string"}]},
    {"field":"createFromTimeGMT","label":
        "创建时间起始值","type":
        "string",
        "describe":
        "创建时间起始值",
        "value":
        "2024-03-20 00:00:00"},
    {"field":
        "createToTimeGMT",
        "label":
        "创建时间终止值",
        "type":
        "string",
        "describe":
        "创建时间终止值",
        "value":
        "{{CURRENT_TIME|datetime}}"},
    {"field":
        "modifiedFromTimeGMT",
        "label":
        "修改时间起始值",
        "type":
        "string",
        "describe":
        "修改时间起始值"},
    {"field":
        "modifiedToTimeGMT",
        "label":
        "修改时间终止值",
        "type":
        "string",
        "describe":
           修改时间终止值},
    {"field":
           taskId,
           label:
           任务ID,
           type:
           string,
           describe:
           任务ID},
    {"field":
           instanceStatus,
           label:
           实例状态,
           type:
           string,
           describe:
            实例状态,
            value:
            COMPLETED},
    {"field":
            approvedResult,
            label:
            流程审批结果,
            type:
            string,
            describe:
            流程审批结果,
            value:
            agree}
]
}

请求参数解析

  1. 基本参数

    • pageSizepageNumber 用于控制分页。
    • appTypesystemToken 是用于身份验证的关键参数。
    • userId 表示请求发起者的用户ID。
  2. 语言设置

    • language 参数可以设置为中文(zh_CN)或英文(en_US),默认为中文。
  3. 表单和查询条件

    • formUuid 指定了要查询的表单ID。
    • searchFieldJson 包含了具体的查询条件,如调整类型、品牌和三级分类。
  4. 时间范围

    • createFromTimeGMTcreateToTimeGMT 用于指定创建时间范围。
    • modifiedFromTimeGMTmodifiedToTimeGMT 用于指定修改时间范围。
  5. 其他过滤条件

    • taskId, instanceStatus, 和 approvedResult 用于进一步过滤实例状态和审批结果。

数据请求与清洗

在发送请求后,我们会得到一个包含多个实例数据的响应。接下来需要对这些数据进行清洗和初步加工,以便后续处理。以下是一个简单的数据清洗示例:

import requests
import json

# 配置API URL和请求头
url = 'https://api.dingtalk.com/v1.0/yida/processes/instances'
headers = {
    'Content-Type': 'application/json',
}

# 构建请求体
payload = {
    'pageSize': '50',
    'pageNumber': '1',
    'appType': 'APP_UYN987QNZ82Q4QK409VT',
    'systemToken': 'DR766X813F8925E1F57YN8U6ZQFR26RQKCJFL04',
    'userId': '16000443318138909',
    'language': 'zh_CN',
    'formUuid': 'FORM-5Q966D91RDWAYU08B9LR84QB6FHN3I9Q9ZTHL5',
    'searchFieldJson': {
        'selectField_llkks8u6': '产品调价(产品中心)'
        # 可以添加更多条件
    },
    'createFromTimeGMT': '2024-03-20 00:00:00',
    'createToTimeGMT': '{{CURRENT_TIME|datetime}}',
}

# 发起POST请求
response = requests.post(url, headers=headers, data=json.dumps(payload))

# 检查响应状态码
if response.status_code == 200:
    data = response.json()

    # 数据清洗示例:提取所需字段并转换格式
    cleaned_data = []

    for instance in data['data']:
        cleaned_instance = {
            'title': instance['title'],
            'processInstanceId': instance['processInstanceId'],
            # 添加更多需要提取的字段
        }
        cleaned_data.append(cleaned_instance)

else:
    print(f'Error: {response.status_code}')

小结

通过上述步骤,我们成功调用了钉钉接口并对返回的数据进行了初步清洗。这一步骤为后续的数据转换与写入奠定了基础。在实际操作中,可以根据业务需求进一步优化和扩展数据处理逻辑,以确保数据集成过程高效且准确。 如何开发钉钉API接口

数据转换与写入目标平台的技术实现

在数据集成生命周期的第二步中,我们需要将已经集成的源平台数据进行ETL转换,转为目标平台 MySQL API 接口所能够接收的格式,并最终写入目标平台。以下是具体的技术实现过程及相关细节。

1. 数据请求与清洗

首先,我们需要从源平台获取原始数据,并对其进行必要的清洗和预处理。这一步骤确保数据的准确性和一致性,为后续的数据转换打下基础。

{
  "api": "execute",
  "method": "POST",
  "idCheck": true,
  "request": [
    {
      "field": "main_params",
      "label": "主参数",
      "type": "object",
      "children": [
        {"field": "change_type", "label": "变更类型", "type": "string", "value": "B"},
        {"field": "brand", "label": "品牌", "type": "string", "value": "{selectField_lmlugdfx}"},
        {"field": "supplier_code", 
         "label": "供应商编码", 
         "type": "string", 
         "value": "_findCollection find textField_ln2uyh3e from 68e141c6-4351-3f2f-b9a2-5eaee8f01a55 where textField_lfjcloll={{tableField_lnedzxul_textField_lnedzxum}}"
        },
        {"field": "goods_code", 
         "label": "货品编码", 
         "type": "string", 
         "value": "{{tableField_lnedzxul_textField_lnedzxum}}"
        },
        {"field": "goods_name", 
         "label": "货品名称", 
         "type": "string", 
         "value":"{{tableField_lnedzxul_selectField_lnedzxun}}"
        },
        {"field":"price","label":"实际结算价格","type":"float","value":"{{tableField_lnedzxul_numberField_lnoes31u}}","default":"abc"},
        {"field":"distrib_price","label":"分销价格","type":"float","value":"{{tableField_lnedzxul_numberField_lneeit5z}}"},
        {"field":"min_price","label":"最低售价","type":"float","value":"{{tableField_lnedzxul_numberField_lneeit62}}"},
        {"field":"class_a","label":"一级分类","type":"string","value":"厨卫成品"},
        {"field":"class_b","label":"二级分类","type":"string","value":"{selectField_lmlugdg3}"},
        {"field":"class_c","label":"三级分类","type":"string","value":"_findCollection find selectField_lfjclolt from 68e141c6-4351-3f2f-b9a2-5eaee8f01a55 where textField_lfjcloll={{tableField_lnedzxul_textField_lnedzxum}}"},
        {"field":"source_Id","label":"系统来源","type":"int","value":"5"},
        {"field":"effective_time",
         "label":"生效日期",
         "type" :"datetime",
         `value`:"_function FROM_UNIXTIME(  ( {dateField_lothx6po} \/ 1000 )  ,'%Y-%m-%d' )"
        },
        {"field" :"create_time",
          `label`:"创建日期",
          `type`:"datetime",
          `value`:"_function DATE_FORMAT('{gmtCreate}','%Y-%m-%d 00:00:00')"
       },
       {`field`:"create_by",
          `label`:"创建人",
          `type`:"int",
          `value`:"1"
       },
       {`field`:"status",
          `label`:"状态",
          `type`:"int"
       },
       {`field`:"approve_status",
          `label`:"审核状态",
          `type`:"int"
       },
       {`field`:"brand_coefficient`,
           `label`:``品牌系数`,
           `type`:``float`,
           ``value`:``_mongoQuery 68e141c6-4351-3f2f-b9a2-5eaee8f01a55 findField=content.numberField_lfjclona where={\"content.textField_lfjcloll\":{\"$eq\":\"{{tableField_lhykr1el_textField_lhykr1en}}\"}}
       }
     ]
   }
 ]
}

2. 数据转换

在数据清洗完成后,下一步是将数据转换为目标平台 MySQL API 接口能够接收的格式。我们需要根据元数据配置中的字段定义,将各个字段映射到相应的数据库表字段中。

{
  `"otherRequest":[
    {
      `"field"`:`"main_sql"`,
      `"label"`:`"主语句"`,
      `"type"`:`"string"`,
      `"value"`:`INSERT INTO \``lhhy_srm\``.`goods_price`\n(\n\``change_type`\`,\n\``brand`\`,\n\``supplier_code`\`,\n\``goods_code`\`,\n\``goods_name`\`,\n\``price`\`,\n\``distrib_price`\`,\n\``min_price`\`,\n\``brand_coefficient`\`,\n\``class_a`\`,\n\``class_b`\`,\n\``class_c`\`,\n\``source_Id`\`, \n \ ``effective_time`\`,`create_time`,`create_by`,`status`,`approve_status`) VALUES (\n<{change_type: }>,<{brand: }>,<{supplier_code: }>,<{goods_code: }>,<{goods_name: }>,<{price: }>,<{distrib_price: }>,<{min_price: }>,<{brand_coefficient: }>,<{class_a: }>,<{class_b: }>,<{class_c: }>,<{source_Id: }> , <{effective_time}> , <{create_time}> , <{create_by}> , <{status}> , <{approve_status}>);`
    }
   ],
   `"buildModel"`:`true`
}

3. 数据写入

最后一步是将转换后的数据通过 MySQL API 接口写入目标数据库。在这里,我们使用 SQL 插入语句将数据插入到指定的表中。

INSERT INTO `lhhy_srm`.`goods_price`
(
    `change_type`,
    `brand`,
    `supplier_code`,
    `goods_code`,
    `goods_name`,
    `price`,
    `distrib_price`,
    `min_price`,
    `brand_coefficient`,
    `class_a`,
    `class_b`,
    `class_c`,
    `source_Id`,
    `effective_time`,
    `create_time`,
    `create_by`,
    `status`,
    `approve_status`
)
VALUES
(
    'B',
    '{selectField_lmlugdfx}',
    '_findCollection find textField_ln2uyh3e from 68e141c6-4351-3f2f-b9a2-5eaee8f01a55 where textField_lfjcloll={{tableField_lnedzxul_textField_lnedzxum}}',
    '{{tableField_lnedzxul_textField_lnedzxum}}',
    '{{tableField_lnedzxul_selectField_lnedzxun}}',
    '{{tableField_lnedzxul_numberField_lnoes31u}}',
    '{{tableField_lnedzxul_numberField_lneeit5z}}',
    '{{table_field.lned.zx.ul.number.Field.lnee.it62}}',
     '_mongoQuery 68e141c6-4351-3f2f-b9a2-5eaee8f01a55 find.Field=content.number.Field.lfj.clona where={\"content.text.Field.lfj.cloll\":{\"$eq\":\"{{table.Field.lhy.kr1.el.text.Field.lhy.kr1.en}\"}',
     '厨卫成品',
     '{select.Field.lml.ugd.g3}',
     '_find.Collection find select.Field.lfj.clolt from 68e141c6-4351-3f2f-b9a2-5eaee8.f01.a55 where text.Field.lfj.cloll={{table.Field.lne.dz.xu.l.text.Field.lne.dz.xu.m}}',
     '5',
     '_function FROM_UNIXTIME( ( {date.Field.lo.thx.6.po} /1000 ) ,'%Y-%m-%d' )',
     '_function DATE_FORMAT('{gmt.Create}','%Y-%m-%d 00:00:00')',
     '1',
     '1'
);

钉钉与CRM系统接口开发配置