轻易云平台的数据清洗与转换实战经验

  • 轻易云集成顾问-曾平安

汤臣倍健营销云数据集成到SQL Server的技术案例分析

在本次案例中,我将分享如何通过轻易云数据集成平台,将汤臣倍健营销云的数据可靠地同步至SQL Server,具体方案命名为“新版订单同步-江油市旌泰”。该方案主要涉及API接口调用、数据格式转换、分页处理及异常重试机制等多个关键技术环节。

首先,我们需要确保从汤臣倍健营销云接口/api/openapi/v1/erp/order/honour/agreement/header准确抓取订单数据。为了避免漏单和保障抓取任务的定时性与可靠性,通过配置轻易云的数据调度功能,可以实现对该接口的定时调用。同时,为了应对大批量数据传输需求,我们制定了一系列策略,以快速写入所获取的数据到SQL Server数据库中。

在实际操作过程中,处理分页和限流问题成为解决高效抓取的重要一环。我们使用动态参数化设置,对API进行多次分页查询,并结合限流配置,确保不会因为过于频繁地请求服务端而导致被封禁。此外,在接收到的数据与目标数据库之间存在格式差异时,需要做相应的转换。这就要求我们编写高效的数据映射规则,实现JSON格式向SQL可识别结构的自动转化。

最后,通过实施严格的实时监控与日志记录机制,以及针对可能出现的对接异常设计可靠重试策略,有效提高整个流程中的稳定性和透明度。当发生操作失败或网络延迟等不可控因素引起的问题时,这些措施能够及时捕捉并重新尝试,使得业务持续运行,减少故障时间,提高系统整体响应效率。 如何对接钉钉API接口

调用汤臣倍健营销云接口获取并加工数据

在数据集成的生命周期中,第一步是调用源系统接口获取原始数据。本文将详细探讨如何通过轻易云数据集成平台调用汤臣倍健营销云的接口/api/openapi/v1/erp/order/honour/agreement/header,并对获取的数据进行初步加工。

接口调用配置

首先,我们需要配置API调用的元数据。根据提供的元数据配置,我们可以看到该接口使用POST方法进行请求,且支持分页,每页返回20条记录。以下是具体的请求参数配置:

{
  "api": "/api/openapi/v1/erp/order/honour/agreement/header",
  "method": "POST",
  "number": "no",
  "id": "id",
  "pagination": {
    "pageSize": 20
  },
  "beatFlat": ["details"],
  "idCheck": true,
  "request": [
    {"field":"orgId","label":"组织ID","type":"string","value":"c0cdee4a7d744a199cccc3c8bf7ff3d9"},
    {"field":"page","label":"页码","type":"string","value":"1"},
    {"field":"id","label":"订单id","type":"string"},
    {"field":"applyerId","label":"要货方id","type":"string"},
    {"field":"supplierId","label":"供货方id","type":"string"},
    {"field":"no","label":"订单号","type":"string"},
    {"field":"distributionType","label":"分销类型","type":"string"},
    {"field":"distributorId","label":"分销商id","type":"string"},
    {"field":"orderStatus","label":"订单状态","type":"string", "value": "WAIT_FINANCE_AUDIT,WAIT_DELIVERY,PART_DELIVERY,ALL_DELIVERY,WAIT_OUT_STORAGE,PART_OUT_STORAGE"},
    {"field":"createDt","label":"创建时间","type":"string"},
    {"field":"orderTypeCode","label":"订单类型,如普通订单、直运销售","type":"string"},
    {"field":"isDeliveryFreezed","label":"是否暂停发货","type":"string"},
    {"field":"relatedApplyerId","label":"关联交易经销商id","type":"string"},
    {"field":"saleDistribution","label":"销售渠道","type":"string"},
    {"field":"disApplyerId","label":"分销商id","type":"string"},
    {"field": "startDt", "label": "订单时间(开始)", "type": "string" },
    {"field": "endDt", "label": "订单时间(结束)", "type": "string" },
    {"field": "appStartDt", "label": "审批时间(开始)", "type": "string" },
    {"field": "appEndDt", "label": "审批时间(结束)", "type": "string" },
    {"field": "lastStartDt", "label": "最后修改时间(开始)", "type": "string", 
        "value": "{{LAST_SYNC_TIME|datetime}}"
    },
    {"field": 
        "lastEndDt", 
        "label":
            "最后修改时间(结束)",
            "type":
                "string",
                "value":
                    "{{CURRENT_TIME|datetime}}"
                },
                {"label":
                    "单据类型1订单、2退货",
                    "field":
                        "nature",
                        "type":
                            "string",
                              value:
                                  “1”
                              }
                          ]
                      }

数据请求与清洗

在发起API请求时,我们需要确保所有必要参数已正确填充。特别是分页参数和日期范围参数,这些参数直接影响到我们获取的数据量和准确性。

  • 分页处理:由于每次请求只返回20条记录,因此我们需要循环调用API,直到没有更多数据为止。
  • 日期范围:使用{{LAST_SYNC_TIME|datetime}}{{CURRENT_TIME|datetime}}来动态设置最后修改时间的范围,以确保我们只获取到最新的数据。

以下是一个示例代码片段,用于发起API请求并处理响应数据:

import requests
import json

url = 'https://example.com/api/openapi/v1/erp/order/honour/agreement/header'
headers = {'Content-Type': 'application/json'}
params = {
  'orgId': 'c0cdee4a7d744a199cccc3c8bf7ff3d9',
  'page': '1',
  'orderStatus': 'WAIT_FINANCE_AUDIT,WAIT_DELIVERY,PART_DELIVERY,ALL_DELIVERY,WAIT_OUT_STORAGE,PART_OUT_STORAGE',
  'lastStartDt': last_sync_time,
  'lastEndDt': current_time,
  'nature': '1'
}

response = requests.post(url, headers=headers, data=json.dumps(params))
data = response.json()

# 数据清洗
cleaned_data = []
for record in data['records']:
  cleaned_record = {
      'order_id': record['id'],
      'order_no': record['no'],
      # 更多字段映射...
  }
  cleaned_data.append(cleaned_record)

数据转换与写入

在完成数据清洗后,我们需要将其转换为目标系统所需的格式,并写入目标数据库或系统。这一步通常包括字段映射、格式转换等操作。

例如,将清洗后的数据写入一个SQL数据库:

import sqlite3

conn = sqlite3.connect('orders.db')
cursor = conn.cursor()

for record in cleaned_data:
  cursor.execute('''
      INSERT INTO orders (order_id, order_no)
      VALUES (?, ?)
  ''', (record['order_id'], record['order_no']))

conn.commit()
conn.close()

通过以上步骤,我们实现了从汤臣倍健营销云接口获取原始数据,并对其进行初步加工和存储,为后续的数据处理和分析奠定了基础。 如何对接钉钉API接口

轻易云数据集成平台生命周期第二步:ETL转换与写入SQL Server API接口

在轻易云数据集成平台的生命周期中,ETL(Extract, Transform, Load)转换是关键的一步。本文将深入探讨如何将已经集成的源平台数据进行ETL转换,并转为目标平台SQL Server API接口所能够接收的格式,最终写入目标平台。

数据请求与清洗

在进行ETL转换之前,首先需要对源数据进行请求与清洗。通过元数据配置,我们可以定义需要提取的数据字段,并对其进行初步清洗和格式化。例如:

{
  "label": "订单单号",
  "field": "djbh",
  "type": "string",
  "value": "{no}"
}

上述配置表示从源数据中提取订单单号,并将其映射到目标字段djbh

数据转换

在数据清洗完成后,下一步是将数据转换为目标平台SQL Server所能接受的格式。这一步骤至关重要,因为不同系统之间的数据格式和要求可能存在差异。轻易云平台通过灵活的元数据配置,实现了这一过程的自动化和高效化。

例如,以下是一个复杂的数据转换配置:

{
  "label": "商品ID",
  "field": "spid",
  "type": "string",
  "value": "_findCollection find spid from d76b64f9-f0e0-3436-a2d9-14c5579faa1b where spbh2={details_extNo}"
}

此配置表示从特定集合中查找商品ID,并根据details_extNo字段进行匹配。这种方式确保了数据的一致性和准确性。

数据写入

完成数据转换后,最后一步是将处理后的数据写入目标平台SQL Server。通过API接口实现这一过程,可以确保数据传输的可靠性和实时性。以下是一个典型的API接口调用配置:

{
  "api": "insert",
  "method": "POST",
  "idCheck": true,
  "request": [
    {
      "label": "主表参数",
      "field": "main_params",
      ...
    }
  ],
  ...
}

其中,api字段指定了调用的API类型为插入操作,method字段指定了HTTP方法为POST。此外,通过设置idCheck为true,可以确保每次插入操作都进行唯一性检查,避免重复数据。

具体的数据插入语句可以通过如下配置实现:

{
  "label": "主SQL语句",
  "field": "main_sql",
  "type": "string",
  "value": 
    `INSERT INTO Inter_ddmx 
     (djbh, dj_sn, spid, shl, Pihao, Sxrq, Baozhiqi, hshj, hsje, beizhu, rq, ontime, wldwname, wldwid, dizhi, shr, shrdh, ywy, hzid, ckname) 
     VALUES 
     (:djbh ,:dj_sn,:spid,:shl,:Pihao,:Sxrq,:Baozhiqi,:hshj,:hsje,:beizhu,:rq,:ontime,:wldwname,:wldwid,:dizhi,:shr,:shrdh,:ywy,:hzid,:ckname)`
}

此SQL语句用于将转换后的数据插入到目标表Inter_ddmx中。每个占位符(如:djbh, :dj_sn等)对应于前面定义的数据字段,确保了数据的一致性和完整性。

强制关联

为了确保所有相关的数据都被正确处理,可以启用强制关联功能:

{
  "enforcedAssociation": true
}

此设置确保所有依赖关系都被严格遵守,从而避免由于缺失或不一致的数据导致的问题。

综上所述,通过轻易云数据集成平台,我们可以高效地完成从源平台到目标平台SQL Server的数据ETL转换与写入过程。通过灵活的元数据配置和强大的API接口支持,实现了不同系统间的数据无缝对接,提高了业务流程的透明度和效率。 数据集成平台API接口配置