用stream mq 和redis 实现高并发投票功能
好的,以下是一个使用Spring Stream和Redis实现高并发投票的示例:
- 配置RabbitMQ和Redis
在application.yml文件中添加以下配置:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
streams:
binder:
brokers: localhost:9092 # Kafka brokers地址
bindings:
vote-in:
group: vote-group # 绑定到名为vote-in的Stream输入通道,并将其分配给名为vote-group的消费者组(Kafka中的消费者组)
destination: campaignId # 消息的目标主题为campaignId
redis:
host: localhost
port: 6379
- 编写Stream生产者和消费者
在您的Spring Boot应用程序中,定义输入通道和输出通道。例如:
public interface VoteEventStreams {
String INPUT = "vote-in";
@Input(INPUT)
SubscribableChannel voteIn();
}
public interface VoteCountStreams {
String OUTPUT = "vote-out";
@Output(OUTPUT)
MessageChannel voteOut();
}
VoteEventStreams定义了一个名为vote-in的输入通道,VoteCountStreams定义了一个名为vote-out的输出通道。这些通道将用于在Stream生产者和消费者之间传输数据。
然后,您可以定义一个Stream生产者,并将投票请求写入RabbitMQ:
@RestController
public class VoteController {
private final VoteCountStreams voteCountStreams;
public VoteController(VoteCountStreams voteCountStreams) {
this.voteCountStreams = voteCountStreams;
}
@PostMapping("/votes")
public void submitVote(@RequestBody Vote vote) {
// 将投票请求写入RabbitMQ
voteCountStreams.voteOut().send(MessageBuilder.withPayload(vote).build());
}
}
在这个例子中,Vote是投票请求的Java对象。您可以在Message中包装Vote对象并发送到vote-out通道。
然后,您可以定义一个Stream消费者,在RabbitMQ中读取投票请求,并将其放入缓存中:
@Configuration
@EnableBinding(VoteEventStreams.class)
public class VoteEventProcessor {
private final RedisTemplate<String, Object> redisTemplate;
@Autowired
public VoteEventProcessor(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
@StreamListener(VoteEventStreams.INPUT)
public void processVoteEvent(@Payload Vote vote) {
// 将投票数据存储到Redis缓存中
addVoteToCache(vote.getCampaignId(), vote.getOptionId());
}
private void addVoteToCache(String campaignId, String optionId) {
String key = "votes:" + campaignId;
redisTemplate.opsForSet().add(key, optionId);
}
}
在这个例子中,VoterEventProcessor是一个用于读取RabbitMQ中的投票请求并将其存储到Redis缓存中的Stream消费者。注入了一个RedisTemplate作为操作Redis缓存的工具。每当有一个新的投票请求被写入RabbitMQ,该processVoteEvent方法会被调用,并将投票数据存储到Redis缓存中。
- 处理投票请求
定义一个投票请求处理类,将投票数据从Redis缓存中读取并执行增加投票计数的操作。该类应该定期检查Redis缓存中的投票数据进行处理,并确保不会重复处理相同的投票数据。例如:
@Component
public class VoteProcessor implements Runnable {
private final RedisTemplate<String, Object> redisTemplate;
private final CampaignService campaignService;
private final ObjectMapper objectMapper;
private final ScheduledExecutorService executorService;
private final int BATCH_SIZE = 100;
@Autowired
public VoteProcessor(RedisTemplate<String, Object> redisTemplate, CampaignService campaignService, ObjectMapper objectMapper) {
this.redisTemplate = redisTemplate;
this.campaignService = campaignService;
this.objectMapper = objectMapper;
this.executorService = Executors.newSingleThreadScheduledExecutor();
}
public void start() {
executorService.scheduleWithFixedDelay(this, 0, 10, TimeUnit.SECONDS);
}
@Override
public void run() {
// 从Redis缓存中获取投票数据
Set<Object> votesSet = redisTemplate.opsForSet().members("votes:" + campaignId);
if (votesSet != null && !votesSet.isEmpty()) {
List<String> votes = new ArrayList<>(votesSet.size());
votesSet.forEach(vote -> votes.add((String) vote));
// 批量处理投票请求
for (int i = 0; i < votes.size(); i += BATCH_SIZE) {
List<String> batch = votes.subList(i, Math.min(i + BATCH_SIZE, votes.size()));
for (String voteData : batch) {
try {
Vote vote = objectMapper.readValue(voteData, Vote.class);
if (vote != null) {
// 安全地增加选项计数
campaignService.incrementVoteCount(vote.getCampaignId(), vote.getOptionId());
}
// 从Redis缓存中删除投票数据
redisTemplate.opsForSet().remove("votes:" + campaignId, voteData);
} catch (Exception e) {
// 解析投票请求失败,忽略该请求
}
}
}
}
}
}
在这个例子中,VoteProcessor是一个定期从Redis缓存中读取投票数据并将其写入数据库的异步作业,它使用CampaignService增加选项计数。每当有投票请求被写入Redis缓存,该类中的run方法就会被调用,定期读取Redis缓存中的投票数据进行批量计数处理。在将投票数据写入数据库后,应该从Redis缓存中删除相应的投票数据,以避免重复处理。
- 启动应用程序
在您的Spring Boot应用程序中启用上述组件,并执行以下操作:
@Configuration
public class AppConfig {
@Autowired
private VoteProcessor voteProcessor;
@PostConstruct
public void init() {
voteProcessor.start();
}
}
在投票请求已被写入RabbitMQ和Redis缓存,每个服务组件按照上述流程执行相应的任务,最终实现高并发的投票处理。
希望这个示例可以帮助您理解如何使用Spring Stream和Redis实现高并发投票场景。注意,这里只是一个简单实例,您需要根据实际需求进行更完善的异常处理和日志记录。
- 感谢你赐予我前进的力量

