本文目录导读:

我来详细介绍如何使用Redisson实现读写锁(ReadWriteLock)。
基础实现
Maven依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.20.0</version>
</dependency>
基本使用示例
import org.redisson.Redisson;
import org.redisson.api.RReadWriteLock;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
public class ReadWriteLockExample {
private RedissonClient redissonClient;
private RReadWriteLock readWriteLock;
public ReadWriteLockExample() {
// 1. 创建配置
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("password");
// 2. 创建Redisson客户端
redissonClient = Redisson.create(config);
// 3. 创建读写锁
readWriteLock = redissonClient.getReadWriteLock("myReadWriteLock");
}
// 读操作 - 共享锁
public void readOperation() {
RLock readLock = readWriteLock.readLock();
try {
readLock.lock();
System.out.println("获取读锁成功,线程:" + Thread.currentThread().getName());
// 执行读操作
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
readLock.unlock();
System.out.println("释放读锁");
}
}
// 写操作 - 排他锁
public void writeOperation() {
RLock writeLock = readWriteLock.writeLock();
try {
writeLock.lock();
System.out.println("获取写锁成功,线程:" + Thread.currentThread().getName());
// 执行写操作
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
writeLock.unlock();
System.out.println("释放写锁");
}
}
}
高级功能实现
带超时的读写锁
import java.util.concurrent.TimeUnit;
public class ReadWriteLockWithTimeout {
private RedissonClient redissonClient;
private RReadWriteLock readWriteLock;
public ReadWriteLockWithTimeout() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
redissonClient = Redisson.create(config);
readWriteLock = redissonClient.getReadWriteLock("timeoutLock");
}
// 带超时的读锁
public boolean tryReadLock(long waitTime, long leaseTime) {
RLock readLock = readWriteLock.readLock();
try {
// waitTime: 等待获取锁的最大时间
// leaseTime: 锁持有时间
return readLock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
// 带超时的写锁
public boolean tryWriteLock(long waitTime, long leaseTime) {
RLock writeLock = readWriteLock.writeLock();
try {
return writeLock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
// 自动续期(看门狗机制)
public void autoRenewLock() {
RLock writeLock = readWriteLock.writeLock();
writeLock.lock(30, TimeUnit.SECONDS); // 设置30秒后自动释放
try {
// 业务逻辑
Thread.sleep(35000); // 超过30秒,看门狗会自动续期
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
writeLock.unlock();
}
}
}
实际应用场景
缓存读写场景
import java.util.concurrent.ConcurrentHashMap;
public class CacheReadWriteService {
private final RedissonClient redissonClient;
private final RReadWriteLock cacheLock;
private final ConcurrentHashMap<String, Object> localCache = new ConcurrentHashMap<>();
public CacheReadWriteService() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
redissonClient = Redisson.create(config);
cacheLock = redissonClient.getReadWriteLock("cacheLock");
}
// 从缓存读取数据
public Object readFromCache(String key) {
RLock readLock = cacheLock.readLock();
readLock.lock(10, TimeUnit.SECONDS);
try {
// 先从本地缓存读取
Object value = localCache.get(key);
if (value != null) {
return value;
}
// 再从Redis读取
RBucket<Object> bucket = redissonClient.getBucket(key);
value = bucket.get();
// 放入本地缓存
if (value != null) {
localCache.put(key, value);
}
return value;
} finally {
readLock.unlock();
}
}
// 更新缓存
public void updateCache(String key, Object newValue) {
RLock writeLock = cacheLock.writeLock();
writeLock.lock(10, TimeUnit.SECONDS);
try {
// 更新Redis
RBucket<Object> bucket = redissonClient.getBucket(key);
bucket.set(newValue);
// 更新本地缓存
localCache.put(key, newValue);
// 发布缓存更新事件
RTopic topic = redissonClient.getTopic("cacheUpdateTopic");
topic.publish(key);
} finally {
writeLock.unlock();
}
}
}
库存管理场景
public class InventoryManager {
private final RedissonClient redissonClient;
private final RReadWriteLock inventoryLock;
public InventoryManager() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
redissonClient = Redisson.create(config);
inventoryLock = redissonClient.getReadWriteLock("inventoryLock");
}
// 查询库存(读操作,可以并发)
public int getInventory(String productId) {
RLock readLock = inventoryLock.readLock();
readLock.lock();
try {
RBucket<Integer> inventoryBucket = redissonClient.getBucket("inventory_" + productId);
return inventoryBucket.get() != null ? inventoryBucket.get() : 0;
} finally {
readLock.unlock();
}
}
// 扣减库存(写操作,需要独享)
public boolean deductInventory(String productId, int quantity) {
RLock writeLock = inventoryLock.writeLock();
writeLock.lock();
try {
RBucket<Integer> inventoryBucket = redissonClient.getBucket("inventory_" + productId);
Integer currentInventory = inventoryBucket.get();
if (currentInventory == null || currentInventory < quantity) {
return false;
}
inventoryBucket.set(currentInventory - quantity);
// 记录库存变更日志
RList<Object> logList = redissonClient.getList("inventory_log_" + productId);
logList.add("Deduct: " + quantity + ", remaining: " + (currentInventory - quantity));
return true;
} finally {
writeLock.unlock();
}
}
}
完整的测试示例
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class ReadWriteLockTest {
private static final int READ_THREADS = 10;
private static final int WRITE_THREADS = 3;
private static AtomicInteger readCount = new AtomicInteger(0);
private static AtomicInteger writeCount = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
ReadWriteLockExample example = new ReadWriteLockExample();
CountDownLatch latch = new CountDownLatch(READ_THREADS + WRITE_THREADS);
ExecutorService executor = Executors.newFixedThreadPool(READ_THREADS + WRITE_THREADS);
// 启动读线程
for (int i = 0; i < READ_THREADS; i++) {
executor.execute(() -> {
try {
example.readOperation();
readCount.incrementAndGet();
} finally {
latch.countDown();
}
});
}
// 启动写线程
for (int i = 0; i < WRITE_THREADS; i++) {
executor.execute(() -> {
try {
example.writeOperation();
writeCount.incrementAndGet();
} finally {
latch.countDown();
}
});
}
latch.await();
executor.shutdown();
System.out.println("读操作完成: " + readCount.get());
System.out.println("写操作完成: " + writeCount.get());
}
}
重要特性总结
特性说明
- 读锁(共享锁):多个线程可以同时获取读锁
- 写锁(排他锁):写锁是独占的,获取写锁时不能有其他锁
- 锁升级:Redisson支持从读锁升级到写锁
- 自动续期:默认30秒看门狗机制
- 可重入性:同一个线程可以多次获取同一个锁
最佳实践
// 1. 设置合理的超时时间
readWriteLock.readLock().lock(10, TimeUnit.SECONDS);
// 2. 在finally块中释放锁
finally {
if (readWriteLock.readLock().isHeldByCurrentThread()) {
readWriteLock.readLock().unlock();
}
}
// 3. 使用try-finally确保释放
writeLock.lock();
try {
// 业务逻辑
} finally {
writeLock.unlock();
}
Redisson的读写锁实现提供了完整的分布式锁功能,适用于读多写少的场景,可以有效提高系统并发性能。