博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
轻松搞定RabbitMQ(六)——主题
阅读量:5873 次
发布时间:2019-06-19

本文共 5722 字,大约阅读时间需要 19 分钟。

hot3.png

 轻松搞定RabbitMQ(六)——主题

在上一篇博文中,我们进一步改良了日志系统。使用Direct类型的转换器,使得接收者有能力进行选择性的接收日志,,而非fanout那样,只能够无脑的转发

虽然使用Direct类型的转换器改进了日志系统。但它仍然有一定的局限性——不能根据多重条件进行路由选择。

在我们的日志系统中,我们可能不仅仅根据日志严重性订阅日志,也想根据发送源订阅。你可能从unix工具syslog了解过这个概念,它可以根据严重性(info/warning/crit…)和设备(auth/cron/kern…)转发日志。

这将给我们更多的灵活性——我们可以订阅来自元‘cron’的致命错误日志,同时也可以订阅‘kern’的所有日志。

为了在我们的日志系统中实现上述需求,我们需要了解更复杂的主题类型的转发器——Topic Exchange。

Topic exchange(主题转发器)

 发送给主题转发器的消息不能是任意设置的选择键,必须是用小数点隔开的一系列的标识符。这些标识符可以是随意,但是通常跟消息的某些特性相关联。一些合法的路由选择键比如“socket.usd.nyse”,"nyse.vmw","quick.orange.rabbit",你愿意用多少单词都可以,只要不超过上限的255个字节。

  绑定键也必须以相同的格式。主题转发器的逻辑类似于direct类型的转发器。消息通过一个特定的路由键发送到所有与绑定键匹配的队列中。需要注意的是,关于绑定键有两种特殊的情况:*(星号)可以代替任意一个标识符 ;#(井号)可以代替零个或多个标识符。

举一个简单例子,如下图:

SouthEast

 在上图例子中,我们发送描述动物的消息。消息会转发给包含3个单词(2个小数点)的路由键绑定的队列中。绑定键中的第一个单词描述的是速度,第二个是颜色,第三个是物种:“速度.颜色.物种”。

 我们创建3个绑定:Q1绑定键是“*.orange.*”,Q2绑定键是“*.*.rabbit”,Q3绑定键是“lazy.#”。这些绑定可以概括为:Q1只对橙色的动物感兴趣。Q2则是关注兔子和所有懒的动物。

 路由键为“quick.orange.rabbit”的消息会被路由到2个队列中去。而“lazy.orange.elephant”的消息同样会发往2个队列。另外“quick.orange.fox” 仅仅发往第一个队列,而"lazy.brown.fox"则只发往第二个队列。“quick.brown.fox”则所有的绑定键都不匹配而被丢弃。

如果我们违反约定,发送了只带1个或者4个标识符的选择键,像“orange”或者“quick.orange.male.rabbit”,会发生什么呢?这些消息都不匹配任何绑定,所以将被丢弃。

另外,“lazy.orange.male.rabbit”,尽管有4个标识符,但是仍然匹配最后一个绑定键,所以会发送到第二个队列中。

注:主题类型的转发器非常强大,可以实现其他类型的转发器。当队列绑定#绑定键,可以接受任何消息,类似于fanout转发器。当特殊字符*和#不包含在绑定键中,这个主题转发器就像一个direct类型的转发器。

完整实例

我们将主题类型的转发器应用到日志系统中,路由格式为:“<设备>.<严重级别>”。

发送端(EmitLogTopic.java)

public class ReceiveLogsOther {    private static final String EXCHANGE_NAME = "topic_logs";    public static void main(String[] args) throws Exception{        ConnectionFactory factory = new ConnectionFactory();        //RabbitMQ所在主机ip或者主机名        factory.setHost("192.168.20.87");        factory.setPort(5672);        factory.setUsername("admin");        factory.setPassword("admin");        // 打开连接和创建频道,与发送端一样        Connection connection = factory.newConnection();        final Channel channel = connection.createChannel();        //声明exchange        channel.exchangeDeclare(EXCHANGE_NAME, "topic");        // 声明一个随机临时队列        String queueName = channel.queueDeclare().getQueue();        channel.queueBind(queueName, EXCHANGE_NAME, "");        String severity="kern.error";//只关注核心错误级别的日志,然后记录到文件中去。        channel.queueBind(queueName, EXCHANGE_NAME, severity);        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");        final Consumer consumer = new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println(message);            }        };        channel.basicConsume(queueName,true,consumer);    }}

消费者1(ReceiveLogs2Console.java)

public class ReceiveLogs2Console {    private static final String EXCHANGE_NAME = "topic_logs";    public static void main(String[] argv) throws Exception{        ConnectionFactory factory = new ConnectionFactory();        //RabbitMQ所在主机ip或者主机名        factory.setHost("192.168.20.87");        factory.setPort(5672);        factory.setUsername("admin");        factory.setPassword("admin");        // 打开连接和创建频道,与发送端一样        Connection connection = factory.newConnection();        final Channel channel = connection.createChannel();        //声明exchange        channel.exchangeDeclare(EXCHANGE_NAME, "topic");        // 声明一个随机临时队列        String queueName = channel.queueDeclare().getQueue();        String[] routingKeys ={"auth.*","*.info","#.warning"};//关注所有的授权日志、所有info和waring级别的日志        for (String routingKey : routingKeys) {            //关注所有级别的日志(多重绑定)            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);        }        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");        // 创建队列消费者        final Consumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println(" [x] Received ["  + envelope.getRoutingKey() + "] :'" + message + "'");            }        };        channel.basicConsume(queueName, true, consumer);    }}

消费者1(ReceiveLogsOther.java)

public class ReceiveLogsOther {    private static final String EXCHANGE_NAME = "topic_logs";    public static void main(String[] args) throws Exception{        ConnectionFactory factory = new ConnectionFactory();        //RabbitMQ所在主机ip或者主机名        factory.setHost("192.168.20.87");        factory.setPort(5672);        factory.setUsername("admin");        factory.setPassword("admin");        // 打开连接和创建频道,与发送端一样        Connection connection = factory.newConnection();        final Channel channel = connection.createChannel();        //声明exchange        channel.exchangeDeclare(EXCHANGE_NAME, "topic");        // 声明一个随机临时队列        String queueName = channel.queueDeclare().getQueue();        channel.queueBind(queueName, EXCHANGE_NAME, "");        String severity="kern.error";//只关注核心错误级别的日志,然后记录到文件中去。        channel.queueBind(queueName, EXCHANGE_NAME, severity);        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");        final Consumer consumer = new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println(message);            }        };        channel.basicConsume(queueName,true,consumer);    }}

 

 

 

 

 

转载于:https://my.oschina.net/LucasZhu/blog/1539430

你可能感兴趣的文章
hostPath Volume - 每天5分钟玩转 Docker 容器技术(148)
查看>>
php数组时按值传递还是按地址传递
查看>>
js向一个数组中插入元素的几个方法-性能比较
查看>>
Tensorflow代码解析(一)
查看>>
容器相关源加速以及k8s官方资源镜像下载
查看>>
Zombie进程
查看>>
asp.netmvc 三层搭建一个完整的项目
查看>>
sql server 生成随机数 rand函数
查看>>
CSS魔法堂:Transition就这么好玩
查看>>
【OpenStack】network相关知识学习
查看>>
centos 7下独立的python 2.7环境安装
查看>>
[日常] 算法-单链表的创建
查看>>
用ASP.NET Core 2.1 建立规范的 REST API -- 保护API和其它
查看>>
前端工程化系列[01]-Bower包管理工具的使用
查看>>
使用 maven 自动将源码打包并发布
查看>>
Redis 的源码分析
查看>>
ES6 对象的扩展
查看>>
Spark:求出分组内的TopN
查看>>
Python爬取豆瓣《复仇者联盟3》评论并生成乖萌的格鲁特
查看>>
关于跨DB增量(增、改)同步两张表的数据小技巧
查看>>