ETL流程:从金蝶云星空数据获取到MySQL写入的完整实现

  • 轻易云集成顾问-吕修远
### CRM-金蝶客户同步-修改:金蝶云星空数据集成到MySQL的技术实现 在本案例中,我们将探讨如何通过轻易云数据集成平台,将金蝶云星空客户数据高效地同步至MySQL数据库。这一过程主要利用了“CRM-金蝶客户同步-修改”方案,结合API接口executeBillQuery和execute,实现大批量数据的快速写入、实时监控以及异常处理。 #### 数据获取与转换 首先,通过调用金蝶云星空提供的API接口`executeBillQuery`抓取客户信息。该接口支持定时调度机制,使得系统能够可靠地在预设时间周期内获取最新的数据。此外,为了应对分页和限流问题,我们设计了智能分段查询策略,确保每次请求获得的数据量最优且不会触发API调用限制。 为了适应业务需求和目标数据库的数据结构,在抓取到原始数据后需要进行必要的转换处理。我们使用自定义逻辑将原始JSON格式的数据映射为符合MySQL表结构的记录,为后续写入操作做好准备。在这一环节,可视化的数据流设计工具成为关键,简化了复杂转换规则的创建与管理,提高配置效率及准确性。 #### 高吞吐量的数据写入 数据转化后的记录通过执行MySQL写入API `execute` 实现高速批量插入。在此过程中,高吞吐能力使得大量数据能够迅速、安全地被存储到MySQL数据库中,有效保障了系统性能及用户体验。同时,集中式的监控系统可实时追踪任务状态及性能指标,当出现异常情况例如网络波动或资源不足时,即时触发告警并启动错误重试机制,以最大程度减少对业务连续性的影响。 本篇文章仅是起点,下文将详细介绍整个集成流程中的各个技术细节,包括具体代码示例、参数配置以及最佳实践等,以确保您能全面掌握这一解决方案并应用于实际项目中。 ![用友与MES系统接口开发配置](https://pic.qeasy.cloud/D26.png~tplv-syqr462i7n-qeasy.image) ### 调用金蝶云星空接口executeBillQuery获取并加工数据 在轻易云数据集成平台的生命周期中,调用源系统接口是数据集成的第一步。本文将详细探讨如何通过调用金蝶云星空的`executeBillQuery`接口来获取并加工客户数据。 #### 接口配置与请求参数 在元数据配置中,`executeBillQuery`接口被定义为一个POST请求,用于查询客户信息。以下是主要的请求参数及其配置: - **FormId**: 业务对象表单Id,必须填写金蝶的表单ID,例如:`BD_Customer`。 - **FieldKeys**: 需查询的字段key集合,格式为数组,通过解析器转换为字符串。 - **FilterString**: 过滤条件,用于筛选特定的数据,例如:`FUseOrgId.Fnumber='T00' and (FModifyDate>='{{LAST_SYNC_TIME|datetime}}' or FForbidDate>='{{LAST_SYNC_TIME|datetime}}')`。 - **Limit**: 最大行数,用于分页控制。 - **StartRow**: 开始行索引,用于分页控制。 - **TopRowCount**: 返回总行数,用于分页控制。 #### 请求示例 以下是一个完整的请求示例: ```json { "FormId": "BD_Customer", "FieldKeys": "FCUSTID,FNumber,FName,FCreateOrgId.FNumber,FUseOrgId.FNumber,FDescription,FIsTrade,FCustTypeId.FNumber,FGroup.FNumber,FSALDEPTID.FNumber,FSELLER.FNumber,FSETTLETYPEID.FName,FRECCONDITIONID.FName,FDISCOUNTLISTID.FNumber,FPRICELISTID.FNumber,FTRANSLEADTIME,FInvoiceType,FTaxType.FNumber,FShortName,FADDRESS,FZIP,FWEBSITE,FTEL,FFAX,FCompanyClassify.FNumber,FCompanyNature.FNumber,FCompanyScale.FNumber,FINVOICETITLE,FTAXREGISTERCODE,FINVOICEBANKNAME,FINVOICETEL,FINVOICEBANKACCOUNT,FINVOICEADDRESS,FSUPPLIERID.FNumber,FIsGroup,FIsDefPayer,FGROUPCUSTID.FNumber,FCOUNTRY1.FNumber,FBANKCODE,FACCOUNTNAME,FBankTypeRec.FNumber,FTextBankDetail,FBankDetail.FNumber,FOpenAddressRec,FCNAPS,FCURRENCYID.fname,FDefaultContact.FNumber,FCOUNTRY.fnumber", "FilterString": "FUseOrgId.Fnumber='T00' and (FModifyDate>='{{LAST_SYNC_TIME|datetime}}' or FForbidDate>='{{LAST_SYNC_TIME|datetime}}')", "Limit": "2000", "StartRow": "0", "TopRowCount": 1 } ``` #### 数据清洗与加工 获取到原始数据后,需要对数据进行清洗和加工,以满足业务需求。以下是一些常见的数据清洗与加工操作: 1. **字段映射与重命名**:将原始字段映射到目标系统所需的字段。例如,将`FName`映射为目标系统中的客户名称字段。 2. **数据类型转换**:将字符串类型的数据转换为目标系统所需的数据类型。例如,将日期字符串转换为日期对象。 3. **缺失值处理**:处理缺失值或默认值。例如,如果某个字段为空,可以设置一个默认值。 #### 实现代码示例 以下是一个Python代码示例,展示了如何调用接口并处理返回的数据: ```python import requests import json # 定义请求参数 payload = { "FormId": "BD_Customer", "FieldKeys": ",".join([ "FCUSTID", "FNumber", "FName", # ...其他字段... "FCOUNTRY.fnumber" ]), "FilterString": "FUseOrgId.Fnumber='T00' and (FModifyDate>='2023-01-01' or FForbidDate>='2023-01-01')", "Limit": "2000", "StartRow": "0", "TopRowCount": 1 } # 发起POST请求 response = requests.post( url="https://api.kingdee.com/executeBillQuery", headers={"Content-Type": "application/json"}, data=json.dumps(payload) ) # 检查响应状态码 if response.status_code == 200: data = response.json() # 数据清洗与加工 processed_data = [] for item in data: processed_item = { 'CustomerID': item['FCUSTID'], 'CustomerCode': item['FNumber'], 'CustomerName': item['FName'], # ...其他字段映射... } processed_data.append(processed_item) # 输出处理后的数据 print(json.dumps(processed_data, indent=4)) else: print(f"Error: {response.status_code}, {response.text}") ``` 通过上述步骤,我们可以高效地从金蝶云星空获取客户数据,并进行必要的清洗和加工,为后续的数据集成奠定基础。 ![企业微信与OA系统接口开发配置](https://pic.qeasy.cloud/S14.png~tplv-syqr462i7n-qeasy.image) ### 数据集成平台生命周期的第二步:ETL转换与写入MySQL 在数据集成过程中,ETL(提取、转换、加载)是关键的一环。本文将重点探讨如何将已经集成的源平台数据进行ETL转换,并通过MySQL API接口写入目标平台。我们将详细解析元数据配置,并展示实际操作步骤。 #### 元数据配置解析 在本案例中,我们需要将CRM系统中的客户数据同步到金蝶系统,最终写入MySQL数据库。以下是元数据配置的关键部分: ```json { "api": "execute", "effect": "EXECUTE", "method": "SQL", "number": "id", "id": "id", "name": "id", "idCheck": true, "request": [ { "field": "main_params", "label": "主参数", ... } ], ... } ``` 该配置定义了一个API调用,使用`execute`方法执行SQL语句。`main_params`包含了所有需要传递给SQL语句的动态参数。 #### 数据请求与清洗 在ETL过程中,首先需要从源系统提取数据,并进行必要的清洗和格式转换。例如,从CRM系统提取的数据可能包含各种格式和冗余信息,需要通过脚本或工具进行标准化处理。 #### 数据转换与写入 接下来,我们重点关注如何将清洗后的数据转换为目标平台可接受的格式,并通过API接口写入MySQL数据库。以下是具体步骤: 1. **定义SQL语句**: 在元数据配置中,我们定义了一条用于更新客户信息的SQL语句: ```sql update wk_wodtop_customer set customer_name=:customer_name, creating_org=:creating_org, using_org=:using_org, abbreviation=:abbreviation, customer_code=:customer_code, short_code=:short_code, address=:address, country=:country, region=:region, province=:province, city=:city, mailing_address=:mailing_address, postal_code=:postal_code, company_website=:company_website, legal_representative=:legal_representative, registered_capital=:registered_capital, establishment_date=NULLIF(:establishment_date,''), industry=:industry, registered_address=:registered_address, phone_number=:phone_number, fax_number=:fax_number, company_category=:company_category, company_nature=:company_nature, company_scale=:company_scale, invoice_title=:invoice_title, taxpayer_regist_number=:taxpayer_regist_number, bank_of_deposit=:bank_of_deposit, bank_account_number=:bank_account_number, invoicing_phone_number=:invoicing_phone_number, invoicing_mail_address=:invoicing_mail_address, uni_social_credit_code=:uni_social_credit_code, corresp_supplier=:corresp_supplier, corresp_corp_customer=:corresp_corp_customer, corporate_customer=:corporate_customer, default_payer=:default_payer, customer_category=:customer_category, customer_group=:customer_group, corresponding_org=:corresponding_org, settlement_currency=:settlement_currency, sales_depart =:sales_depart, salesman =:salesman, frozen_status =:frozen_status, frozen_scope =:frozen_scope, freezer =:freezer, freeze_date=NULLIF(:freeze_date,''), settlement_method =:settlement_method, payment_terms =:payment_terms, price_list =:price_list, discount_list =:discount_list, settlement_card =:settlement_card, settling_party =:settling_party, payer =:payer, default_contact_person =:default_contact_person, mandat_contact_person =:mandat_contact_person, lead_time_transportat =:lead_time_transportat, tax_category =:tax_category, invoice_type =:invoice_type, default_tax_rate =:default_tax_rate, enable_credit_manage =:enable_credit_manage, customer_priority =:customer_priority, other_details =:other_details, document_status =:document_status, disabler =:disabler, disable_date=NULLIF(:disable_date,''), disable_status= :disable_status , company_id= :company_id ,F_IsAutoMTO= :F_IsAutoMTO ,Is_del= :Is_del where data_id= :data_id ``` 2. **参数映射**: 将源系统的数据字段映射到目标系统的字段。例如: ```json { "field": "customer_name", "label": "客户名称", "type": "string", "value": "{FName}" } ``` 3. **执行API调用**: 使用轻易云平台提供的API接口,将映射后的参数传递给预定义的SQL语句并执行。例如,通过HTTP POST请求发送如下JSON payload: ```json { "main_params": { ... // 所有映射后的参数 ... }, "main_sql": "...", // 上述定义的SQL语句 } ``` 4. **处理返回结果**: 执行完毕后,处理API返回的结果,例如确认记录是否成功更新或插入。 #### 实际操作示例 假设我们从CRM系统获取了一条客户记录,需要同步到金蝶系统并写入MySQL数据库。具体操作如下: 1. 从CRM系统提取原始数据。 2. 清洗并标准化数据。 3. 根据元数据配置,将原始字段映射为目标字段。 4. 构建API请求payload并发送。 5. 检查返回结果,确保数据成功写入。 通过上述步骤,我们实现了从源系统到目标系统的数据无缝对接,并确保了数据的一致性和完整性。这种基于轻易云平台的数据集成方式,不仅提高了效率,还极大地简化了复杂的数据处理流程。 ![钉钉与CRM系统接口开发配置](https://pic.qeasy.cloud/T10.png~tplv-syqr462i7n-qeasy.image)