在项目中使用消息队列以提高QPS的方案探究
目标:提高短信平台项目QPS
方案概要:使用阿里巴巴开源的rocketmq消息队列对平台做一次异步改造,目前短信平台项目发送短信的过程是同步的,即前端发送请求需要等待后台处理完并返回结果才结束一次完整的调用,但是后台返回的结果前端并没有做相应的处理,整个消息发送过程的可靠性由后台保证。用户在前台提交完结果后若想知道发送是否成功目前唯一的做法是去短信发送记录中查看自己当天发送短信的记录。所以这里可以将提交的请求放入消息队列后收到入队成功的消息就返回到前端,后面的请求交由对应的短信发送服务去消息队列当中拉取。
改造效果对比
一. 改造前压测情况
为了后面加入mq后有一个对照效果,我对短信发送接口做了一个初始压测,使用了Jmeter压测工具完成整个压测过程。具体步骤如下:
- 借助同学写的一个模拟用户数据生成的脚本随机生成500/750/1000条用户数据。
- 开500/750/1000个线程模拟500/750/1000个用户发请求,记录各自的聚合数据和图形结果。
- 对比加入mq后的测试结果。
模拟500个用户同时发短信


模拟750个用户同时发短信


模拟1000个用户同时发短信


分析
- 500个并发请求下系统正常工作,qps为51.3。
- 750个并发请求下异常提高,qps下降,察看结果树表明有一些请求连接被重置,以及出现超时的情况。
- 1000个请求异常率攀升。偏离指数上升,吞吐量下降。
- 检查原因发现出现异常的原因在于MongoDB,本文主要探讨mq对qps提升的影响,所以对比的时候只对比500个并发下qps到底能提升多少。
二. 改造后压测情况
模拟500个用户同时发短信
/模拟500个用户/聚合数据.png)
/模拟500个用户/图形结果.png)
模拟750个用户同时发短信
/模拟750个用户/聚合数据.png)
/模拟750个用户/图形结果.png)
可以看到500个用户发请求,qps提高到135,提高了约1.5倍
改造过程
springboot使用rocketmq
之前已经在服务器上安装好了一个Docker版单机Rocketmq,所以这里直接使用就好了
1. 在pom文件中引入相关jar包
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>
2. 配置Producer为bean
public class Producer {
private Logger logger = LoggerFactory.getLogger(Producer.class);
private String producerGroupName;
private String nameServerAddr;
private DefaultMQProducer producer;
public Producer(String producerGroupName, String nameServerAddr){
this.producerGroupName = producerGroupName;
this.nameServerAddr = nameServerAddr;
}
/**
* 消息生产者初始化
* @throws MQClientException
*/
public void init() throws MQClientException {
logger.info("开始启动消息生产者服务...");
//创建生产者
producer = new DefaultMQProducer(producerGroupName);
//设置名字服务器
producer.setNamesrvAddr(nameServerAddr);
//启动生产者
producer.start();
//设置异步失败重发次数
producer.setRetryTimesWhenSendAsyncFailed(1);
logger.info("消息生产者服务启动成功.");
}
public void destory(){
logger.info("开始关闭消息生产者服务...");
producer.shutdown();
logger.info("消息生产者服务已关闭.");
}
public DefaultMQProducer getProducer() {
return producer;
}
}
3. 注册producer
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="producer" class="cn.bookcycle.consumer.messagequeue.Producer" init-method="init" destroy-method="destory">
<constructor-arg index="0" value="smssend"/>
<constructor-arg index="1" value="${rocketmq.nameServerAddr}"/>
</bean>
</beans>
4. 配置consumer为bean
public class Consumer {
private Logger logger = LoggerFactory.getLogger(Consumer.class);
private String consumerGroupName;
private String nameServerAddr;
private String topicName;
private DefaultMQPushConsumer consumer;
private MessageListenerConcurrently messageListener;
public Consumer(String consumerGroupName, String nameServerAddr, String topicName, MessageListenerConcurrently messageListener){
this.consumerGroupName = consumerGroupName;
this.nameServerAddr = nameServerAddr;
this.topicName = topicName;
this.messageListener = messageListener;
}
public void init() throws Exception{
logger.info("开始启动消费者服务");
//创建一个消息消费者,并设置一个消息消费者组
consumer = new DefaultMQPushConsumer(consumerGroupName);
//指定 NameServer 地址
consumer.setNamesrvAddr(nameServerAddr);
//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅指定 Topic 下的所有消息
consumer.subscribe(topicName, "*");
//注册消息监听器
consumer.registerMessageListener(messageListener);
// 消费者对象在使用之前必须要调用 start 初始化
consumer.start();
logger.info("消息消费者服务启动成功.");
}
public void destroy(){
logger.info("开始关闭消息消费者服务...");
consumer.shutdown();
logger.info("消息消费者服务已关闭.");
}
public DefaultMQPushConsumer getConsumer() {
return consumer;
}
}
5. 注册consumer
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="producer" class="cn.bookcycle.consumer.messagequeue.Producer" init-method="init" destroy-method="destory">
<constructor-arg index="0" value="smssend"/>
<constructor-arg index="1" value="${rocketmq.nameServerAddr}"/>
</bean>
<bean id="consumer" class="cn.bookcycle.consumer.messagequeue.Consumer" init-method="init" destroy-method="destroy">
<constructor-arg index="0" value="smssend"/>
<constructor-arg index="1" value="${rocketmq.nameServerAddr}"/>
<constructor-arg index="2" value="sms"/>
<constructor-arg index="3" ref="messageListener"/>
</bean>
<bean id="messageListener" class="cn.bookcycle.consumer.messagequeue.MessageListener" />
</beans>
6. 配置消息监听器MessageListener
监听到消息后将消息发往处理消息的服务,服务根据自己的能力从队列中取消息出来消费。
public class MessageListener implements MessageListenerConcurrently {
private Logger logger = LoggerFactory.getLogger(MessageListener.class);
@Autowired
private RestTemplate restTemplate;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (list != null){
for (MessageExt messageExt : list){
try {
String newMessage = new String(messageExt.getBody(),"utf-8");
logger.info("监听到消息" + newMessage);
logger.info("将消息发往真正的消息处理者" + newMessage);
//向真正的消息处理者发送消息请求
String result = restTemplate.postForObject("http://sms-service/sms", newMessage, String.class);
logger.info("收到消息的处理结果" + result);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
7. 网关接口改造
//原网关接口需要接收到消息后会立即向消息的真正消费者发起请求调用
@RequestMapping(value = "/sms", method = RequestMethod.POST)
public String smsConsumer(@RequestBody (required=false) String requestBody) {
return restTemplate.postForObject("http://sms-service/idtfcode", requestBody, String.class);
}
//改造之后的网关会把请求入队列,等待消费者自己去拉取消息消费。
@RequestMapping(value = "/sms", method = RequestMethod.POST)
public String smsConsumer(@RequestBody (required=false) String requestBody) throws Exception {
//生产消息并返回入队结果
return messageService.produceMsg(requestBody);
}
以上就是完整的改造过程,改造完之后重新推导码云并在服务器拉取打包编译后重新上线,经过测试,qps有明显的提高。
改造过程中遇到的问题
1. 单元测试中Producer注入失败
问题描述:在Test中使用@Autowired注入Producer对象一直失败,由于对单元测试方面知识的欠缺,在这里遇到这个问题。
原因:读取applicationContext.xml配置文件之后才能注入,需要在Test类上使用SpringBootTest注解读取配置文件,在主类上添加读取配置文件的注解在这里是不起作用的。
@RunWith(SpringRunner.class)
@SpringBootTest
public class testMQ {
@Autowired
private Producer producer;
@Test
public void testMQ() throws Exception{
String jsonData = "{id:1}";
Message msg = new Message("sms",null,jsonData.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.getProducer().send(msg);
//对返回结果作处理
SendStatus sendStatus = sendResult.getSendStatus();
}
}
RunWith和SpringBootTest注解。加入这两个注解后就可以使用Autowired自动注入所需要的bean。