上一篇文章中,簡單的介紹了一下RabbitMQ的work模型。這篇文章來學(xué)習(xí)一下RabbitMQ中的發(fā)布訂閱模型。
發(fā)布訂閱模型(Publish/Subscribe):簡單的說就是隊列里面的消息會被多個消費者同時接受到,消費者接收到的信息一致。
發(fā)布訂閱模型適合于做模塊之間的異步通信。

img
適用場景
- 發(fā)送并記錄日志信息
- springcloud的config組件里面通知配置自動更新
- 緩存同步
- 微信訂閱號
演示
生產(chǎn)者
public class Producer {
private static final String EXCHANGE_NAME = "exchange_publish_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 聲明交換機
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 發(fā)送消息到交換機
for (int i = 0; i < 100; i++) {
channel.basicPublish(EXCHANGE_NAME, "", null, ("發(fā)布訂閱模型的第 " + i + " 條消息").getBytes());
}
// 關(guān)閉資源
channel.close();
connection.close();
}
}
消費者
// 消費者1
public class Consumer {
private static final String QUEUE_NAME = "queue_publish_1";
private static final String EXCHANGE_NAME = "exchange_publish_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 聲明交換機
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 將隊列綁定到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("隊列1接收到的消息是:" + new String(body));
}
};
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
// 消費者2
public class Consumer2 {
private static final String QUEUE_NAME = "queue_publish_2";
private static final String EXCHANGE_NAME = "exchange_publish_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 聲明交換機
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 將隊列綁定到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("隊列2接收到的消息是:" + new String(body));
}
};
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
測試
先啟動2個消費者,再啟動生產(chǎn)者


可以看出來消費者1和消費者2接收到的消息是一模一樣的 ,每個消費者都收到了生產(chǎn)者發(fā)送的消息;
發(fā)布訂閱模型,用到了一個新的東西-交換機,這里也解釋一下相關(guān)方法的參數(shù):
// 聲明交換機
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 該方法的最多參數(shù)的重載方法是:
Exchange.DeclareOk exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map< String, Object > arguments) throws IOException;
/**
* param1:exchange,交換機名稱
* param2:type,交換機類型;直接寫 string效果一致;內(nèi)置了4種交換機類型:
* direct(路由模式)、fanout(發(fā)布訂閱模式)、
* topic(topic模式-模糊匹配)、headers(標(biāo)頭交換,由Headers的參數(shù)分配,不常用)
* param3:durable,是否持久化交換機 false:默認(rèn)值,不持久化
* param4:autoDelete,沒有消費者使用時,是否自動刪除交換機 false:默認(rèn)值,不刪除
* param5:internal,是否內(nèi)置,如果設(shè)置 為true,則表示是內(nèi)置的交換器, 客戶端程序無法直接發(fā)送消息到這個交換器中, 只能通過交換器路由到交換器的方式 false:默認(rèn)值,允許外部直接訪問
* param6:arguments,交換機的一些其他屬性,默認(rèn)值為 null
*/
// 將隊列綁定到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
/**
* param1:destination,目的地,隊列的名字
* param2:source,資源,交換機的名字
* param3:routingKey,路由鍵(目前沒有用到routingKey,填 "" 即可)
*/
小結(jié)
本文到這里就結(jié)束了,介紹了RabbitMQ通信模型中的發(fā)布訂閱模型,適合于做模塊之間的異步通信。
聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。
舉報投訴
-
交換機
+關(guān)注
關(guān)注
23文章
2904瀏覽量
104462 -
緩存
+關(guān)注
關(guān)注
1文章
248瀏覽量
27760 -
模型
+關(guān)注
關(guān)注
1文章
3751瀏覽量
52099 -
springcloud
+關(guān)注
關(guān)注
0文章
17瀏覽量
1669 -
rabbitmq
+關(guān)注
關(guān)注
0文章
20瀏覽量
1278
發(fā)布評論請先 登錄
相關(guān)推薦
熱點推薦
RabbitMQ通信模型中的work模型
上一篇文章中,簡單的介紹了一下RabbitMQ,以及安裝和hello world。 有的小伙伴留言說看不懂其中的方法參數(shù),這里先解釋一下幾個基本的方法參數(shù)。 // 聲明隊列方法
RabbitMQ是什么
在工作中經(jīng)常會用到消息隊列處理各種問題,今天指北君帶領(lǐng)大家來學(xué)一個很常用到的技術(shù)-RabbitMQ;接下來還會有關(guān)于RabbitMQ的系列教程,對你有幫助的話記得關(guān)注哦~ RabbitMQ
MQTT協(xié)議介紹之一:發(fā)布/訂閱
,MQTT被正式批準(zhǔn)為OASIS標(biāo)準(zhǔn)。 MQTT 3.1.1現(xiàn)在是該協(xié)議的最新版本。發(fā)布/訂閱模式發(fā)布/訂閱模式(pub / sub)是傳統(tǒng)客戶端 - 服務(wù)器
發(fā)表于 08-25 19:58
NodeMCU實現(xiàn)訂閱和發(fā)布主題
NodeMCU實現(xiàn)訂閱和發(fā)布主題。1、要點掃盲1.1 MQTT《MQTT協(xié)議--MQTT協(xié)議簡介及原理》《MQTT協(xié)議--MQTT協(xié)議解析》1.2 OneNET《NodeMCU學(xué)習(xí)(十)--發(fā)送數(shù)據(jù)
發(fā)表于 11-01 08:37
請問esp32c3,ble mesh怎么向訂閱的分組發(fā)布消息?
發(fā)布消息,為什么vnd_models模型不可以.有沒有更加簡單的api,直接傳訂閱分組地址就可以發(fā)布消息的?
發(fā)表于 02-13 06:47
請問esp32c3 ble mesh怎么向訂閱的分組發(fā)布消息?
發(fā)布消息,為什么vnd_models模型不可以.有沒有更加簡單的api,直接傳訂閱分組地址就可以發(fā)布消息的?
發(fā)表于 03-06 08:36
基于SOA的發(fā)布/訂閱系統(tǒng)設(shè)計
企業(yè)電子商務(wù)的迅猛發(fā)展已經(jīng)改變了分布式系統(tǒng)的規(guī)模,傳統(tǒng)的基于請求/應(yīng)答的點對
點、同步通信已不能滿足大規(guī)模動態(tài)分布式應(yīng)用環(huán)境?;赟OA 的發(fā)布/訂閱系統(tǒng)模型
發(fā)表于 07-08 08:42
?21次下載
rabbitmq是什么?rabbitmq安裝、原理、部署
rabbitmq是什么? MQ的全稱是Messagee Queue,因為消息的隊列是隊列,所以遵循FIFO 先進(jìn)先出的原則是上下游傳遞信息的跨過程通信機制。 RabbitMQ是一套開源(MPL
RocketMQ和RabbitMQ的區(qū)別
RocketMQ和RabbitMQ的區(qū)別: 架構(gòu)設(shè)計:RocketMQ是基于主題(Topic)的發(fā)布/訂閱模式,而RabbitMQ則是基于隊列(Queue)的消息代理系統(tǒng)。 語言支持
RabbitMQ中的路由模型(direct)
路由模型 RabbitMQ 提供了五種不同的通信模型,上一篇文章中,簡單的介紹了一下RabbitMQ的發(fā)
redis和rabbitMQ的區(qū)別
Redis和RabbitMQ之間的區(qū)別。 架構(gòu)設(shè)計: Redis是一個內(nèi)存存儲系統(tǒng),它將數(shù)據(jù)存儲在內(nèi)存中,以提供快速的讀寫訪問。因此,Redis的存儲能力受到內(nèi)存大小的限制。它使用發(fā)布/訂閱
如何在MQTT中發(fā)布和訂閱實體
在MQTT中發(fā)布和訂閱實體(主題)是MQTT通信的核心操作,下面將詳細(xì)介紹其原理、步驟以及示例代碼,幫助你全面理解這一過程。 一、MQTT發(fā)布與訂閱的基本概念
RabbitMQ中的發(fā)布訂閱模型
評論