因业务需要,将原有阿里云 Tablestore(简称OTS)的数据变更,同步至其他数据库中,例如 MySQL 或 PGSQL 等。
OTS 提供了通道服务,其接口封装分为两部分:管控接口和自动化的数据消费框架,只需要监听通道,并对其变更数据进行消费就能完成数据库同步作用。
可以根据业务需求开启通道,我为了方便选择开启全量加增量通道。
对于具体业务实现并不具体展开,在此仅给出 OTS 同步数据的一个大致框架。
Tunnel 类用于初始化通道并提供消费通道入口。
import com.alicloud.openservices.tablestore.TunnelClient;
import com.alicloud.openservices.tablestore.model.tunnel.*;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorker;
import com.alicloud.openservices.tablestore.tunnel.worker.TunnelWorkerConfig;
import lombok.Builder;
import lombok.extern.log4j.Log4j2;
/**
* 消费通道
*/
@Log4j2
@Builder
public class Tunnel {
private String endPoint;
private String accessKeyId;
private String accessKeySecret;
private String instanceName;
private String tableName;
private TunnelClient client;
/**
* 初始化通道
*/
private TunnelClient initTunnel() {
this.client = new TunnelClient(this.endPoint, this.accessKeyId, this.accessKeySecret, this.instanceName);
return client;
}
/**
* 获取通道ID
*/
private String getTunnelId() {
TunnelClient client = this.initTunnel();
String tunnelName = "tunnel_" + this.tableName;
try {
// Tunnel不存在时创建新全量+增量通道
CreateTunnelRequest request = new CreateTunnelRequest(this.tableName, tunnelName, TunnelType.BaseAndStream);
CreateTunnelResponse resp = client.createTunnel(request);
String tunnelId = resp.getTunnelId();
log.info("create tunnel success! tunnel Id: {}", tunnelId);
return tunnelId;
} catch (Exception e) {
// Tunnel存在时获取通道ID
DescribeTunnelRequest request = new DescribeTunnelRequest(this.tableName, tunnelName);
DescribeTunnelResponse resp = client.describeTunnel(request);
TunnelInfo tunnelInfo = resp.getTunnelInfo();
String tunnelId = tunnelInfo.getTunnelId();
log.info("tunnel already exist! tunnel Id: {}", tunnelId);
return tunnelId;
}
}
/**
* 消费通道
*/
public void startTunnel() {
String tunnelId = this.getTunnelId();
TunnelWorkerConfig config = new TunnelWorkerConfig(new SyncProcessor(this.tableName));
if (tunnelId != null) {
TunnelWorker worker = new TunnelWorker(tunnelId, this.client, config);
try {
worker.connectAndWorking();
} catch (Exception e) {
e.printStackTrace();
worker.shutdown();
this.client.shutdown();
}
}
}
}
SyncProcessor 类是一个同步处理器,根据不同的记录类型,如 UPDATE、PUT、DELETE,完成数据库同步的业务逻辑,或者其他业务。
import com.alicloud.openservices.tablestore.model.PrimaryKeyColumn;
import com.alicloud.openservices.tablestore.model.RecordColumn;
import com.alicloud.openservices.tablestore.model.StreamRecord;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import lombok.extern.log4j.Log4j2;
import java.util.List;
import java.util.Map;
/**
* 同步通道处理器
*/
@Log4j2
public class SyncProcessor implements IChannelProcessor {
/**
* 处理过程
*/
@Override
public void process(ProcessRecordsInput input) {
try {
// ProcessRecordsInput里包含有拉取到的数据
log.info("Process {} records, NextToken: {}", input.getRecords().size(), input.getNextToken());
for (StreamRecord record : input.getRecords()) {
String recordType = record.getRecordType().toString();
// UPDATE 数据操作
if (StreamRecord.RecordType.UPDATE.toString().equals(recordType)) {
}
// PUT 数据操作
if (StreamRecord.RecordType.PUT.toString().equals(recordType)) {
}
// DELETE 数据操作
if (StreamRecord.RecordType.DELETE.toString().equals(recordType)) {
}
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 关闭通道
*/
@Override
public void shutdown() {
log.error("Sync Exit");
}
}