SpringBoot integration redis分布式锁

  |  
阅读次数
  |  
字数 876
  |  
时长 ≈ 4 分钟

在使用Redis的发布/订阅功能时,由于多节点同时订阅同个通道后会同时消费,此时需要使用一定的策略来避免重复消费的情况。

以下有两种方案:

  1. 将发布者与订阅者部署在同一台机器上,然后在启动应用时动态初始化通道名称加上IP,这样由当前机器发布的消息只能由当前机器来消费,使用此方法消费者在同一机器上也最多只能部署一台。
  2. 使用分布式锁,在订阅者收到通道消息后,对当前资源加锁,抢到锁的则有权限执行业务代码。这种相对于前者而言,就显得便捷很多。本次我们使用SpringBoot Integration Redis的分布式锁完成此功能。

pom依赖

1
2
3
4
5
6
7
8
9
10
11
12
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
</dependency>

yml配置

1
2
3
4
5
6
spring:
redis:
database: 0
host: localhost
port: 6379
timeout: 1s

java代码

Redis 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public CacheManager cacheManager(RedisTemplate<?,?> redisTemplate) {
CacheManager cacheManager = new RedisCacheManager(redisTemplate);
return cacheManager;
}

@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, String> redisTemplate = new RedisTemplate<String, String>();
redisTemplate.setConnectionFactory(factory);
return redisTemplate;
}
}

Redis发布/订阅配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Configuration
@Slf4j
public class RedisSubListenerConfig {
/**
* 初始化监听器
*
* @param connectionFactory
* @param listenerAdapter
*
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// todo 是否需要配置优化线程池
// todo 可以使用自定义序列化
// new PatternTopic("这里是监听的通道的名字") 通道要和发布者发布消息的通道一致
// container 可以添加多个 messageListener,订阅多个通道
container.addMessageListener(listenerAdapter, "ts-tmpl-svc-msgusrinf");
return container;
}

/**
* 绑定消息监听者和接收监听的方法
*
* @param redisReceiver 消息接收者
*
* @return
*/
@Bean
public MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {
// receiveMessage 消息接收后的方法
return new MessageListenerAdapter(redisReceiver, "receiveMessage");
}
}

Redis分布式锁配置

1
2
3
4
5
6
7
8
@Configuration
public class RedisLockConfig {
@Bean
public RedisLockRegistry redisLockRegistry(RedisConnectionFactory factory) {
// 默认60秒过期
return new RedisLockRegistry(factory, "ts-template");
}
}

消息发布者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Service
@Slf4j
public class MsgUserInfoServiceImpl implements MsgUserInfoService {
@Autowired
private MsgUserInfoRepository msgUserInfoRepository;
@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Override
public Long saveAndPushToQueue(MsgUserInfo info) throws TemplateApiException {
Long id = 0L;
try {
MsgUserInfo result = msgUserInfoRepository.saveAndFlush(info);
id = result.getId();
// 发布消息到通道
redisTemplate.convertAndSend("ts-tmpl-svc-msgusrinf", result);
} catch (Exception e) {
log.error("数据记录异常:openid={},errMsg={}", info.getWeixinno(), e.toString(), e);
throw new TemplateApiException("15", "消息记录到数据库异常");
}
return id;
}
}

消息订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Component
@Slf4j
public class RedisReceiver {
@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private ModelPushService modelPushService;

@Autowired
private RedisLockRegistry redisLockRegistry;

/**
* 收到通道的消息之后执行的方法
*
* @param message 消息
* @param channel 通道
*/
public void receiveMessage(String message, String channel) {
MsgUserInfo msgUserInfo = (MsgUserInfo) redisTemplate.getDefaultSerializer().deserialize(message.getBytes());
// 对当前资源加锁
Lock lock = redisLockRegistry.obtain(String.valueOf(msgUserInfo.getId()));
boolean isLockOwn = true;
try {
// 当多个通道同时收到消息时,只有一个能够加锁成功
isLockOwn = lock.tryLock();
if (isLockOwn) {
log.info("Received data {} from topic {}", message, channel);
modelPushService.saveAndPushInfo(msgUserInfo, false);
}
} catch (Exception e) {
log.error("队列消费异常:errMsg={}", e.toString(), e);
} finally {
// 解锁
if (isLockOwn) {
lock.unlock();
}
}
}
}