案例分享:BDS对账吉客云采购入库明细集成MySQL
在处理企业级数据时,高效、准确且稳定的数据集成是至关重要的。近期,我们成功完成了BDS对账吉客云采购入库明细数据到MySQL的集成,实现了两者间的数据互通和同步。本次项目采用了轻易云数据集成平台,通过其强大的可视化配置和全生命周期管理功能,使得整个过程更加直观和透明。
技术方案概述
为了实现将吉客云采购入库明细(API接口:erp.storage.goodsdocin)定期、安全地批量写入到MySQL数据库(API接口:executeReturn),我们设计了一套详细的技术解决方案:
-
高吞吐量的数据写入
我们利用平台支持的大规模数据吞吐能力,将大量的采购入库明细快速、高效地写入到目标MySQL数据库中。这确保了即使在业务峰值期间,也能保持系统性能稳定。 -
集中监控与告警
数据流动过程中,我们借助集中式监控和告警系统实时跟踪任务状态及性能表现。一旦发生异常或预设条件触发,即刻产生报警,并提供详细日志记录,便于及时定位问题并采取相应措施。 -
分页与限流处理
吉客云API对于返回结果存在分页限制,同时也有调用次数限额。因此,在抓取数据时我们特别注意设置适当的分页参数,并优化请求频率以避免因超出调用次数而被封禁的问题。 -
自定义数据转换逻辑
在实际应用中,我们发现吉客云与MySQL之间的数据格式存在一定差异。为此,通过自定义的数据转换规则,对获取到的JSON格式原始数据进行必要的清洗、转化,以符合目标数据库表结构及业务需求。 -
可靠性保障机制
为确保每笔交易都能完整无误地记录下来,我们实现了基于事物控制和错误重试机制。在出现网络抖动或短暂不可用情况时,可自动重新尝试,最大程度上减少漏单现象。
接下来部分将深入探讨这一技术框架下具体实施步骤,包括如何通过轻易云平台完成从初始配置、任务调度,到最终落盘验证等环节,进一步解析每个核心操作点。
调用源系统吉客云接口erp.storage.goodsdocin获取并加工数据
在数据集成生命周期的第一步,我们需要从源系统吉客云调用接口erp.storage.goodsdocin
来获取采购入库明细数据,并对其进行初步加工。以下是具体的技术实现和配置细节。
接口调用配置
为了从吉客云获取采购入库明细数据,我们需要配置API请求参数。根据提供的元数据配置,API调用采用POST方法,以下是具体的请求参数:
- 分页页码(pageIndex):用于控制分页请求的当前页码。
- 分页页数(pageSize):每页返回的数据条数,默认设置为50。
- 入库单号(goodsDocNo):指定要查询的入库单号。
- 创建时间的起始时间(startDate):用于过滤创建时间在此时间之后的数据,动态值为上次同步时间。
- 创建时间的结束时间(endDate):用于过滤创建时间在此时间之前的数据,动态值为当前时间。
- 入库类型(inouttype):固定值为101,表示采购入库。
- 仓库ID(warehouseId)、仓库编号(warehouseCode)、供应商ID(vendId)、供应商编号(vendCode)等字段可根据需求进行填充。
示例请求体如下:
{
"pageIndex": "1",
"pageSize": "50",
"goodsDocNo": "",
"startDate": "{{LAST_SYNC_TIME|datetime}}",
"endDate": "{{CURRENT_TIME|datetime}}",
"inouttype": "101",
"warehouseId": "",
"warehouseCode": "",
"vendId": "",
"vendCode": "",
"billNo": "",
"userName": "",
"gmtModifiedStart": "",
"gmtModifiedEnd": ""
}
数据清洗与加工
在获取到原始数据后,需要对数据进行清洗和初步加工,以便后续的数据转换与写入。以下是几个关键步骤:
-
字段映射与转换:
- 将原始数据中的字段映射到目标系统所需的字段。例如,将
recId
映射为目标系统中的唯一标识符。 - 对日期格式进行标准化处理,将所有日期字段统一转换为目标系统所需的格式。
- 将原始数据中的字段映射到目标系统所需的字段。例如,将
-
数据过滤与校验:
- 根据业务规则过滤掉不符合条件的数据。例如,只保留状态为“已审核”的入库单据。
- 对关键字段进行校验,如检查
goodsDocNo
是否为空,确保数据完整性。
-
增量更新处理:
- 利用
startDate
和endDate
参数实现增量更新,只获取自上次同步以来的新数据。 - 在处理过程中记录本次同步的最大更新时间,以便下次同步时使用。
- 利用
示例代码片段如下:
import requests
import datetime
# 配置API请求参数
api_url = 'https://api.jikecloud.com/erp.storage.goodsdocin'
headers = {'Content-Type': 'application/json'}
payload = {
'pageIndex': '1',
'pageSize': '50',
'startDate': last_sync_time.strftime('%Y-%m-%d %H:%M:%S'),
'endDate': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'inouttype': '101'
}
# 发起API请求
response = requests.post(api_url, headers=headers, json=payload)
data = response.json()
# 数据清洗与加工
cleaned_data = []
for record in data['records']:
if record['status'] == '已审核':
cleaned_record = {
'id': record['recId'],
'goodsDocNo': record['goodsDocNo'],
# 更多字段映射...
}
cleaned_data.append(cleaned_record)
# 返回清洗后的数据
return cleaned_data
通过上述步骤,我们成功调用了吉客云接口erp.storage.goodsdocin
获取采购入库明细,并对其进行了必要的数据清洗和加工,为后续的数据转换与写入做好了准备。
数据集成生命周期中的ETL转换与写入MySQL
在数据集成生命周期的第二步,我们需要将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,转为目标平台 MySQL API 接口所能够接收的格式,并最终写入目标平台。这一过程不仅仅是简单的数据传输,而是涉及到复杂的数据清洗、转换和加载步骤。本文将详细探讨如何利用元数据配置完成这一任务。
元数据配置解析
元数据配置是实现ETL过程的核心,通过定义字段映射和转换规则,确保源数据能够正确地转换为目标平台所需的格式。以下是我们使用的元数据配置:
{
"api": "executeReturn",
"method": "POST",
"idCheck": true,
"request": [
{
"field": "main_params",
"label": "主参数",
"type": "object",
"children": [
{"field": "bill_no", "label": "出入库单号", "type": "string", "value":"{goodsdocNo}"},
{"field": "in_out_type", "label": "出入库类型", "type": "string", "describe":"出入库类型(101-采购入库,205-采购退货)", "value":"101"},
{"field": "in_out_date", "label": "出入库时间", "type": "datetime",
"value":"_function FROM_UNIXTIME( ( {inOutDate} / 1000 ) ,'%Y-%m-%d %T' )"},
{"field": "in_out_reason", "label": "出入库原因",
"type":"string","value":"{inOutReason}"},
{"field":...}
]
},
{
...
}
],
...
}
数据清洗与转换
在这一阶段,我们需要对源数据进行清洗和转换,以确保其符合目标平台MySQL API接口的要求。例如,in_out_date
字段需要将Unix时间戳转换为标准日期时间格式,这可以通过如下函数实现:
_function FROM_UNIXTIME( ( {inOutDate} / 1000 ), '%Y-%m-%d %T' )
类似地,其他字段也需要进行相应的处理,如将状态码、对账类型等字段映射到目标系统所需的值。
构建SQL语句
根据元数据配置,我们需要构建相应的SQL插入语句,以便将清洗和转换后的数据写入MySQL数据库。以下是主表和子表的插入语句示例:
主表插入语句:
INSERT INTO `lhhy_srm`.`supplier_purchase_in_out`
(`bill_no`, `in_out_type`, `in_out_date`, `in_out_reason`, `source_bill_no`,
`status`, `send_type`, `create_type`, `suppiler_code`, `suppiler_name`,
`warehouse_code`, `warehouse_name`, `purchase_org_code`, `purchase_org_name`,
`purchase_dept`, `purchaser`, `delivery_bill`, `take_delivery_bill`,
`purchase_memo`, `remark`, `create_time`, `create_by`)
VALUES
(<{bill_no: }>, <{in_out_type: }>, <{in_out_date: CURRENT_TIMESTAMP}>,
<{in_out_reason: }>, <{source_bill_no: }>, <{status: }>, <{send_type: }>,
<{create_type: 1}>, <{suppiler_code: }>, <{suppiler_name: }>,
<{warehouse_code: }>, <{warehouse_name: }>, <{purchase_org_code: }>,
<{purchase_org_name: }>, <{purchase_dept: }>, <{purchaser: }>,
<{delivery_bill: }>, <{take_delivery_bill: }>, <{purchase_memo: }>,
<{remark: }>, <{create_time: }>, <{create_by: }> );
子表插入语句:
INSERT INTO `lhhy_srm`.`supplier_purchase_in_out_detail`
(`order_id`, `goods_no`, `goods_name`, `bar_code`,
`spec_name`, `cate_name`, `brand_name`,
`price`,`unit`,`quantity`,`tax_price`,
`tax_rate`,`tax_amount`,`purchaser_tax_price`,
`supplier_tax_price`,`confirm_tax_price`,`goods_memo`,
`remark`)
VALUES
(<{lastInsertId: }>,<{goods_no: }>,<{goods_name: }>
,< {bar_code :}> ,< {spec_name :}> ,< {cate_name :}> ,
< {brand_name :}> ,< {price :}> ,< {unit :}> ,< {quantity :}>
,< {tax_price :}> ,< {tax_rate :0 }> ,< {tax_amount :0 }> ,
< {purchaser_tax_price :0 }> ,< {supplier_tax_price :0 }>
,< {confirm_tax_price :0 }> ,< {goods_memo :}> ,< {remark :}> );
数据加载
最后一步是将构建好的SQL语句通过API接口发送到MySQL数据库,实现数据的最终写入。这里我们使用POST方法,通过API接口executeReturn
执行上述SQL语句。
通过以上步骤,我们实现了从源平台到目标平台的数据ETL转换与写入,确保了数据在不同系统间的无缝对接。这一过程不仅提升了业务透明度和效率,也确保了数据的一致性和准确性。