钉钉数据集成到MySQL:对账系统--货品价格(产品调价)
在企业信息化系统中,数据的高效传递和实时处理是关键环节。本文将分享通过轻易云数据集成平台实现钉钉与MySQL的数据对接案例,该方案聚焦于针对"对账系统--货品价格(产品调价)"的具体实施。
为确保任务顺利进行,我们需要应对以下技术挑战:
-
从钉钉接口定时采集数据:
- 利用
v1.0/yida/processes/instances
API,从钉钉获取最新的货品价格及其变动记录。
- 利用
-
大规模数据写入 MySQL:
- 通过
execute
API,实现大量数据快速、高效地批量写入 MySQL 数据库。
- 通过
-
解决分页与限流问题:
- 因为API请求次数受限,需要合理设计分页逻辑和限流策略,以稳定和连续地抓取所有必要的数据避免遗漏。
-
自定义转换逻辑适应业务需求:
- 在传输过程中,对来自钉钉的数据进行必要转换,使其符合MySQL数据库结构要求,包括字段映射及格式调整等操作。
-
监控与告警机制保障任务执行:
- 配置集中监控和实时告警功能,随时跟踪各个API调用过程中的状态变化、性能指标以及潜在异常情况,实现高透明度管理。
-
异常处理及重试机制的实现:
- 当API调用或数据库操作遇到错误时,通过完善的重试机制及时恢复正常流程,减少人工干预。
-
保证整体解决方案简洁直观,可视化管理整个流程进程,提高操作效率并降低维护难度。
上述方案不仅囊括了对接过程中的核心技术点,还辅以实际运作细节提升可靠性。在下一步内容中,我们将详细探讨每一项技术要点如何具象落地,并分享一些实操经验技巧。如如何优化API接口性能、加强网络安全通信,以及处理各种突发状况下保持服务可用性等实际问题。
调用钉钉接口获取并加工数据
在数据集成的生命周期中,调用源系统接口是至关重要的一步。本文将深入探讨如何使用轻易云数据集成平台调用钉钉接口v1.0/yida/processes/instances
,并对获取的数据进行初步加工。
接口配置与请求参数
首先,我们需要配置API接口及其请求参数。以下是元数据配置中的关键部分:
{
"api": "v1.0/yida/processes/instances",
"method": "POST",
"number": "title",
"id": "processInstanceId",
"pagination": {"pageSize": 50},
"idCheck": true,
"request": [
{"field": "pageSize", "label": "分页大小", "type": "string", "describe": "分页大小", "value": "50"},
{"field": "pageNumber", "label": "分页页码", "type": "string", "describe": "分页页码", "value": "1"},
{"field": "appType", "label": "应用ID", "type": "string", "describe": "应用ID",
"value":"APP_UYN987QNZ82Q4QK409VT"},
{"field":"systemToken","label":"应用秘钥","type":"string","describe":"应用秘钥",
"value":"DR766X813F8925E1F57YN8U6ZQFR26RQKCJFL04"},
{"field":"userId","label":"用户的userid","type":"string","describe":"用户的userid",
"value":"16000443318138909"},
{"field":"language","label":"语言","type":"string","describe":"语言,取值:zh_CN:中文(默认值)en_US:英文"},
{"field":"formUuid","label":"表单ID","type":"string","describe":"表单ID",
"value":"FORM-5Q966D91RDWAYU08B9LR84QB6FHN3I9Q9ZTHL5"},
{"field":"searchFieldJson","label":"条件","type":"object","children":[
{"field":"selectField_llkks8u6","label":"调整类型","type":"string",
"value":"产品调价(产品中心)"},
{"parent":"searchFieldJson","label":"品牌","field":"selectField_lmlugdfx",
"type":"string"},
{"parent":"searchFieldJson","label":"三级分类","field":
"selectField_lnfs8m04","type":
"string"}]},
{"field":"createFromTimeGMT","label":
"创建时间起始值","type":
"string",
"describe":
"创建时间起始值",
"value":
"2024-03-20 00:00:00"},
{"field":
"createToTimeGMT",
"label":
"创建时间终止值",
"type":
"string",
"describe":
"创建时间终止值",
"value":
"{{CURRENT_TIME|datetime}}"},
{"field":
"modifiedFromTimeGMT",
"label":
"修改时间起始值",
"type":
"string",
"describe":
"修改时间起始值"},
{"field":
"modifiedToTimeGMT",
"label":
"修改时间终止值",
"type":
"string",
"describe":
修改时间终止值},
{"field":
taskId,
label:
任务ID,
type:
string,
describe:
任务ID},
{"field":
instanceStatus,
label:
实例状态,
type:
string,
describe:
实例状态,
value:
COMPLETED},
{"field":
approvedResult,
label:
流程审批结果,
type:
string,
describe:
流程审批结果,
value:
agree}
]
}
请求参数解析
-
基本参数:
pageSize
和pageNumber
用于控制分页。appType
和systemToken
是用于身份验证的关键参数。userId
表示请求发起者的用户ID。
-
语言设置:
language
参数可以设置为中文(zh_CN)或英文(en_US),默认为中文。
-
表单和查询条件:
formUuid
指定了要查询的表单ID。searchFieldJson
包含了具体的查询条件,如调整类型、品牌和三级分类。
-
时间范围:
createFromTimeGMT
和createToTimeGMT
用于指定创建时间范围。modifiedFromTimeGMT
和modifiedToTimeGMT
用于指定修改时间范围。
-
其他过滤条件:
taskId
,instanceStatus
, 和approvedResult
用于进一步过滤实例状态和审批结果。
数据请求与清洗
在发送请求后,我们会得到一个包含多个实例数据的响应。接下来需要对这些数据进行清洗和初步加工,以便后续处理。以下是一个简单的数据清洗示例:
import requests
import json
# 配置API URL和请求头
url = 'https://api.dingtalk.com/v1.0/yida/processes/instances'
headers = {
'Content-Type': 'application/json',
}
# 构建请求体
payload = {
'pageSize': '50',
'pageNumber': '1',
'appType': 'APP_UYN987QNZ82Q4QK409VT',
'systemToken': 'DR766X813F8925E1F57YN8U6ZQFR26RQKCJFL04',
'userId': '16000443318138909',
'language': 'zh_CN',
'formUuid': 'FORM-5Q966D91RDWAYU08B9LR84QB6FHN3I9Q9ZTHL5',
'searchFieldJson': {
'selectField_llkks8u6': '产品调价(产品中心)'
# 可以添加更多条件
},
'createFromTimeGMT': '2024-03-20 00:00:00',
'createToTimeGMT': '{{CURRENT_TIME|datetime}}',
}
# 发起POST请求
response = requests.post(url, headers=headers, data=json.dumps(payload))
# 检查响应状态码
if response.status_code == 200:
data = response.json()
# 数据清洗示例:提取所需字段并转换格式
cleaned_data = []
for instance in data['data']:
cleaned_instance = {
'title': instance['title'],
'processInstanceId': instance['processInstanceId'],
# 添加更多需要提取的字段
}
cleaned_data.append(cleaned_instance)
else:
print(f'Error: {response.status_code}')
小结
通过上述步骤,我们成功调用了钉钉接口并对返回的数据进行了初步清洗。这一步骤为后续的数据转换与写入奠定了基础。在实际操作中,可以根据业务需求进一步优化和扩展数据处理逻辑,以确保数据集成过程高效且准确。
数据转换与写入目标平台的技术实现
在数据集成生命周期的第二步中,我们需要将已经集成的源平台数据进行ETL转换,转为目标平台 MySQL API 接口所能够接收的格式,并最终写入目标平台。以下是具体的技术实现过程及相关细节。
1. 数据请求与清洗
首先,我们需要从源平台获取原始数据,并对其进行必要的清洗和预处理。这一步骤确保数据的准确性和一致性,为后续的数据转换打下基础。
{
"api": "execute",
"method": "POST",
"idCheck": true,
"request": [
{
"field": "main_params",
"label": "主参数",
"type": "object",
"children": [
{"field": "change_type", "label": "变更类型", "type": "string", "value": "B"},
{"field": "brand", "label": "品牌", "type": "string", "value": "{selectField_lmlugdfx}"},
{"field": "supplier_code",
"label": "供应商编码",
"type": "string",
"value": "_findCollection find textField_ln2uyh3e from 68e141c6-4351-3f2f-b9a2-5eaee8f01a55 where textField_lfjcloll={{tableField_lnedzxul_textField_lnedzxum}}"
},
{"field": "goods_code",
"label": "货品编码",
"type": "string",
"value": "{{tableField_lnedzxul_textField_lnedzxum}}"
},
{"field": "goods_name",
"label": "货品名称",
"type": "string",
"value":"{{tableField_lnedzxul_selectField_lnedzxun}}"
},
{"field":"price","label":"实际结算价格","type":"float","value":"{{tableField_lnedzxul_numberField_lnoes31u}}","default":"abc"},
{"field":"distrib_price","label":"分销价格","type":"float","value":"{{tableField_lnedzxul_numberField_lneeit5z}}"},
{"field":"min_price","label":"最低售价","type":"float","value":"{{tableField_lnedzxul_numberField_lneeit62}}"},
{"field":"class_a","label":"一级分类","type":"string","value":"厨卫成品"},
{"field":"class_b","label":"二级分类","type":"string","value":"{selectField_lmlugdg3}"},
{"field":"class_c","label":"三级分类","type":"string","value":"_findCollection find selectField_lfjclolt from 68e141c6-4351-3f2f-b9a2-5eaee8f01a55 where textField_lfjcloll={{tableField_lnedzxul_textField_lnedzxum}}"},
{"field":"source_Id","label":"系统来源","type":"int","value":"5"},
{"field":"effective_time",
"label":"生效日期",
"type" :"datetime",
`value`:"_function FROM_UNIXTIME( ( {dateField_lothx6po} \/ 1000 ) ,'%Y-%m-%d' )"
},
{"field" :"create_time",
`label`:"创建日期",
`type`:"datetime",
`value`:"_function DATE_FORMAT('{gmtCreate}','%Y-%m-%d 00:00:00')"
},
{`field`:"create_by",
`label`:"创建人",
`type`:"int",
`value`:"1"
},
{`field`:"status",
`label`:"状态",
`type`:"int"
},
{`field`:"approve_status",
`label`:"审核状态",
`type`:"int"
},
{`field`:"brand_coefficient`,
`label`:``品牌系数`,
`type`:``float`,
``value`:``_mongoQuery 68e141c6-4351-3f2f-b9a2-5eaee8f01a55 findField=content.numberField_lfjclona where={\"content.textField_lfjcloll\":{\"$eq\":\"{{tableField_lhykr1el_textField_lhykr1en}}\"}}
}
]
}
]
}
2. 数据转换
在数据清洗完成后,下一步是将数据转换为目标平台 MySQL API 接口能够接收的格式。我们需要根据元数据配置中的字段定义,将各个字段映射到相应的数据库表字段中。
{
`"otherRequest":[
{
`"field"`:`"main_sql"`,
`"label"`:`"主语句"`,
`"type"`:`"string"`,
`"value"`:`INSERT INTO \``lhhy_srm\``.`goods_price`\n(\n\``change_type`\`,\n\``brand`\`,\n\``supplier_code`\`,\n\``goods_code`\`,\n\``goods_name`\`,\n\``price`\`,\n\``distrib_price`\`,\n\``min_price`\`,\n\``brand_coefficient`\`,\n\``class_a`\`,\n\``class_b`\`,\n\``class_c`\`,\n\``source_Id`\`, \n \ ``effective_time`\`,`create_time`,`create_by`,`status`,`approve_status`) VALUES (\n<{change_type: }>,<{brand: }>,<{supplier_code: }>,<{goods_code: }>,<{goods_name: }>,<{price: }>,<{distrib_price: }>,<{min_price: }>,<{brand_coefficient: }>,<{class_a: }>,<{class_b: }>,<{class_c: }>,<{source_Id: }> , <{effective_time}> , <{create_time}> , <{create_by}> , <{status}> , <{approve_status}>);`
}
],
`"buildModel"`:`true`
}
3. 数据写入
最后一步是将转换后的数据通过 MySQL API 接口写入目标数据库。在这里,我们使用 SQL 插入语句将数据插入到指定的表中。
INSERT INTO `lhhy_srm`.`goods_price`
(
`change_type`,
`brand`,
`supplier_code`,
`goods_code`,
`goods_name`,
`price`,
`distrib_price`,
`min_price`,
`brand_coefficient`,
`class_a`,
`class_b`,
`class_c`,
`source_Id`,
`effective_time`,
`create_time`,
`create_by`,
`status`,
`approve_status`
)
VALUES
(
'B',
'{selectField_lmlugdfx}',
'_findCollection find textField_ln2uyh3e from 68e141c6-4351-3f2f-b9a2-5eaee8f01a55 where textField_lfjcloll={{tableField_lnedzxul_textField_lnedzxum}}',
'{{tableField_lnedzxul_textField_lnedzxum}}',
'{{tableField_lnedzxul_selectField_lnedzxun}}',
'{{tableField_lnedzxul_numberField_lnoes31u}}',
'{{tableField_lnedzxul_numberField_lneeit5z}}',
'{{table_field.lned.zx.ul.number.Field.lnee.it62}}',
'_mongoQuery 68e141c6-4351-3f2f-b9a2-5eaee8f01a55 find.Field=content.number.Field.lfj.clona where={\"content.text.Field.lfj.cloll\":{\"$eq\":\"{{table.Field.lhy.kr1.el.text.Field.lhy.kr1.en}\"}',
'厨卫成品',
'{select.Field.lml.ugd.g3}',
'_find.Collection find select.Field.lfj.clolt from 68e141c6-4351-3f2f-b9a2-5eaee8.f01.a55 where text.Field.lfj.cloll={{table.Field.lne.dz.xu.l.text.Field.lne.dz.xu.m}}',
'5',
'_function FROM_UNIXTIME( ( {date.Field.lo.thx.6.po} /1000 ) ,'%Y-%m-%d' )',
'_function DATE_FORMAT('{gmt.Create}','%Y-%m-%d 00:00:00')',
'1',
'1'
);