正文
上一篇我们讲了关于direct类型的Exchange,这一篇我们来了解一下fanout类型的Exchange。
1.Exchange的fanout类型
fanout类型的Exchange的特点是会把消息发送到与之绑定的所有Queue中,绑定和发布时指定的routing key都会被忽略。我们来测试一下,代码如下
using RabbitMQ.Client;using System;using System.Text;using System.Threading;using System.Threading.Tasks;namespace RabbitMQConsole
{
    /// <summary>
    /// Producer sample: declares a durable fanout exchange, declares three durable
    /// queues and binds each of them to the exchange, then publishes one persistent
    /// message to the exchange.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Entry point. Connects to the broker, sets up the topology and publishes "hello rabbit".
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            const string exchangeName = "change3";
            const string routingKey = "route2";
            string[] queueNames = { "queue3", "queue4", "queue5" };

            var factory = new ConnectionFactory
            {
                HostName = "39.**.**.**",
                Port = 5672,
                VirtualHost = "/",
                UserName = "root",
                Password = "root"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchangeName, type: "fanout", durable: true, autoDelete: false);

                // Declare every queue and bind it to the exchange; each binding
                // uses the queue's own name as its routing key.
                foreach (var queueName in queueNames)
                {
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
                    channel.QueueBind(queueName, exchangeName, queueName);
                }

                // Mark the message as persistent before publishing it.
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;

                // Third argument is the "mandatory" flag.
                channel.BasicPublish(exchangeName, routingKey, true, properties, Encoding.UTF8.GetBytes("hello rabbit"));
            }
        }
    }
}运行代码,去可视化工具中查看一下
消费其中的一个
using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Text;using System.Threading;namespace RabbitMQClient
{
    /// <summary>
    /// Consumer sample: subscribes to "queue3" and prints every message it receives,
    /// acknowledging each delivery manually.
    /// </summary>
    class Program
    {
        // Connection settings shared by the whole program.
        private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
        {
            HostName = "39.**.**.**",
            Port = 5672,
            UserName = "root",
            Password = "root",
            VirtualHost = "/"
        };

        /// <summary>
        /// Entry point. Declares the exchange/queue/binding, starts a consumer and
        /// blocks on <see cref="Console.ReadLine"/> so the connection stays open.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            const string exchangeName = "change3";
            const string bindingKey = "route2";
            const string queueName = "queue3";

            using (IConnection conn = rabbitMqFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    channel.ExchangeDeclare(exchangeName, "fanout", durable: true, autoDelete: false);
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
                    channel.QueueBind(queueName, exchangeName, bindingKey);

                    // Prefetch count of 50, no prefetch size limit, global flag off.
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 50, global: false);

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (sender, delivery) =>
                    {
                        byte[] payload = delivery.Body;
                        string text = Encoding.UTF8.GetString(payload);

                        // Prints the message text immediately followed by the handling thread's id.
                        Console.WriteLine(text + Thread.CurrentThread.ManagedThreadId);

                        // autoAck is false below, so each delivery is acknowledged explicitly.
                        channel.BasicAck(deliveryTag: delivery.DeliveryTag, multiple: false);
                    };

                    channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

                    // Keep the channel and connection alive until the user presses Enter.
                    Console.ReadLine();
                }
            }
        }
    }
}结果如下
大家可以依次消费其他两个Queue,这里就不演示了
2.消息的过期时间
我们在发送一些消息的时候,有时希望给消息设置一下过期时间,我们可以通过两种方式来设置
2.1设置队列中消息的过期时间(x-message-ttl)
using RabbitMQ.Client;using System;using System.Collections.Generic;using System.Text;using System.Threading;using System.Threading.Tasks;namespace RabbitMQConsole
{
    /// <summary>
    /// Producer sample: declares a durable queue with an "x-message-ttl" argument,
    /// binds it to a fanout exchange and publishes one persistent message.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Entry point. Connects to the broker, sets up the topology and publishes "hello rabbit".
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            const string exchangeName = "change4";
            const string routingKey = "route2";
            const string queueName = "queue7";

            var factory = new ConnectionFactory
            {
                HostName = "39.**.**.**",
                Port = 5672,
                VirtualHost = "/",
                UserName = "root",
                Password = "root"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchangeName, type: "fanout", durable: true, autoDelete: false);

                // Queue-level message TTL, in milliseconds.
                var queueArguments = new Dictionary<string, object>
                {
                    { "x-message-ttl", 8000 }
                };
                channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: queueArguments);
                channel.QueueBind(queueName, exchangeName, queueName);

                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;

                // Third argument is the "mandatory" flag.
                channel.BasicPublish(exchangeName, routingKey, true, properties, Encoding.UTF8.GetBytes("hello rabbit"));
            }
        }
    }
}这样过8秒去Queue就看不到该消息了
2.2设置message的过期时间
using RabbitMQ.Client;using System;using System.Collections.Generic;using System.Text;using System.Threading;using System.Threading.Tasks;namespace RabbitMQConsole
{
    /// <summary>
    /// Producer sample: publishes a persistent message carrying its own expiration
    /// (30000 ms) to a queue that is also declared with "x-message-ttl" (8000 ms).
    /// </summary>
    class Program
    {
        /// <summary>
        /// Entry point. Connects to the broker, sets up the topology and publishes "hello rabbit".
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            const string exchangeName = "change4";
            const string routingKey = "route2";
            const string queueName = "queue7";

            var factory = new ConnectionFactory
            {
                HostName = "39.**.**.**",
                Port = 5672,
                VirtualHost = "/",
                UserName = "root",
                Password = "root"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchangeName, type: "fanout", durable: true, autoDelete: false);

                // Queue-level message TTL, in milliseconds.
                var queueArguments = new Dictionary<string, object>
                {
                    { "x-message-ttl", 8000 }
                };
                channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: queueArguments);
                channel.QueueBind(queueName, exchangeName, queueName);

                var properties = channel.CreateBasicProperties();
                // Per-message expiration, in milliseconds (the property is a string).
                properties.Expiration = "30000";
                properties.Persistent = true;

                // Third argument is the "mandatory" flag.
                channel.BasicPublish(exchangeName, routingKey, true, properties, Encoding.UTF8.GetBytes("hello rabbit"));
            }
        }
    }
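}我们发现还是8秒就过期了。这是因为同时设置了队列与消息的过期时间时,RabbitMQ会取两者中较小的值(这里队列的8秒小于消息的30秒),而不是固定以队列的时间为准。我们把队列的过期时间去掉重新试一下。注意:queue7之前是带着x-message-ttl参数声明的,重新声明前必须先删除该队列,否则会因参数不一致而报PRECONDITION_FAILED错误。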
using RabbitMQ.Client;using System;using System.Collections.Generic;using System.Text;using System.Threading;using System.Threading.Tasks;namespace RabbitMQConsole
{
    /// <summary>
    /// Producer sample: publishes a persistent message with a per-message expiration
    /// (30000 ms) to a queue that has no queue-level message TTL.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Entry point. Recreates "queue7" without any TTL argument, binds it to the
        /// fanout exchange and publishes "hello rabbit" with a 30000 ms expiration.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchange = "change4";
            var route = "route2";
            var queue7 = "queue7";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "fanout", durable: true, autoDelete: false);

                    // The earlier samples declared this durable queue with "x-message-ttl".
                    // Redeclaring an existing queue with different arguments is rejected by
                    // the broker (PRECONDITION_FAILED), so drop the old queue first.
                    // NOTE: this also discards any messages still sitting in the queue.
                    channel.QueueDelete(queue7, false, false);

                    channel.QueueDeclare(queue7, durable: true, exclusive: false, autoDelete: false);
                    channel.QueueBind(queue7, exchange, queue7);

                    var props = channel.CreateBasicProperties();
                    // Per-message expiration, in milliseconds (the property is a string).
                    props.Expiration = "30000";
                    props.Persistent = true;

                    // Third argument is the "mandatory" flag.
                    channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit"));
                }
            }
        }
    }
}3.队列生存时间
我们还可以设置一个队列的生存时间
using RabbitMQ.Client;using System;using System.Collections.Generic;using System.Text;using System.Threading;using System.Threading.Tasks;namespace RabbitMQConsole
{
    /// <summary>
    /// Producer sample: declares a durable queue with an "x-expires" argument,
    /// binds it to a fanout exchange and publishes one persistent message.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Entry point. Connects to the broker, sets up the topology and publishes "hello rabbit".
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchange = "change4";
            var route = "route2";
            var queue8 = "queue8";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "fanout", durable: true, autoDelete: false);

                    // The comment must sit on its own line: as a trailing "//" comment it
                    // swallowed the closing "});" and left the call unterminated.
                    channel.QueueDeclare(queue8, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object>
                    {
                        // Expiry of the queue itself: 10000 milliseconds.
                        { "x-expires", 10000 }
                    });
                    channel.QueueBind(queue8, exchange, queue8);

                    var props = channel.CreateBasicProperties();
                    props.Persistent = true;

                    // Third argument is the "mandatory" flag.
                    channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit"));
                }
            }
        }
    }
}这样队列在连续10秒未被使用(没有消费者,也没有被重新声明或被basic.get访问)之后就会被自动删除
點擊查看更多內容
為 TA 點贊
評論
評論
共同學習,寫下你的評論
評論加載中...
作者其他優質文章
正在加載中
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦


