轻易云平台实现ETL转换与数据写入的最佳实践

  • 轻易云集成顾问-彭萍

用友BIP数据集成到轻易云集成平台的解决方案——查询ys调拨单-v

在企业内外部数据互联互通的过程中,高效、可靠的数据集成是至关重要的环节。本文将聚焦于一个实际应用案例:如何将用友BIP系统中的调拨单数据(API接口:/yonbip/scm/transferapply/list),通过轻易云数据集成平台进行无缝对接,最终实现快速写入目标系统(API接口:wdt.purchase.provider.create)。

首先,为确保从用友BIP提取的数据不漏单,我们采用了定时可靠抓取机制。通过设定合理的时间间隔,配置定时任务,对接用友BIP的API接口,不仅能按时获取最新的数据,同时还能处理分页和限流问题。在对接过程中,每次请求都会记录日志并监控执行状态,以便及时排除可能出现的问题。

针对大批量数据如何快速、高效地写入轻易云平台这一关键点,我们设计了一种基于批量操作的方法。有别于传统的一条一条插入记录,这种方法能够显著减少网络开销和数据库锁资源消耗,从而提高整体性能。此外,通过定义规则来映射两者之间存在的数据格式差异,使得转换过程更加顺畅。这不仅降低了开发和维护成本,还提升了系统响应速度。

在实际运行中处理异常情况,是保证系统稳定性的重要环节。因此,在整个数据对接及处理流程中,轻易云平台提供了完善的异常处理与重试机制。一旦发生错误,无论是连接超时还是其他类型的问题,都可以自动进行多次重试,并持续监控其结果。如果发现问题无法自动解决,会立刻发送告警信息以便人工干预。

简而言之,本案例展示了从获取源头数据、处理并清洗到最终写入目标库这一系列步骤该如何高效实施,以及应注意哪些技术细节。通过这些技术手段,可以极大提高企业在跨系统间的数据交互效率,确保业务流程顺畅且不中断。 如何对接钉钉API接口

调用用友BIP接口获取并加工数据的技术案例

在数据集成生命周期的第一步,我们需要调用源系统用友BIP接口/yonbip/scm/transferapply/list来获取调拨单数据,并对其进行初步加工。本文将详细介绍如何配置和调用该接口,以及如何处理返回的数据。

接口配置与调用

首先,我们需要配置接口的请求参数。以下是元数据配置中的关键字段:

  • pageSize: 每页显示的数据数,默认值为10。
  • code: 单据编号,例如:DBDD0003。
  • pageIndex: 当前页数,默认值为1。
  • open_vouchdate_begin: 单据开始日期,例如:2020-09-09 00:00:00。
  • open_vouchdate_end: 单据结束日期,例如:2020-09-25 23:59:59。
  • inwarehouse: 调入仓库ID。
  • bustype: 交易类型ID,例如:110000000000037。
  • outorg: 调出组织ID,例如:1513866252718336。
  • inorg: 调入组织ID,例如:1520919005434112。
  • outwarehouse: 调出仓库ID,例如:1538112225530112。
  • status: 单据状态,默认值为1(已审核)。

这些参数将用于构建POST请求体,以便从用友BIP系统中获取调拨单列表。

{
  "pageSize": "100",
  "code": "DBDD0003",
  "pageIndex": "1",
  "open_vouchdate_begin": "2020-09-09 00:00:00",
  "open_vouchdate_end": "2020-09-25 23:59:59",
  "inwarehouse": "",
  "bustype": "110000000000037",
  "outorg": "1513866252718336",
  "inorg": "1520919005434112",
  "outwarehouse": "",
  "status": "1"
}

数据格式化与转换

在接收到响应数据后,需要对其进行格式化和转换。根据元数据配置中的formatResponse字段,我们需要将原始字段名转换为新的字段名,并确保数据类型一致。

例如,响应中的字段id需要转换为new_id,并且格式化为字符串。同样地,其他字段如transferApplys_id, inwarehouse, 和 outwarehouse也需要进行相应的转换。

{
  "old": "id",
  "new": "new_id",
  "format": "string"
},
{
  "old": "transferApplys_id",
  "new": "new_transferApplys_id",
  "format": "string"
},
{
  "old": "inwarehouse",
  "new": "new_inwarehouse",
  "format": "string"
},
{
  "old": "outwarehouse",
  "new": "new_outwarehouse",
  "format": "string"
}

示例代码

以下是一个示例代码片段,用于调用接口并处理返回的数据:

import requests
import json

# 配置请求参数
payload = {
    'pageSize': '100',
    'code': 'DBDD0003',
    'pageIndex': '1',
    'open_vouchdate_begin': '2020-09-09T00:00:00',
    'open_vouchdate_end': '2020-09-25T23:59:59',
    'bustype': '110000000000037',
    'outorg': '1513866252718336',
    'inorg': '1520919005434112',
    'status': '1'
}

# 发起POST请求
response = requests.post('https://api.yonyoucloud.com/yonbip/scm/transferapply/list', json=payload)

# 检查响应状态码
if response.status_code == 200:
    data = response.json()

    # 格式化响应数据
    formatted_data = []
    for item in data['data']:
        formatted_item = {
            'new_id': str(item['id']),
            'new_transferApplys_id': str(item['transferApplys_id']),
            'new_inwarehouse': str(item['inwarehouse']),
            'new_outwarehouse': str(item['outwarehouse'])
        }
        formatted_data.append(formatted_item)

    # 输出格式化后的数据
    print(json.dumps(formatted_data, indent=4))
else:
    print(f"请求失败,状态码:{response.status_code}")

通过上述步骤,我们成功地调用了用友BIP接口获取调拨单数据,并对其进行了必要的格式化和转换。这是轻易云数据集成平台生命周期中的关键一步,为后续的数据处理和写入奠定了基础。 用友与CRM系统接口开发配置

使用轻易云数据集成平台进行ETL转换与数据写入

在数据集成的生命周期中,第二步是将已经集成的源平台数据进行ETL(Extract, Transform, Load)转换,并转为目标平台API接口所能够接收的格式,最终写入目标平台。本文将详细探讨如何利用轻易云数据集成平台进行这一过程。

数据请求与清洗

首先,我们需要从源系统中提取原始数据。这一步通常包括通过API接口或数据库查询获取数据,并对其进行初步清洗和过滤。假设我们已经完成了这一步,接下来就是重点:将清洗后的数据进行转换,并通过API接口写入目标系统。

数据转换与写入

在本案例中,我们需要将供应商信息从源系统转换为轻易云集成平台所能接受的格式,并通过wdt.purchase.provider.create API接口写入目标系统。以下是具体步骤:

  1. 配置元数据: 根据提供的元数据配置,我们需要确保每个字段都能正确映射到目标API接口所需的字段。

    {
     "api": "wdt.purchase.provider.create",
     "method": "POST",
     "idCheck": true,
     "request": [
       {"field": "provider_no", "label": "供应商编号", "type": "string", "describe": "代表供应商所有属性的唯一编码,用于供应商区分,ERP内支持自定义(ERP供应商界面设置),用于创建供应商数据信息", "value": "{code}"},
       {"field": "provider_name", "label": "供应商名称", "type": "string", "describe": "供应商名称", "value": "{name}"},
       {"field": "min_purchase_num", "label": "最小采购量", "type": "string", "describe": "最小采购量", "value":"1"},
       {"field": "purchase_cycle_days",  "label":"采购周期","type":"string","describe":"采购周期","value":"1"},
       {"field":"arrive_cycle_days","label":"到货周期","type":"string","describe":"到货周期","value":"1"},
       {"field":"contact","label":"联系人","type":"string","describe":"联系人"},
       {"field":"telno","label":"座机","type":"string","describe":"座机"},
       {"field":"mobile","label":"移动电话","type":"string","describe":"手机号"},
       {"field":"fax","label":"传真","type":"string","describe":"传真"},
       {"field":"zip","label":"邮编","type":"string","describe":"邮政编码"},
       {"field":"email","label":"邮箱","type":"string","describe":"电子邮箱"},
       {"field":"qq","label":"qq","type":"string","describe":"腾讯QQ号码"},
       {"field":"wangwang","label":"旺旺号","type":"string","describe":"淘宝旺旺号"},
       {"field": "address", "label": "地址", "type": "string", "describe": "省、市、区(县)、地址详情"},
       {"field": "website", "label": "网址", "type": "string", "describe": "供应商官网地址"},
       {"field": "last_purchase_time", "label": "最后采购日期", "type": "string", "describe": "对供应商最后一次采购日期,不传默认接口创建供应商的年月日,格式:yyyy-MM-dd HH:mm:ss"},
       {"field": "is_disabled", "label": "停用", "type": "string", "describe": "是否停用"},
       {"field":{"charge_cycle_days"},{"label":{"结算周期"},"type":{"string"},"describe":{"对供应商的账款结算周期,单位(天),不传默认0"},"value":{"1"}}]
    }
  2. 数据映射: 在此步骤中,我们将源系统的数据字段映射到目标API接口所需的字段。例如:

    • code -> provider_no
    • name -> provider_name

    其他字段如min_purchase_num, purchase_cycle_days, arrive_cycle_days等则根据业务需求设置固定值或默认值。

  3. ETL转换: 在轻易云平台上,我们可以使用内置的数据转换工具,将上述映射关系应用于源数据。以下是一个简单的数据转换示例:

    def transform_data(source_data):
       transformed_data = {
           'provider_no': source_data['code'],
           'provider_name': source_data['name'],
           'min_purchase_num': '1',
           'purchase_cycle_days': '1',
           'arrive_cycle_days': '1',
           'contact': source_data.get('contact', ''),
           'telno': source_data.get('telno', ''),
           'mobile': source_data.get('mobile', ''),
           'fax': source_data.get('fax', ''),
           'zip': source_data.get('zip', ''),
           'email': source_data.get('email', ''),
           'qq': source_data.get('qq', ''),
           'wangwang': source_data.get('wangwang', ''),
           'address': source_data.get('address', ''),
           'website': source_data.get('website', ''),
           'last_purchase_time': source_data.get('last_purchase_time', datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
           'is_disabled': source_data.get('is_disabled', ''),
           'charge_cycle_days': '1'
       }
       return transformed_data
  4. 调用API接口: 最后一步是将转换后的数据通过API接口发送至目标系统。可以使用HTTP库如requests来实现这一过程:

    import requests
    
    def send_to_target_api(transformed_data):
       api_url = '<轻易云集成平台API URL>'
       headers = {'Content-Type': 'application/json'}
    
       response = requests.post(api_url, json=transformed_data, headers=headers)
    
       if response.status_code == 200:
           print("Data successfully sent to target API")
       else:
           print(f"Failed to send data: {response.text}")
    
    # 示例调用
    source_data = {
        # 假设这是从源系统获取的数据
        'code': '12345',
        'name': 'ABC Supplier',
        # 其他字段...
    }
    
    transformed_data = transform_data(source_data)
    send_to_target_api(transformed_data)

通过以上步骤,我们实现了从源系统提取、清洗、转换并最终写入目标系统的全过程。这不仅提高了数据处理效率,还确保了数据的一致性和准确性。在实际操作中,可以根据具体业务需求和技术环境对上述流程进行调整和优化,以达到最佳效果。 如何开发企业微信API接口