ETL流程:从金蝶云星空数据获取到MySQL写入的完整实现
### CRM-金蝶客户同步-修改:金蝶云星空数据集成到MySQL的技术实现
在本案例中,我们将探讨如何通过轻易云数据集成平台,将金蝶云星空客户数据高效地同步至MySQL数据库。这一过程主要利用了“CRM-金蝶客户同步-修改”方案,结合API接口executeBillQuery和execute,实现大批量数据的快速写入、实时监控以及异常处理。
#### 数据获取与转换
首先,通过调用金蝶云星空提供的API接口`executeBillQuery`抓取客户信息。该接口支持定时调度机制,使得系统能够可靠地在预设时间周期内获取最新的数据。此外,为了应对分页和限流问题,我们设计了智能分段查询策略,确保每次请求获得的数据量最优且不会触发API调用限制。
为了适应业务需求和目标数据库的数据结构,在抓取到原始数据后需要进行必要的转换处理。我们使用自定义逻辑将原始JSON格式的数据映射为符合MySQL表结构的记录,为后续写入操作做好准备。在这一环节,可视化的数据流设计工具成为关键,简化了复杂转换规则的创建与管理,提高配置效率及准确性。
#### 高吞吐量的数据写入
数据转化后的记录通过执行MySQL写入API `execute` 实现高速批量插入。在此过程中,高吞吐能力使得大量数据能够迅速、安全地被存储到MySQL数据库中,有效保障了系统性能及用户体验。同时,集中式的监控系统可实时追踪任务状态及性能指标,当出现异常情况例如网络波动或资源不足时,即时触发告警并启动错误重试机制,以最大程度减少对业务连续性的影响。
本篇文章仅是起点,下文将详细介绍整个集成流程中的各个技术细节,包括具体代码示例、参数配置以及最佳实践等,以确保您能全面掌握这一解决方案并应用于实际项目中。
![用友与MES系统接口开发配置](https://pic.qeasy.cloud/D26.png~tplv-syqr462i7n-qeasy.image)
### 调用金蝶云星空接口executeBillQuery获取并加工数据
在轻易云数据集成平台的生命周期中,调用源系统接口是数据集成的第一步。本文将详细探讨如何通过调用金蝶云星空的`executeBillQuery`接口来获取并加工客户数据。
#### 接口配置与请求参数
在元数据配置中,`executeBillQuery`接口被定义为一个POST请求,用于查询客户信息。以下是主要的请求参数及其配置:
- **FormId**: 业务对象表单Id,必须填写金蝶的表单ID,例如:`BD_Customer`。
- **FieldKeys**: 需查询的字段key集合,格式为数组,通过解析器转换为字符串。
- **FilterString**: 过滤条件,用于筛选特定的数据,例如:`FUseOrgId.Fnumber='T00' and (FModifyDate>='{{LAST_SYNC_TIME|datetime}}' or FForbidDate>='{{LAST_SYNC_TIME|datetime}}')`。
- **Limit**: 最大行数,用于分页控制。
- **StartRow**: 开始行索引,用于分页控制。
- **TopRowCount**: 返回总行数,用于分页控制。
#### 请求示例
以下是一个完整的请求示例:
```json
{
"FormId": "BD_Customer",
"FieldKeys": "FCUSTID,FNumber,FName,FCreateOrgId.FNumber,FUseOrgId.FNumber,FDescription,FIsTrade,FCustTypeId.FNumber,FGroup.FNumber,FSALDEPTID.FNumber,FSELLER.FNumber,FSETTLETYPEID.FName,FRECCONDITIONID.FName,FDISCOUNTLISTID.FNumber,FPRICELISTID.FNumber,FTRANSLEADTIME,FInvoiceType,FTaxType.FNumber,FShortName,FADDRESS,FZIP,FWEBSITE,FTEL,FFAX,FCompanyClassify.FNumber,FCompanyNature.FNumber,FCompanyScale.FNumber,FINVOICETITLE,FTAXREGISTERCODE,FINVOICEBANKNAME,FINVOICETEL,FINVOICEBANKACCOUNT,FINVOICEADDRESS,FSUPPLIERID.FNumber,FIsGroup,FIsDefPayer,FGROUPCUSTID.FNumber,FCOUNTRY1.FNumber,FBANKCODE,FACCOUNTNAME,FBankTypeRec.FNumber,FTextBankDetail,FBankDetail.FNumber,FOpenAddressRec,FCNAPS,FCURRENCYID.fname,FDefaultContact.FNumber,FCOUNTRY.fnumber",
"FilterString": "FUseOrgId.Fnumber='T00' and (FModifyDate>='{{LAST_SYNC_TIME|datetime}}' or FForbidDate>='{{LAST_SYNC_TIME|datetime}}')",
"Limit": "2000",
"StartRow": "0",
"TopRowCount": 1
}
```
#### 数据清洗与加工
获取到原始数据后,需要对数据进行清洗和加工,以满足业务需求。以下是一些常见的数据清洗与加工操作:
1. **字段映射与重命名**:将原始字段映射到目标系统所需的字段。例如,将`FName`映射为目标系统中的客户名称字段。
2. **数据类型转换**:将字符串类型的数据转换为目标系统所需的数据类型。例如,将日期字符串转换为日期对象。
3. **缺失值处理**:处理缺失值或默认值。例如,如果某个字段为空,可以设置一个默认值。
#### 实现代码示例
以下是一个Python代码示例,展示了如何调用接口并处理返回的数据:
```python
import requests
import json
# 定义请求参数
payload = {
"FormId": "BD_Customer",
"FieldKeys": ",".join([
"FCUSTID", "FNumber", "FName",
# ...其他字段...
"FCOUNTRY.fnumber"
]),
"FilterString": "FUseOrgId.Fnumber='T00' and (FModifyDate>='2023-01-01' or FForbidDate>='2023-01-01')",
"Limit": "2000",
"StartRow": "0",
"TopRowCount": 1
}
# 发起POST请求
response = requests.post(
url="https://api.kingdee.com/executeBillQuery",
headers={"Content-Type": "application/json"},
data=json.dumps(payload)
)
# 检查响应状态码
if response.status_code == 200:
data = response.json()
# 数据清洗与加工
processed_data = []
for item in data:
processed_item = {
'CustomerID': item['FCUSTID'],
'CustomerCode': item['FNumber'],
'CustomerName': item['FName'],
# ...其他字段映射...
}
processed_data.append(processed_item)
# 输出处理后的数据
print(json.dumps(processed_data, indent=4))
else:
print(f"Error: {response.status_code}, {response.text}")
```
通过上述步骤,我们可以高效地从金蝶云星空获取客户数据,并进行必要的清洗和加工,为后续的数据集成奠定基础。
![企业微信与OA系统接口开发配置](https://pic.qeasy.cloud/S14.png~tplv-syqr462i7n-qeasy.image)
### 数据集成平台生命周期的第二步:ETL转换与写入MySQL
在数据集成过程中,ETL(提取、转换、加载)是关键的一环。本文将重点探讨如何将已经集成的源平台数据进行ETL转换,并通过MySQL API接口写入目标平台。我们将详细解析元数据配置,并展示实际操作步骤。
#### 元数据配置解析
在本案例中,我们需要将CRM系统中的客户数据同步到金蝶系统,最终写入MySQL数据库。以下是元数据配置的关键部分:
```json
{
"api": "execute",
"effect": "EXECUTE",
"method": "SQL",
"number": "id",
"id": "id",
"name": "id",
"idCheck": true,
"request": [
{
"field": "main_params",
"label": "主参数",
...
}
],
...
}
```
该配置定义了一个API调用,使用`execute`方法执行SQL语句。`main_params`包含了所有需要传递给SQL语句的动态参数。
#### 数据请求与清洗
在ETL过程中,首先需要从源系统提取数据,并进行必要的清洗和格式转换。例如,从CRM系统提取的数据可能包含各种格式和冗余信息,需要通过脚本或工具进行标准化处理。
#### 数据转换与写入
接下来,我们重点关注如何将清洗后的数据转换为目标平台可接受的格式,并通过API接口写入MySQL数据库。以下是具体步骤:
1. **定义SQL语句**:
在元数据配置中,我们定义了一条用于更新客户信息的SQL语句:
```sql
update wk_wodtop_customer
set customer_name=:customer_name,
creating_org=:creating_org,
using_org=:using_org,
abbreviation=:abbreviation,
customer_code=:customer_code,
short_code=:short_code,
address=:address,
country=:country,
region=:region,
province=:province,
city=:city,
mailing_address=:mailing_address,
postal_code=:postal_code,
company_website=:company_website,
legal_representative=:legal_representative,
registered_capital=:registered_capital,
establishment_date=NULLIF(:establishment_date,''),
industry=:industry,
registered_address=:registered_address,
phone_number=:phone_number,
fax_number=:fax_number,
company_category=:company_category,
company_nature=:company_nature,
company_scale=:company_scale,
invoice_title=:invoice_title,
taxpayer_regist_number=:taxpayer_regist_number,
bank_of_deposit=:bank_of_deposit,
bank_account_number=:bank_account_number,
invoicing_phone_number=:invoicing_phone_number,
invoicing_mail_address=:invoicing_mail_address,
uni_social_credit_code=:uni_social_credit_code,
corresp_supplier=:corresp_supplier,
corresp_corp_customer=:corresp_corp_customer,
corporate_customer=:corporate_customer,
default_payer=:default_payer,
customer_category=:customer_category,
customer_group=:customer_group,
corresponding_org=:corresponding_org,
settlement_currency=:settlement_currency,
sales_depart =:sales_depart,
salesman =:salesman,
frozen_status =:frozen_status,
frozen_scope =:frozen_scope,
freezer =:freezer,
freeze_date=NULLIF(:freeze_date,''),
settlement_method =:settlement_method,
payment_terms =:payment_terms,
price_list =:price_list,
discount_list =:discount_list,
settlement_card =:settlement_card,
settling_party =:settling_party,
payer =:payer,
default_contact_person =:default_contact_person,
mandat_contact_person =:mandat_contact_person,
lead_time_transportat =:lead_time_transportat,
tax_category =:tax_category,
invoice_type =:invoice_type,
default_tax_rate =:default_tax_rate,
enable_credit_manage =:enable_credit_manage,
customer_priority =:customer_priority,
other_details =:other_details,
document_status =:document_status,
disabler =:disabler,
disable_date=NULLIF(:disable_date,''),
disable_status= :disable_status ,
company_id= :company_id ,F_IsAutoMTO= :F_IsAutoMTO ,Is_del= :Is_del
where data_id= :data_id
```
2. **参数映射**:
将源系统的数据字段映射到目标系统的字段。例如:
```json
{
"field": "customer_name",
"label": "客户名称",
"type": "string",
"value": "{FName}"
}
```
3. **执行API调用**:
使用轻易云平台提供的API接口,将映射后的参数传递给预定义的SQL语句并执行。例如,通过HTTP POST请求发送如下JSON payload:
```json
{
"main_params": {
...
// 所有映射后的参数
...
},
"main_sql": "...", // 上述定义的SQL语句
}
```
4. **处理返回结果**:
执行完毕后,处理API返回的结果,例如确认记录是否成功更新或插入。
#### 实际操作示例
假设我们从CRM系统获取了一条客户记录,需要同步到金蝶系统并写入MySQL数据库。具体操作如下:
1. 从CRM系统提取原始数据。
2. 清洗并标准化数据。
3. 根据元数据配置,将原始字段映射为目标字段。
4. 构建API请求payload并发送。
5. 检查返回结果,确保数据成功写入。
通过上述步骤,我们实现了从源系统到目标系统的数据无缝对接,并确保了数据的一致性和完整性。这种基于轻易云平台的数据集成方式,不仅提高了效率,还极大地简化了复杂的数据处理流程。
![钉钉与CRM系统接口开发配置](https://pic.qeasy.cloud/T10.png~tplv-syqr462i7n-qeasy.image)