一、简单的发送与接收消息 HelloWorld
1. 发送消息
发送消息首先要获取与rabbitmq-server的连接,然后从渠道(chann)中指定的queue发送消息 , 不能定义两个queue名字相同,但属性不同
示例:
package com.zf.rabbitmq01;
import jav- a ! la.io] O a Y ~ y 1 r.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionP ~ | g / | & FFactory;
/**
* 发送消息
* @author zhanghuan
*
*/
public class Sender01 {
public static void main(String[{ } ( L 1] args) throws IOException {
ConnectionFactD 5 B ! = b H ;ory connFac = new ConnectionFactory() ;
//RabbitMQ-Server安装在本机,所以直接用127.0.0.1
connFac.setHost(\"127.0.0.1\");
//创建一个连接
Connection conn = connFac.newConnecH 1 ution() ;
//创建一个渠道
Channel chP ` k 4 H p R 2annel = conn.creq C LateChannel() ;
//定义Queue名称
String queueName = \"queue01\" ;
//为channel定义queue的属性,queueName为Queue名称
channel.queueDeclare( queueName , false, false, false, nX ; m t Null) ;
String msg = \"Hello World!\";
//发送消息
channel.basicPublish(\"\", queueName , null , msg.getBytes());
System.out.] V l 8 : gprintln(\"sendj w Q W g message[\" + msg + \"] to \"+ queueName +\" success!\");
channel.close();
conn.close();
}
}
package com.zf.rabbitmq01;
import java.io.IOException;
import com.rabbitmq.client.( p % ` j x : j nChannel;
import& % a 6 com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.cliB q : R J ;ent.a p fConsumerCancelledExcI , O 6eption;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.clien6 b 5t.QueueingConsumer.Delivery;u @ I ( a ^ d F
import com.rabbitmqt N @.cliend D F Wt.ShutdownSiy / _ ^ b /gna- i ~ K ) J & !lException;
/**
* 接收消息
* @author zhanghuan
*
*/
public class R+ 4 q secv01 {
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedExceptr d 3 O %ion {
ConnectionFact# B ) K 8 % : iory connFac = new ConnectionF8 d 6 p Q sactory() ;
connFac.setHost(\"127.0.0.1\");
ConQ & R o { q [nectM m v j Rion conn = connFac.newConnection() ;
Channel channe4 ` F ) Z H L l = conn.createChannel() ;
Stri; X c X J ]ng queueName = \"queue01\";
channel.quet J c f / , I O KueDeclare(queu) 3 , qeNaj 4 ] . | N e ?me, false, false, false, nulle v O +)F ^ I 1 L l r ;
//上面的部分,与Sender01是一样的
//配置好获取消息的方式
QueueingConsum: r h - L ) + 8er consumer = new QueueingConsumer(channel) ;
channel.bas: i ^ R F _ QicConsume(queueNam3 M o h } % r ge, ti i Z orue, consumer) ;
//循环获取消息
whil4 ) F k Te(Z T K b + C vtrue){
//获取消息,如果没有消t R Q息,这一步将会一p h L直阻塞
Delivery delivery = consur ` : /mer.nextDelivery() ;
String msg = new String(delivery.getBody()) ;
System.out.println(\"rece@ K ? ^ fived message[\" + msg + \"] from \" + queueName);
}
}
}
二: V I 4、消息确认与公平调度消费者
从本节开始称Sender为生产者 , Recv为消费者
1. 消息确认
为了确保消息一定被消费者处理,rabbitMQ提供了消息确认功能,就是在消费者处理完任务之后,就给服务器一个回馈,服务器就会将该消息删除,如果消费者超时` # W不回馈,那么服务器将就将该消息重新发送给其他消费者默认是开启的,在消费者端通过下面的方式开启消息确认, 首先将autoAck自动确认关闭,等我们的任务执行完成之后,手动的去确认,类似JDBC的autocoB y u G h Q 6 ommit一样
QueueingConsumer consumer = new QueueingConsumer(channel);
Boolean autoAck = false;
channel.basicConsume(9 o V O F\"hello\", autoAck, consumer);
在前面的例子中使用的是channel.basicConsume(channeY S ? W clName, true, consumer) ; 在接收到消息后,就会自动反馈一个消息给服务器。
下面这个例子来测试消息确认的功能。
package com.[ ) T b | 4 4zf.rabbitmq03;
imporP ) } m m ; =t java.io.IOException;
import com.rabbitmq.client.Chi B , k B Na8 R I Q m y ` nnel;
io r mmporg : ^t com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 发送消息
* @author zhanghuan
*
*/
public class Sender03 {
public static void main(String? ! g[] args) thrJ s f X 0owsm o R IOException {
ConnectionFactory connFac = new Co0 M Y #nnectionFactory() ;
//RabbitMQ-Server安装在本机,所以直接用127.0.0.1
connFac.setHost(\) 1 [ "127.0.0.1\");
//创建一个连接
Con~ & Rnection conn = connFac.newC# J ` / 7onnection() ;
//创建一个渠道
Channel channel = conn.createChannel() ;
//定义Queue名称
String queueName = \"queue01\" ;
//为channel定义queue的属性,queueName为Queue名称
chF j V n _ 9annel.queueDeclare( queueName , false, false, false, null) ;
String m. 9 0 $ C i ( O (sg = \"Hello World!\";
//发送消息
channel.basicPublish(\"\", queueNameX l & | , nul^ t Ul , msg.getBytes());
System.out.println(\"send message[\" + msg + \"] to \"+ queueName +\" success!\");
cha8 % l _ { W ~nnel.close();
conn.cl1 l j M l B % ; Hose();
}
}
与Sender01.t I a ; 7 D r Tjava一样,没有什么区别。
package com.zf.rabbitmq03;
import java.io.IOException;
import com.rabbitmq.client.Chanw V p + z 6 k 6nes o $l;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactoryv / h r;
import com.! 4 Nrabbitmq.clienk i @ e X ] bt.ConsumerCancelledException;
imp^ 3 Bort com.rabbitmq.client.Queup I P a 5eingConsumer;
import co6 7 S $ 9 v Um.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
* 接收消息
* @author zhanghuan
*
*/
publicZ 0 Q H class Recv03 {
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, Ix D - x ^ ` -nterruptedException {
Con} s jnectionFactory connFac = nN W c G &ew ConnectionFactory() ;
connFac.setHost(\"127.0.0.1\");
Connection conn = connFac.newConnection() ;
Channel ch` = q 4 & a Dannel = conn.createChannel() ;
String channelName = \"channel01\";
channe{ % bl.queueDeclc N n Q =are(channelName, false, false, false, nu` y ^ R J & Sll) ;
//配置好获取消息的方式
QueueingConsumer consumer = new QueueingConsumer(channey 8 u 9l) ;
//取消 autoAck
Boolean autoAck = false ;
channf = T sel.basicConsume(channelName, autoAck, consumer)T ) I + a b ;
//循环获取消息
whileJ 0 O X 1 Q w M(true){
//获取消息,如果没有消息,这一步将会z e ` T 2一直阻塞
Delivery delivery = consumer.nextDR + ( w & [ 3 [ +elivery() ;
String msg = new String(de[ ^ z _ Rlivery.getBody()) ;
//确认消息,已经收到
channel.basicAck(delivery.getEnvelope().getDelZ e } k E e 9iveryTag()
, false);Q A p l &
System.out.println(\"received message[\" + msg + \"] from \" + channelName);
}
}
}
注意:一旦将autoAck关闭之后,一定要记得处理完消息之后,向服务器确认消息。否则服务器将会一直转发该消息如果将上面的channel.basicAck(delivery.getEnvelope().getDeliveryTC 2 { E / i h 8ag(), fZ Z , V y { 7 balse);注释掉, Sender03.java只需要运行一次 , Recv03.java每次运行将都会收到HelloWorld消息
注意:但是这样还是不够的,如果rabbitMQ-Server突然挂掉了,那么还没有被读取的消息还是会丢失 ,所以我们可以让消息持久化。 只需要在定义Queue时,设置持久化消息就可以了,方法如下:
boolean durable = true;channe9 l J j Dl.queueDecl? L v U `are(channelName, durable, false, false, null);
这样设置之后,服务器收到消息后就会立刻将消息` f : X b写入到硬盘,就可以防止突然服务器挂掉,而引起的数据丢失了。 但是服务器如果刚收到消息,还没来得及写入到硬盘,就挂掉了,这样还是无法避免消息的丢失。
2. 公平调度
上一个例子能够实现发送一个Message与接收一个Message
从上一个Recv01中可以看出,必o # ) r L y w须处理完一个消息,才会去接收下一个消息。如果生产者众多,那么一个消费者肯定是忙不过( O X G ? | q -来的。此时就可以用多个消费者C : 9 H来对同一个Channel的消息进行处理,并且要公平的分配任务给多个消费者。不能部分很- T i W { O O i忙,部分总是空闲
实现公平调度的方式就是让每个消费者在同一时刻会分配一个任务。 通过channel.basicQos(1);可以设置
列如:
当有多个消费者同时收& % d Y取消息,且每个消费者在接收消息的同时,还要做其它的事情,且会消耗很长的时间,在此过程中可能会出现一些意外,比如消息接收到一半; V e , Y ~ :的时候,一个消费者宕掉了,这时候就要使用消息接收确认机制,可以让其它的消费者再次执行6 / Z p X l刚才M # z n B宕掉的消费者没有完成的事情。另外,在默认情况下,我们创建的消息队列以及存放在队列里面的消U 5 o息,都是非持久化的,也就是说当RabbitMQ宕掉了或者是重启了,创建的消息队列以及消息都不会保存,为了解决这种情况,保证消息传输的可靠性,我们可以使用RabbitMQ提供的消息队列的持久化机制。
生产者:
import c& ! j Q wom.rabbit6 | Lmq.client.ConneA U * @ h PctionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.cli} * kent.MessagePropertie/ : Rs;
public class ClientSend1 {
public static final String queue_name=\"my_queue\";
public static final Boolean dS k A zurable=true;
//消h n Q h 8 T (息队列持久化
public static void main(Strin# [ ]g[] args)
throws java.io.IOException{
ConnectionFactory fa- @ W B , F mctory=new ConnectionFactory();
//创建连接工厂
factory.setHost(\"localhost\");
factory.setVir_ { b h % ptualHost(\"my_mq\");
factory.setUsername(\"zhxia_ S P . * ^\");
factory.setP| G ` A C assword(5 z _ o\"123456\");
Connection connection=factory.newConnection();
//创建连接
Channel channel=connection.createChanneK * 2 ` t tl();
//创建信道
channel.queueDeclare(queue_name, durable, false, false, n d f t : n V Eull);
//声明消息队列,且为可持久化的
String message=\"Hello world\"+MaM t K h K 8 G + ith.random();
//将队列设置为持久化之后,还需要将消息也设为可持久化的,MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish(t 4 y r T X\"\", queue_name, MessageProperties.PERSISTEND E m * YT_TEXT_PLAIN,message.getBytes());
System.out.println(\"Send message:\"+message);
channel.close();
connection.close();
}
}
说明:行17 和行20 需要同时设置,也就是将队列设置为持久化之后,还需要将发送的消息也要设置为持久化才能保证队列和消息一直存Q s 3 @ + : V +在
消费者:
import com.rabbitmq.client.ConnectionFactory;
import com.Y [ ! } ] 3 ~ Z arabbitmq.client.ConnE F M ; u Vection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Queueinp h e I (gConsumer;
public6 p G class ClientReceive1 {
public static final Strinc M pg queue_name=\"my_queue\";
public static final Boolean autoAck=false;
public static final Boolean dx M - & - Aurable=true;
pubi P i r olic static void main(String[] ai / prgs)
throws java.io.IOException,java.lq | Q Qang.InterruptedException{
ConnectionFactory factory=new ConnectionFactory();
fa} @ O E ! = O xctorD U 1 K Vy.setHost(\"localhc K I N R o cost\");
factory.setVirtual5 w qHost(\"my_mq\");
factory.setUsername(\"zhxia\");
factory.setPassword(\"12345G p ! g H U [6\");
Connection connection=factory.neq D F 4 x w x lwConnection();
Channel channel=connectio% [ V s J % Xn.createChannel();
channel.queueDeclare(que= i Q . N vue_name, durable, false, false, null);
System.out.println(\"Wait for message\");
channel.basicQos(1);
//消息分发处理
QueueingConsumer consumer=new QueueinN | v jgConsumer(channel);
channel.b? w 2 oasicConsume(queue_name, autoAck, consumer);^ g O [ 6 U f
while(true){
Thread.sls r A ) | Eeep(500)e 9 X d {;
QueueinV . K g h j TgConsumer.Del u M ; H @livery deliver=consumer.nextDelivery();
Str/ Y z ning message=new String(del{ g ! : : @ Uiver.getBody());
System.out.println( % + . o\"Message received:\"+message);
channel.basicAck(deliver.getEnvelope()s o y r N E T F.getDeliveryTag(), false);
}
}
}
说明:行22: 设置RabbitMQ调度分发消息的方式,也就是告诉RabbitMQ每次只给消费者处理一条消息,也就是等待消费者处理完并且已经对刚才处理的r z ( w消息进行确认之后, 才发送下一条消息,防止消费者太过于忙碌。如下图所示:
三、发布/订阅消息
前M S G v e A J ? o面都是一条消息只会被一个消费者处理。
如果要每个消费D _ h Z者都处理同一个消息,rabbitMq也: l I * E 3 | [提供了相应的方法。
在以前的程序中,不管是m w j p生产者端还是消费者端都必须知道一个指定的QueueName才能发送、获取消息。 而rabbitMQ消息模型的核心思想是生产者不会将消息直接发送给队列。
因为,生产者通常不会知道消息将会被哪些消费者接收。
生产者的消息虽然不是直接发送给Queue,但是消S 1 F o = a H息会交给Exch y 0 4ange,所以需要定义J 2 + g & ] nExchange的消息分发模式 ,之前的程序中,有如下一行代码:
channel.basicPublish(\"\", queueName , null , msg.getBytes());
第一个参数为空字符串,其实第一个参数就是ExchangeName,这里用空字符串,就表示消息会交给默认的Exchange。
下面我们将自M y j :己定义Exchange的属性。
package com.zf.rabbitmq04;
import java.io.IOEK 3 | j O $ nxception;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;Q , t H A (
import com.rabbitmq.client.ConnectionFactory;
/**
* 发送消息
* @author zhanghuan
*
*/
public class Sender04 {
public static vY t O # 8 -oid main(String[] args) throws IOExc| ` j 1eption {
ConnectionFa1 q F p R ( actory connFac = new ConnectionFactory() ;
//RabbitMQ-Server安装在本机,所以直接用127.0.0.1
connFac.setHost(\"127. k 2 -0.0.1\");
//创建一个连接
Connection conn = connFac.newConnection() ;
//创建一个渠道
Channel channel = conn.createChannel() ;
//定义ExchangeName,第二个参数是E_ p o x l 4 = ] Mxchange的类型,fanout表示消息将会分列发送给S ` 4 9多账户
String exchangeName = \"news\" ;
channel.exchangeDeclare(exchangeName, \"fanout\") ;
String msg = \"Hello Woro q q qld!\";
//发送& X M e消息,这里与前面的不同,这里第一个参数不再是字符串,而是ExchangeName ,第W * K D p ]二个参数也不再是queueName,而是空字符串
channel.b: K H hasicPublish( exchangeName , \"\" , n& F u Sull , msg.getBytes());
System.g i ? f j bout.println(\"send message[\" + msg + \"] to exchange \"+ exchangeName +\" success!\");
channel.close();
cl 6 ) z n U w _onn.close();
}
}
Send04.java] n e j ; d 发送消息{ 6 * 1 A ; 时没有指定的queueName 用的空字符串代替的k J 4 - |。 Exchange的类# E o P型有direct, topic, headers 、 fanout四种,上面用的是fanout类型
package com.zf.rabbitmq04;
import java.i8 w $ 2 !o.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import co( Q h a ) ? Fm.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException; n ] u ( n j x -;
/**
* 接收消息
* @author zhanghuan
*
*/
public class Recv04_01 {
public static void* . a 9 E main(String[] args) throws IOExcepti1 0 * oN e V ~ , . X cn, ShutdownSignalException, ConsU J [ # f bumerCancelledException, InterruptedException {
ConnectionFactory connFac = new ConnectionFactk n k p - Sory() ;
connFac.setHost(\"127.0.0.1\");
ConnectR X I y 3ion conn = connFac.new] 0 q r ? M Q H _Connection() ;
Channel c; & J r 4 1 Y ^ 5hannel = conn.createChannel() ;
String exchangeName = \"news\" ;
channel.exchangeDeclare(exchangeName, \"fanout\") ;
//这里使用没有参数的queueDeclare方法创@ ) ] 5 s建Que8 ] m -ue- = Z并获取QueueName
String queueName = channel.queueDeclare().getQueue() ;
//将queue绑定到Exchange中
channel.queueBind( queueName, exchangeName, \"\") ;
//配置好获取消息的方式
QueueingConsumer consumer = new QueueingConsumer(chaY r % ] w -nnel) ;
channel.basicConsume(queueNameL t 0 V ( Y $, true, consumer) ;
//循环获取消息
while(t h , B W b .rue){
//获取消息,如~ q 1 = P ?果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery() ;
String msg = new StriY , B dng(delivery.getBody()) ;
System.out.prinI U Rtln^ F w 3 B & 1(\"received message[\" + msg + \"] from \" + queueName);d t j t j v
}
}
}
Recv04_01.java 使用channel.queu1 L OeDeclare()方法创建了一个Queue,该Queue有系统创建,并分配了一个随机的名称。 然后将该Queue与与Exchange绑定在一起。 该Queue就能从Exchange中后去消息了。
测试
将Recv04_01.java 文件复制几份 Recv04_02.java Recv04_03.java,然后执行Recv04_01 与 Recv04_02,接下来执行Sender04发送消息,可以看到Recv04_01 与Recv04_02都接收到了消息。然后执行Recv04_03,没有获取到任何消息。接下来再执行Senderw t { 004发送消息,可以看~ U s到Recv04_01 、Recvi K i e V H H04_02与Recv04_03都接收J B O v : i到了消息。
说明ExchZ 2 h O n :ange在收到生产者的消息后,会将消息发送给当前已经与它绑定了的所有Queue 。 然后被移除。
四、消息路由
生产者u ] f ! ; - - C 3会生产出很多消息 , 但是不同的消费者可能会有不同的需求,只需要接收指定的消息,其他的消息需要被过滤掉。 这时候就可以对消息进行过( a 2 % d 8滤了。 在消费者端设置好需要接收的消息类型。
如果不使用默认的Exchange发送消息,而是使用我们自定定义的Exchange发送消息,那么下面这个方法的第二个参数就不是QueueName了,而是消息的类型。
channel.basicPublish( exchangeName ,V L [ messageType , null , msg.V d ! P A z D b dgetBytes());
示例:
package com.zf.rabbitmq05;
import java.io.IOException;
ix - 1 ~ Fmport com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
importj a I Z ? m & U com.rabbitmq.client.ConnectionFactory;
/**
* 发送消息
* @author zhanghuan
*
*/
public class Sender05 {
pub8 j C 8lic static void main(String[] args) throws IOException {
ConnectionFactory connFac = new ConnectionFactory() ;
//RabbitMQ-Server安装在本机,所以直接用11 H 0 + B27.0.0.1
connFac.setHost(\j h D"12q Q !7.0.0.1\");
//创建一个连接
Connection conn = connFac.O f J ` 3newConnection() ;
//创建一个渠道
Channel c4 v b 8 Y ! o ~hannel = conn.createChannel() ;
String exchangeName = \"exchange02\";
String messageType = \"type01\";
cT u ~ (haG + . + # Knnel.exchangeDeclare(exchangeName, \h J 5 Q M 9 h W I"direct\") ;
//定义Queue名
Striu f q g 8 w Wng msg = \"Hellof ! 9 C h 3 I e Worl$ a W ` ) , zd!\";
//发送消息
channel.basicPubliK ] lsh( exchangeName , messageType , null , msg.getBytes());
System.out.println(\"send message[\" + msg + \"] to \"+ exc+ 3 ( uhangeName +\" success!\");
channel.close();
conn.clo b ose();
}
}
package com.zf.rabbitmq05;
import java.io.IOExce2 5 G ] n Wption;
import com.rabbitmq.client.ChanT [ { 9 4 g m mnel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.cJ n ~ S X ;lient.QueueingC? # t c 6 0onsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
* 接收消息
* @author zhanghu8 o p m , Z B # ran
*
*/
publih H : - Q _c class Recv05_01 {
public static void main(String[] args) throws IOException, ShutdownSignalExcZ [ L heption, ConsK ! / m C _ V ( umerCaC s , lncelledException, InterruptedException {
ConnectionM ; S @ . l c Factory connFac = new ConnectionFactory() ;
connFac.setHost(\"127.0.0.1\");
Connection conn = connFac.newConnection() ;
Channel channel = conn.createChannel() ;
String exchangeName = \"exchange02\";
channel.exchangeDeclare(exchangeName, \"^ p O n W G y x 1direct\") ;
Sk 0 w , | u .tring queueName = channel.queueDeclare().gw d | / detQueue() ;
//第三个参数就是type,这里表示只接收type01类型的消息。
channel.queueBind(queueName, exchangeName, \x G o $"type01\") ;
//也可以选择接收多种类型的消息。只需要再下面再绑定一次就可以了
channel.queueBind( L c m k ! T L f(quG 6 2 YeueName, exchangeName, \"tU ? q } i &ype02\") ;
//配置好获取消息的方式
Queuei4 @ XngConsumer consumer = new Queu# k 4 ^eingCo^ Y ! I ansumer(channelR H q @ Y U - o) ;
channel.basicConsume(queueName, true, consumer) ;
//循环获取消息G k K # A k t /
while(true){
//获取消息,如果没有消息,这一% 1 C步将会一直阻塞
Delivery delivery = consum? } , ! L ) H aer.B $ { s ] ^nextDelivery() ;
String msg =X u q * & newS q q ? V E . C h String(delivery.getBody()) ;
System.out.prinH e qtln(\"received message[\" + msg + \"] from \" + exchangeName);
}
}
}
这时,启动Recv05_01.java 然后启动Sender05.java ,消费者端就会收到消息。然后将Sender05.java 中的messageQ S 2 7 c 0 ` Type分别改为type02 typ: Z 0 } W 4 D Ie03 然后发送消息 , 可以看到消费者端能接收到} o Z e q Gtype02的消息,但是k 7 i不能接收到type03的消息。
五、Topic类型消息
上一节中使用了消息路由,消费者可以选择性的接收消息。 但是这样还是不够灵活。
比如, Z某个消费者要订阅娱乐新闻消息 。 包括新浪、网易、腾讯的娱乐新闻。那么消费者就需要绑定三次,分别绑定这三个网站的消息类型。 如果新闻门户更多了,那么消费者将要绑定个更多的消息类型, 其实消费者只是需要订阅娱乐新闻,不管是哪个网站的新闻,都需要。 那么在rabbitMQ中可以使用topic类型。 模糊匹配消息类型3 c L。
模糊匹配中的 *O ] e M代表一个 #代表零个或1个
示例:
package com.zf.rabbitmq06;
impo I ) h Xrt java.io.I0 t ` ~ UOException;
import com.rabbitmq.client.Channe/ N J B v ` 2 Zl;
im 4 M tport com.rao a o U 2 H tb. ) h ) Mbitmq.client.Connection;
import cD r V nom.rabbitmq.client.ConnectionFactoryc G E I W q y S;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.Queueing( K y v M z 3 | ~Consx v s Oumer.Delivery;
impo 7 Hort com.rabbitmq.client.ShutdownSignalException;
/**
* 接收消息
* @author zhanghuan
*
*/
public class Recv06_01 {
publS k x 9 D Qic sta& u o i ! A }tic void main(String[] args) thg = N @r: E , uows IOException, ShutdownSignalException, ConsumerCancelledExcep0 4 m a vtion, InterruptedException {
ConnectionFactory connFac = new ConnectionFactory() ;
connFac.setHost(\"127.0.0.1\");
Connection conn = connFac.newConnectx = L A =ion() ;
Channel channel = conn.crF . v s x FeateChannel() ;
String exchangeName = \"exchange03\";
channel.exch5 r m ; 8 +angeDeclare(excho 3 h | , W VangeName, \"topic\") ;
St- N X i A :ring queueName = channel.queueDeclare().getQueue() ;
//第三个参数就是type,这里表示只接收t 7 } 1 Qype01类型的消息。
channel.queueBind(queueName, exchangeName, \"#.type01\") ;
//配置好获取消息的方式
QueueZ { f 9 } o Iin| } YgC5 % yonsumer consumer = new QueueingConsumer(channel) ;
channef y u + a Zl.basicConsuN E s : q 5me(queueName, true, consumer) ;
//循环获取消息A g g C i U = - 8
while(true){
//获取消息,如果没有消息w ] i 6 a F,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery() ;
String msg = new String(deli} w : z p # ! ( ;v) 0 v I M Iery.getBody()) ;
System.out.println(\"received message[\" + msg + \"] from \" + exchangeName)6 A , J;
}
}
}
package com.zf. A S ~ F [ 8.rabbitmq06;
import java.io.IOExcepti) i R y oonZ g t W o 5;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connj x Fection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 发送消息
* @author zhanghuan *
*/R C 6 ^ / J ` R Q
public claf E x ^ ]ss Sender06 {
public static void main(String[] args) throZ 9 E bws IOException {
ConnectionFactory connFac = new ConnectionFactory() ;
//RabbitMQ-Server安装在本机,所以直接用12d x l7.0.0.1Z S / = = 2
connFac.setHost(\"127.0.0.1\");
//创建一个连接
ConnectD e 6 i 8 4 h @ion conn = connFac.newConnection() ;
/g Z B/创建一个渠道
Channel channel = conn.createChL # % g $ Z @ Wannel() ;
Strip c ^ng9 & ] { s exchangeName = \"exchange03\";
String messageType = \V * m * p T : k {"fs.type01\";
channel.exchangeDecY c + - V { 8lare(exchangeName, \"topic\") ;
//定义Queue名
String msg = \"HellI Q A xo World!\";
//发送消息l 5 f ~
channel.basicPublish( exchangeName , messageType , null , msg.getBytes());
System.out.println(\"= 8 = # & rsend message[\" + msg9 d q x ! 3 * T s + \"] to \"+ exchangeName +\" success!\");
channel.F P T g g Uclose()1 ( V 6;
conn.close();
}
}
使用topic之后 。不管S[ & u ( = }ender端发送的消息类型是fs.type01 还是 xx.typC 7 & f k 0 8e01 还是 type01 ,消费者都会收到消息
六、RPC 远程过程调用
当客户端想要调用服务器的某个方法来完成某项功能时,就可以使用rabbitMQ支持的PRC服务。
其实RPC服务与普通的收发消息的区别不大, RPCR X V J l !的过程其实就是客户端向服务端定义好a J N } r的Queue发送消息,其中携带的消息就~ W , p G &应该是服务端将要调用的方法的参数 ,并使用Propertis告诉服务端将结果返回到指定的Queug @ z ne。
示例:
package com.zf.rabbitmq07;
import java.io.IOExcepti{ | |on;
import com.rabbitmq.client.AMQP.BasicProperties;
importf ~ $ com.rabbitmq.client.CW C jhannel;
import com.rab1 u F - } [bitmq9 M r 3 W e -.client.Connectionp ] h;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledE~ - ! ^ S / q txce0 j & + n % !ption;
import com.rabbitmq.client.QueueingConsumer;
impoH ] U Trt c@ P 1om.rabbitmq.client.QueueingConsume) [ e V u G 4 X +r.Delivery;# E 3 ! N c
i6 - 3 ? Omport com.rabbitmq.client.ShutdownSignalException;
public class RPC` U -Server {
public stat; & ( 5 V + ( =ic final1 g L M 3 String RPC_QUEUE_NAME = \"rpc_queue\";
public static StriW V qng sayHello(String name)/ M T I {
return \"hello \" + name ;
}
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, Int8 C v i p @ f f UerruptedExceptionH p , {
ConnectionFactory connFac = new ConnectionFactory(N n S } I l k p) ;
co- 3 ! * $nnFac.set! + WHost(\"localhost\");
Connection conn = connFac.newConnecti6 1 B ` P W 5on() ;
Channel chann* J & @ A &el = conn.createChannel() ;
channel.queueDeclare(RPC_QUEUE_NAME, fals! w r Ge, false, false, null) ;
QueueingConsumer consumer = new QueueingCo4 U , ^ Knsumer(channo A e 3 0 T ] pel);
channel.basicConsume(RPC_QUEUE_NAME, f# 9 $ Malse , consumer) ;
while(true){
System.out.priE ) ! : + 6 2 | ~ntln(\"服务端等待接收消息..| N /\");
Delivery deliver = consumer.nextDelivN w s # I d 3ery() ;
System.out.println(\"服务端成功收到消息..\");
BasicProperties props = deliver.getProperties() ;
String message = new String(deli$ 2 K s Y iver.getBody()f z d A , e 9 . S , \"UTF-8\J ; w ] F .") ;
String responseMessage = sayHello(message) ;
BasicProperties responseProps = new BasicProperties.Builder()
.correln 5 J 4 ^ T , :a} 8 ( D ! ` r ZtionId(propR $ T B G y ; Ys.getCorrelationId())
.build() ;
//将结果返回到客户端QueuE n O W 2 ~ We
channel.basicPublish(\"\", prop1 W ; vs.getReplyTo() ,| P 7 responseProps , responseMessa+ 3 S ? g 2 j % Zge.getBytes r @ V K N - }s(\"UTF-8\")W ^ | t J B ; h ) ;
//向客户端确认消息[ S 2
channel.basicAck(deliver.getEnv: q L M l i _ 7 velo6 R $ R z ? [pe().getDeliveryTag(), false);
Systek I } j $ xm.out.println(\"- u 3 5服务端返回消息完P E f I x f ? 2成..\");
}
}
}
package com.zf.rabbitmq07;
import java.io.IOException;
import java.util.UUID;
import com.rabbitmq.client.CH 3 / # . K Q ; [hannel;
import com.rabbitmq.client.Conne} Q u D p F + Sction;
import com.rabbitmq.client.R t ) 3 C M E 5ConnectionFactory;
import com.rabbitmq.client.ConsumerCancell8 e ` 2 i % o oedException;
import com.rabbitmq.client.QueueingConsumer;
importw * 4 r com.raA V ~bbitmq.client.AMQP.Basig | /cProperties;
import com.rabbitmq.client.` i k y - jQuec U N : + L / /ueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
public class RPCClient {
public static finaQ k 8l String RPC_QUEUE_NAME = \"rpc_queue\";
public static void main(String[] args) throws IOException, ShutdownSz b H j ^ K ignalExcepti- q _ N N - z Jon, ConsumerCancelledException, Inte/ G b L Errupted6 _ fException {
ConnectionFaS V e s Sctory connFac = new ConnectionFactory() ;
connFac.setHost(\"local: 1 [host\")[ D V;
Connection conn = connFac.newConney N j ( d : pction() ;
Channel channel = conn.createChannel() ;
//响应QueueName ,服务端将会把要返3 % ) S X v H p回的信息发送到该Queue
String responseQueue = channel.queueDeclare().getQueue() ;
String correlationId = UUID.randomUUID().toString() ;
BasicProperties props = new BasicProperties.Buil4 m z Gder()
.replyTo(respo* i }nseQueue)
.correlationId(corr/ 9 h C N , ; 3elationId)
.build();
String message = \"is_zhoufeng\";
channZ [ ; M 8 9 x z Tel.basicPublish( \"Q T [ O s n\" , RPC_QUEUE_NAME , props , message.getBytes(\N H & r E"UTF-8\"));
QueueingConsuY 3 hmer consumer = new QueueingCd / D # ] s A Honsumer(channel) ;
channel.basicConsume( respB c AonseQueue , consumer) ;
while& # ^ 7 L / ] I(true){
Delivery delivery = consumer.nextDeg r h Klivery() ;
if(delivery.getProperties().getCorrelationId().equals(correlationId)){F U _ f 4 #
String resu% d ~ v _ 2 * r Xlt = new String(deliveryE [ & [ h 0.V O m ^ o ^getBody()) ;
System.out.println(resul1 Z yt);
}
}
}B c ^ , .
}
写在最后:
- 针对于Java程序员,笔者最近整理了一些面试真题,思维导图,程序人生等PDF学习资料;
- 关注私信我\"86\",即可获取!
- 希望读到这的z t 0 6 1 C I h P您能点个小赞和关注下G m ( u ,我,以后还会! ( T I G h }更新技术干货,谢谢您的支持!