目标平台查询适配器实现指南

  • 系统对接顾问

目标平台查询适配器:实现dispatch,handleResponse, connect方法

<?php
namespace Adapter\PlatformName;

use Domain\Datahub\Instance\Storage\DataStatus;
use Domain\Datahub\Instance\Adapter\Adapter;
use Domain\Datahub\Instance\LogMessage;
use Domain\Datahub\Instance\Storage\LogStatus;

class PlatformNameExecuteAdapter extends Adapter {
    const DIRECTION = 'target';
    private $times = 0;

    /**
     * 调度方法
     * @return void
     */
    public function dispatch() {
        $this->times++;
        if ($this->times >= 30) {
            $this->asynTargetJobDispatch(10); // 重新激活dt命令
            return;
        }

        $data = $this->getDataStorage()->fetch(); // 从mongodb获取待处理数据

        if (count($data) === 0) {
            return $this->_returnDispatch();
        }

        $request = $this->generateRequestParams($data); // 转化原始数据为目标平台写入数据
        $request = $this->removeNull($request); // 清除空值

        if (!$request) {
            $this->getLogStorage()->insertOne(['text' => LogMessage::DISPATCH_TARGET_REQUEST_ERROR, 'request' => $request], LogStatus::ERROR);
            $this->dispatch();
            return;
        }

        // 标记原始数据为队列中插入到目标平台队列池
        $jobId = $this->getAsynTargetJobStorage()->insertOne(
            $this->metaData['api'], 
            [$request], 
            $this->getDataStorage()->ids, 
            $this->getDataStorage()->dataRange
        );

        // 设置状态和排队任务开始执行
        $this->getDataStorage()->setFetchStatus(DataStatus::QUEUE, null, null, new \MongoDB\BSON\ObjectId($jobId));
        $this->jobs[] = $jobId; 

       // 开始进行排队并递归调用调度方法
       $this->asynTargetJob(round($this->asynTimes),$jobId);
       asynTimes += 1.4;
       dispatch();

       return true;
    }
}

目标平台写入 $adapter -> dispatch() 的步骤:

  1. 增加调度次数 $times++,超出最大次数时延迟下一次调度。
  2. 从Mongodb取得一批【待处理】数据。
  3. 数据和元数据生成请求参数。
  4. 写入任务到队列存储器并返回 jobid。
  5. 开始排队(自定义延迟)。
  6. 标记数据正在排队。
  7. 递归循环调度下一次。

handleResponse 方法

public function handleResponse($response, jobId=null) {
    getLogStorage()->insertOne(['text' => 'handleResponse', 'response' => response], LogStatus::RECORD);

    if (response['Success'] != true) {
      handleError(response, jobId);
      return response;
    }

    getAsynTargetJobStorage()->updateResponse(jobId, DataStatus::FINISHED, response, [], null, active);
    handleSuccessCallback(response, jobId);

   return response;
}

handleError 方法

public function handleError(response,$jobId=null){
     throw=newPlatformThrowable(this);
     throw.handle(jobId,response);

     getAsynTargetJobStorage().updateResponse(jobId,
           DataStatus.ERROR,response,
           [],null,
           active);

     getLogStorage().insertOne(
         ['text'=>LogMessage.INVOKE_FAIL,'response'=>response],
         LogStatus.ERROR);

      return response;
}

目标平台写入 $adapter -> connect() 同源平台方法一样。