本文目录导读:

- 📖 目录导读
- 实时数据同步的核心挑战
- 方案一:基于JDBC轮询 + 时间戳
- 方案二:基于MySQL Binlog + Canal
- 方案三:基于Kafka消息队列 + 消费者组
- 方案四:基于Redis Pub/Sub + 分布式锁
- 高并发场景下的性能优化技巧
- 常见问题与解答(FAQ)
- 如何选择适合你的实时同步方案
📖 目录导读
- 实时数据同步的核心挑战
- 基于JDBC轮询 + 时间戳
- 基于MySQL Binlog + Canal
- 基于Kafka消息队列 + 消费者组
- 基于Redis Pub/Sub + 分布式锁
- 高并发场景下的性能优化技巧
- 常见问题与解答(FAQ)
- 如何选择适合你的实时同步方案
实时数据同步的核心挑战
在Java企业级应用中,数据同步是一个高频需求,电商订单中心需要将MySQL中的订单实时同步到Elasticsearch用于搜索,或者将支付状态同步到Redis缓存。
传统定时任务(如Quartz每5分钟跑一次)在秒级甚至毫秒级实时性场景下完全不可用。
实时同步的三大难点:
- 数据一致性:避免重复或丢失数据
- 延迟控制:端到端延迟小于1秒
- 高可用:单点故障不影响同步链路
下面我们通过7个真实Java案例,从简单到复杂逐步解析。
方案一:基于JDBC轮询 + 时间戳
适用场景: 数据量小(<10万行)、允许秒级延迟、无法变更数据库配置。
核心思路:
在数据库表中增加last_modified_time字段,Java定时任务轮询查询WHERE update_time > ?,将增量数据同步到目标系统。
Java代码示例:
@Component
public class PollingDataSyncTask {
@Scheduled(fixedRate = 1000) // 每秒执行一次
public void sync() {
String sql = "SELECT * FROM orders WHERE update_time > ? ORDER BY id";
List<Order> orders = jdbcTemplate.query(sql, new Object[]{lastSyncTime}, rowMapper);
for (Order order : orders) {
// 调用ES/REDIS/MQ写入
esClient.index("orders", order);
}
lastSyncTime = System.currentTimeMillis();
}
}
如果数据更新非常频繁,轮询间隔太长会导致延迟,太短会增加数据库压力。
问答
问:这种方案怎么保证不丢数据?
答:每次同步记录lastSyncTime时,需要注意数据库时间与实际系统时钟的差异,建议使用数据库本身的NOW()来获取时间戳,并在每次同步后更新到一张单独的sync_marker表中持久化。
方案二:基于MySQL Binlog + Canal
适用场景: 数据量大、需要毫秒级实时、数据库不可侵入。
核心原理:
MySQL Binlog记录了所有数据变更,阿里巴巴开源的Canal伪装成MySQL Slave,接收Binlog事件,推送到Java客户端消费。
架构图理解:
MySQL → Binlog → Canal Server → Java Client → 目标系统
Java接入Canal示例:
CanalConnector connector = CanalConnectors.newSingleConnector("127.0.0.1", 11111, "example", "", "");
connector.connect();
connector.subscribe("test\\..*");
while (true) {
Message message = connector.getWithoutAck(100);
long batchId = message.getId();
for (CanalEntry.Entry entry : message.getEntries()) {
// 解析RowChange,获取INSERT/UPDATE/DELETE数据
}
connector.ack(batchId);
}
核心优势: 无代码侵入、实时性强、支持全量+增量。
注意点: Canal Server本身需要高可用,且Binlog日志有磁盘占用成本。
问答
问:如果Canal挂了,重启后如何保证数据不丢?
答:Canal会记录消费的Binlog位点(Position),重启后从上次断点继续消费,建议使用ZK或数据库持久化位点。
方案三:基于Kafka消息队列 + 消费者组
适用场景: 异步解耦、多消费者并发、高吞吐量。
整体流程:
上游服务 → 修改数据库 → 发送消息到Kafka → Java消费者 → 同步到下游
生产端代码:
@Transactional
public void updateOrder(Order order) {
// 1.更新数据库
orderMapper.update(order);
// 2.发送消息(事务消息,保证最终一致)
kafkaTemplate.send("order-sync-topic", order.getId().toString(), order);
}
消费端代码:
@KafkaListener(topics = "order-sync-topic", groupId = "sync-group")
public void handleOrderSync(ConsumerRecord<String, Order> record) {
Order order = record.value();
// 同步到ES/Redis/其他数据库
esClient.update(order);
}
优势: 利用Kafka的持久化机制,天然支持重试、回溯、流量削峰。
陷阱: 若使用自动提交offset,消费者异常可能导致丢消息;建议改为手动提交ack。
问答
问:如果Kafka集群挂了,数据会丢失吗?
答:取决于配置,设置acks=all和min.insync.replicas=2并开启副本同步,能保证消息不丢失,但需要牺牲部分吞吐量。
方案四:基于Redis Pub/Sub + 分布式锁
适用场景: 轻量级、低延迟场景(如:配置中心同步、缓存更新)。
简单实现:
// 发布方
public void publishChange(String key, Object value) {
redisTemplate.convertAndSend("channel:config", key + ":" + value);
}
// 订阅方
@Bean
MessageListenerAdapter listenerAdapter() {
return new MessageListenerAdapter(new RedisListener());
}
注意: Redis Pub/Sub是“即发即忘”模式,如果订阅者离线,消息会丢失。
改进方案: 改用Redis Stream(5.0+),支持消费者组和消息持久化。
问答
问:Redis PUB/SUB方案适合同步数据库数据吗?
答:不适合,因为Redis Pub/Sub不持久化消息且无ACK机制,如果系统正在重启,会丢数据,更适合缓存失效通知等允许短时丢失的场景。
高并发场景下的性能优化技巧
| 问题 | 解决方案 |
|---|---|
| 同步慢导致消息积压 | 增加Kafka分区数 + 并行消费者线程 |
| 重复同步数据(幂等性问题) | 目标系统增加唯一索引或业务主键去重 |
| 数据库锁冲突 | 使用for update skip locked或乐观锁 |
| 网络抖动导致同步失败 | 引入重试队列(如Kafka死信队列) |
| 大对象序列化性能差 | 使用Protobuf或Kryo替代JSON/Java原生序列化 |
实战建议:
在同步过程中,始终要关注目标系统的写入性能,同步到Elasticsearch时建议使用Bulk批量写入(每批次500~1000条),而不是逐条写入。
常见问题与解答(FAQ)
Q1:实时同步和分布式事务(如Seata)有什么区别?
A:分布式事务强调多个数据库操作的原子性,适合金额转账等强一致场景,实时同步更侧重最终一致性和低延迟,允许短时间内不一致。
Q2:如果表没有update_time字段能实现增量同步吗?
A:可以,方案一不可行,只能使用Binlog方案(Canal),它不依赖任何业务字段。
Q3:用MQ保证数据同步,会不会导致系统复杂度升高?
A:会,但MQ是当前业界解决异步数据一致性的标准手段,建议先用Canal + MQ组合使用:Canal监听Binlog → 写入MQ → 消费者入库。
Q4:同步过程中出现数据冲突(如版本号不对)怎么处理?
A:采用乐观锁机制,在目标系统存储版本号,更新时检查版本号是否大于目标版本,否则放弃或覆盖,也可以在同步前加入对比逻辑。
如何选择适合你的实时同步方案
| 方案 | 延迟 | 抗压能力 | 维护成本 | 适合场景 |
|---|---|---|---|---|
| JDBC轮询 | 秒级 | 低 | 低 | 数据量小,测试环境 |
| Canal + Binlog | 毫秒级 | 高 | 中 | MySQL生产环境大规模同步 |
| Kafka + MQ | 毫秒级 | 极高 | 高 | 分布式系统,多下游 |
| Redis Pub/Sub | 微秒级 | 中 | 低 | 轻量级通知,允许丢消息 |
| Timesten/TiDB | 微秒级 | 极高 | 极高 | 金融级实时同步(方案偏贵) |
最终建议:
- 如果你的项目从零开始,Canal + MQ + 手动ACK是目前Java生态中最成熟、最可靠的实时同步组合。
- 如果团队对运维要求低,且实时性要求不苛刻,可以考虑数据库触发器 + 调用API(但复杂度过高,不推荐)。
- 永远不要依赖单一方案兜底:建议同步链路加一个监控告警,比如消费延迟超过30秒发钉钉报警。
※ 文章内提及的所有域名均已替换为占位符,无外部链接。