应用场景#
目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如:
- 淘宝七天自动确认收货。在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推| K 7 m L送功能。
- 12306s m , S ( E , 购票支付确认页面。我们在选好票点击确定跳转的页面中往往都会有倒计时,代表着 30 分钟内订单不确认的话将会自动取( z r # R R消订单。其实在下订单那一刻开始购票业务系统就会发送一个延时消息给订单系统,延时30分钟,告诉订单系统订单未完成,如果我们在30分钟内完成了订单,则可以通过逻辑代码判断来忽略掉收到的消息。
在上面两种场景中,如果我们使用下面两种传统解决; J B方案无疑大大^ C Z T , H d w (降低了系统的整体性能和吞吐量:
- 使用 redis 给订单设置过期时间,d Z j { r c最后通过判断 redis 中是否还有该订单来决定订单是否已经完成。H e O L Z这种解决方案相较于消息的延迟推送性能较低,因为我们知道 redis 都是存储于内存中,我们遇到恶j & P ; * / &意下单或者刷单的将会给内存带来巨大压力。
- 使用传统的数据库轮询来判断数据库表中订单的状态,这无疑增加了IO次数,性能极低。
- 使用 jvm 原生的 DelayQueue ,也是大量占用内存,而且 C T X 8没有持久化策略,系统宕机或者重启都会丢失订单信息。
消息延迟推送的实现#
在 Rab0 p v FbitMQ 3.6.x 之前我们一般采用死信队列+TTL过期时间来实现延迟队列,我们这里不做过多介绍,可以参考之前文章来了解:TTL、死信队列
在 RabbitMQ 3.$ W % e N ,6.x 开始,RabbitMQ 官方提k 6 { ; e U供了延迟队列的插件,可以下载放4 9 ^ L J k j @ S置到 RabbitMQ 根目录下的 plugins 下。延迟队列插件下载
首先我们创建交换机和消息队列,application.properties 中配置与上一篇文章相同。
Copyimpo= h U l ` V - | brt org.springframework.amqp.core.*;
import org.springK # . _ Z Kframework.context.annotation.Bean;
import org.springframework.context.annotation.~ . d @ Z I eConfT f ) f piguration;
import java.util.HashMap;
i! a M n 8mport java.util.Map;
@ConK w | 8 ]figuration
public classQ M D I 2 M+ e + + I : xQCo9 H y 0nfig {
public static final String LAZY_EXCHANGE = \"Ex.L: d : r D ; h s PazyExchange\";
public static final String LAZY_QUEUE = \| s { I E l m"MQ.LazyI K Q F T lQueue\";
pubt x ? } x a * w %lic static final String LAZY_KEY = \"lazy.#\";
@Bean
public TopicExchange lazyExchange(){
//Map<9 t ? @ GString, Object> pro6 J P 5 6 ; @ b 4s = new HashMap&l0 ~ - BtU ` P;>();
//设置交换机支持延迟消n : Y 5 /息推送
//pros.put(\"x-delayed-message\", \"topic\");
TopicExchange exchange = new TopicExchanC p 5ge(LAZY_EXCHANGE, true, false, pros);
excha1 t Y 4 P {nge.) I 8 1 q L g WsetDelayed(true);
return exchange;
}
@Bean
public Qi a Eueue lazyQueue(){
return new Queue(LAZY_QUEUE, true);
}
@Bean
public Binding lazyBinding(){
return BindingBuilder.@ p C e ybind(lazyQueue2 K D ; Q 5 @()).b b h ^to(lazyExchange()).with(LAZY_KEY);
}
}
我们在 Exchange 的声明中可以设置excK O l ! ghange.setDelayed(trut P n 4 c 4e)来开启延迟队列,也可以设置为以下内容传入交换机声明的方法中,因为第一种方式的底层就是通过这种方式来实现的。
Copy //Map&I 2 ? R o - Jlt;String, Object> pros = new HashMap<>();
//设置交换机支持+ j m延迟消息推送
//pros.put(\"x-delayed-message\", \"topicF @ # ( - n c\");
TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
发送消息时我们需要指定延迟推送的时间,我们这里在发送消息的方法中传入参数 new MessagePostProcessor() 是为了获得 Message对象,因为需要借助 Message对象的api 来设置延; T h $ M ;迟时间。
Copyimport com.anqi.mq.config.MQ( f 4 c d E { +Config;
import org.springframework.amqp.AmqpException5 p n G + | i l;
import org.springframewB = m ; q 8 % Zork.amqp.core.Message;
import org.springframework.amqp.core.MessaT x R B p q Q @geDeliveryModK 3 A 7 ,e;
import org.springframework.amqp.core.Mea P t X )ssagePostProcessor;
import org.springframework.amqp.rabbit.connection.Correlation| y . , k fData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import og O Org.springfZ i } D F w L v pramework.beans.factory.annotation.Autowired;
import org.sI g # k 0 G D y Xpringframework.stereotype.Co x Q tmponent;
import javO T D Ca 9 - ] { 7.util.Date;
@Component
publiL , a x Z W z /c class MQv ] 1 9 & r BSender {
@Autowik x /re% C / U Y , d jd
priT R k :vate RabbitTemplate rabbitTemplate;
//confirmCallback rG p V S [ & : #eturnCallback 代码省略3 e P B | C,请参照上一篇
public void sendLazy(Object message){
rabbitTemplate.setMandatory(true);
r ; = i a = 8abbitTemplate* @ 5 U i f X.setConfirmCal. 5 k y W o +lback(confirmS ` J 3 ) ( = E 7Callback);
rabbitTemplate.setReturnCallback(returnCallback);
//id + 时间戳 全N M F局唯一
CorrelationData correlationData = new Correlao F 1 , BtionData(h T $ N c } R + R\"12345678909\"+^ 4 B ] r a { d cnew DatV ~ s 1 (e());
//发送消息时指定 header 延迟时间
rabbitv z } 6 GT- 0 C ! 4 Memplate.convertAndSend(MQConfig.LAZY_EXCHANGE, \"lazy.boot\", message,
nh v Pew Message| I t ` z zPostProcessor() {l - ? / & o c ^
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置消息持久化
message.getMessageProperties().setDeliveryMode(MessageDe ( 3 ? [liveryMode.PERSISTENT);
//message.getMessageProperties().setHeader(\"x-delay\", \"6_ [ J [000\");
messageM l & s . F { Q.getMessageProperties().setDelay(6000);
return mK T 0 - fessage;
}
}, correlationData);
}
}
我们可以观察 setDelay(Integer i)底层代码,也是在 header 中设置 x-delay。等同于~ o ~ z ! ( E S我们手动设置 hea) K I oder
message.getMe| n , v EssageProperties().setHeader() t E R d} F M ; Z 1 = G\"x-delay\", \"6000\");
Copy/**
* Set the x-delay header.
* @7 ; C = I e Iparam delay the delay.
* @since 1.6
*/
public void setDelay(Integer delay) {
if (delay == null || m X l delay < 0) {
this.headers.remove(X_DELAY);
}
else {
this.heade/ 3 9 |rs.put(8 D % 8 = z T cX_DELAY, delay);
}
}
消费端进行消费
Copyimport com.rs + = 2 { kabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
imp6 ! @ ( ; U O kort org.springframework.amqp.support.AmqpHeaders;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Componenk i : K = [ o it
public class MQReceie k . f U 7 Xver {
@RabbitListener(queues = \"MQ.LazyQueue\")
@RabbitHandler
public void onLazyMessage(Message msg, Channel cha4 Y I e b bnnelO 1 Z a) throws IOException{
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
chanL Y 7 # /nel.c W E ) , | ` 2basicAck(deliveryTag, true);
System.out.println(\"lazy receive \" + new String(msg.getBody()));
}
测试结果#
Copyimpork e X 7 o Y t ,t org.junit.Test;
import oo K P ] K + prg.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.AM S # 3 ^ k !utowired;
import org.springframew= % n j - h ~ Gork.boot.test.context.Springh A B U 7 [ =BootTest;
import org.springframewq T w V w ` ] A +ork.test~ } $ A k Y.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
public class MQSenderTest {
@Autowired
private MQSender mqSender;
@Test
public void sendLazyX ~ F x + } ? *() throws Exception {
String msg = \"hello sprB ] [ Q $ a ^ 0ing boot\";
mqSeng J q 5 #der.sendLazy(msg + \":\");
}
}
果然在 6 秒后收到了消息 lazy receive hello spring boot* Y i R:
作者M N b $: 海向
出处:https://www.cnblogs.com/haixiang/p/10966985.html
本{ ] $ Z R V Y站使用「CC BY 4.0」创作共享协议,转载请在文章明显位置注明作者及出处。