阿里云 Tablestore 通道服务同步数据操作

Updated on with 734 views

因业务需要,将原有阿里云 Tablestore(简称OTS)的数据变更,同步至其他数据库中,例如 MySQL 或 PGSQL 等。

通道服务

OTS 提供了通道服务,其接口封装分为两部分:管控接口和自动化的数据消费框架,只需要监听通道,并对其变更数据进行消费就能完成数据库同步作用。

OTS通道服务文档

创建通道

参数说明

  • TableName:需要创建的通道表名。
  • TunnelName:需要创建的通道名称。
  • TunnelType:需要创建的通道类型, 支持全量(BaseData)、增量(Stream)和全量加增量(BaseAndStream)三类。

可以根据业务需求开启通道,我为了方便选择开启全量加增量通道。

OTS 同步框架

对于具体业务实现并不具体展开,在此仅给出 OTS 同步数据的一个大致框架。

Tunnel

Tunnel 类用于初始化通道并提供消费通道入口。

import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.*;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
import lombok.Builder;
import lombok.extern.log4j.Log4j2;

/**
 * 消费通道
 */
@Log4j2
@Builder
public class Tunnel {
    private String endPoint;
    private String accessKeyId;
    private String accessKeySecret;
    private String instanceName;
    private String tableName;
    private TunnelClient client;

    /**
     * 初始化通道
     */
    private TunnelClient initTunnel() {
        this.client = new TunnelClient(this.endPoint, this.accessKeyId, this.accessKeySecret, this.instanceName);
        return client;
    }

    /**
     * 获取通道ID
     */
    private String getTunnelId() {
        TunnelClient client = this.initTunnel();
        String tunnelName = "tunnel_" + this.tableName;

        try {
            // Tunnel不存在时创建新全量+增量通道
            CreateTunnelRequest request = new CreateTunnelRequest(this.tableName, tunnelName, TunnelType.BaseAndStream);
            CreateTunnelResponse resp = client.createTunnel(request);
            String tunnelId = resp.getTunnelId();
            log.info("create tunnel success! tunnel Id: {}", tunnelId);
            return tunnelId;
        } catch (Exception e) {
            // Tunnel存在时获取通道ID
            DescribeTunnelRequest request = new DescribeTunnelRequest(this.tableName, tunnelName);
            DescribeTunnelResponse resp = client.describeTunnel(request);
            TunnelInfo tunnelInfo = resp.getTunnelInfo();
            String tunnelId = tunnelInfo.getTunnelId();
            log.info("tunnel already exist! tunnel Id: {}", tunnelId);
            return tunnelId;
        }
    }

    /**
     * 消费通道
     */
    public void startTunnel() {
        String tunnelId = this.getTunnelId();
        TunnelWorkerConfig config = new TunnelWorkerConfig(new SyncProcessor(this.tableName));
        if (tunnelId != null) {
            TunnelWorker worker = new TunnelWorker(tunnelId, this.client, config);
            try {
                worker.connectAndWorking();
            } catch (Exception e) {
                e.printStackTrace();
                worker.shutdown();
                this.client.shutdown();
            }
        }
    }
}

SyncProcessor

SyncProcessor 类是一个同步处理器,根据不同的记录类型,如 UPDATE、PUT、DELETE,完成数据库同步的业务逻辑,或者其他业务。

import com.alicloud.openservices.tablestore.model.PrimaryKeyColumn;
import com.alicloud.openservices.tablestore.model.RecordColumn;
import com.alicloud.openservices.tablestore.model.StreamRecord;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import lombok.extern.log4j.Log4j2;

import java.util.List;
import java.util.Map;

/**
 * 同步通道处理器
 */
@Log4j2
public class SyncProcessor implements IChannelProcessor {
    /**
     * 处理过程
     */
    @Override
    public void process(ProcessRecordsInput input) {
        try {
            // ProcessRecordsInput里包含有拉取到的数据
            log.info("Process {} records, NextToken: {}", input.getRecords().size(), input.getNextToken());

            for (StreamRecord record : input.getRecords()) {
                String recordType = record.getRecordType().toString();

                // UPDATE 数据操作
                if (StreamRecord.RecordType.UPDATE.toString().equals(recordType)) {

                }

                // PUT 数据操作
                if (StreamRecord.RecordType.PUT.toString().equals(recordType)) {

                }

                // DELETE 数据操作
                if (StreamRecord.RecordType.DELETE.toString().equals(recordType)) {

                }
                Thread.sleep(500);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 关闭通道
     */
    @Override
    public void shutdown() {
        log.error("Sync Exit");
    }
}

标题:阿里云 Tablestore 通道服务同步数据操作
作者:Jeffrey

Responses
取消