博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于Netty与RabbitMQ的消息服务
阅读量:5925 次
发布时间:2019-06-19

本文共 4750 字,大约阅读时间需要 15 分钟。

Netty作为一个高性能的异步网络开发框架,可以作为各种服务的开发框架。

前段时间的一个项目涉及到硬件设备实时数据的采集,采用Netty作为采集服务的实现框架,同时使用RabbitMQ作为采集服务和各个其他模块的通信消息队列,整个服务框架图如下:

将业务代码和实际协议解析部分的代码抽离,得到以上一个简单的设计图,代码开源在GitHub上,简单介绍下NettyMQServer采集服务涉及到的几个关键技术点:

1、设备TCP消息解析:

NettyMQServer和采集设备Device之间采用TCP通信,TCP消息的解析可以使用LengthFieldBasedFrameDecoder(消息头和消息体),可以有效的解决TCP消息“粘包”问题。

消息包解析图如下:

lengthFieldOffset   =  0 lengthFieldLength   =  2 lengthAdjustment    = -2 (= the length of the Length field) initialBytesToStrip =  0 BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes) +--------+----------------+      +--------+----------------+ | Length | Actual Content |----->| Length | Actual Content | | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" | +--------+----------------+      +--------+----------------+

代码中消息长度的存储采用了4个字节,采用LengthFieldBasedFrameDecoder(65535,0,4,-4,0)解码,Netty会从接收的数据中头4个字节中得到消息的长度,进而得到一个TCP消息包。

2、给设备发消息:

首先在连接创建时,要保留TCP的连接:

static final ChannelGroup channels = new DefaultChannelGroup(            GlobalEventExecutor.INSTANCE);    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        // A closed channel will be removed from ChannelGroup automatically        channels.add(ctx.channel());    }

在每次一个Channel Active(连接创建)的时候用ChannelGroup保存这个Channel连接,当需要给某个设备发消息的时候,可以遍历该ChannelGroup,找到对应的Channel,给该Channel发送消息:

for (io.netty.channel.Channel c : EchoServerHandler.channels) {                            ByteBuf msg = Unpooled.copiedBuffer(message.getBytes());                            c.writeAndFlush(msg);                        }

这里是给所有的连接的设备都发。当连接断开的时候,ChannelGroup会自动remove掉这个连接,不需要我们手动管理。

3、心跳检测

当某个设备Device由于断电或是其他原因导致设备不正常无法采集数据,Netty服务端需要知道该设备是否在正常工作,可以使用Netty的IdleStateHandler,示例代码如下:

// 3 minutes for read idlech.pipeline().addLast(new IdleStateHandler(3*60,0,0));ch.pipeline().addLast(new HeartBeatHandler());/** * Handler implementation for heart beating. */public class HeartBeatHandler extends ChannelInboundHandlerAdapter{    @Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt)            throws Exception {        if (evt instanceof IdleStateEvent) {            IdleStateEvent event = (IdleStateEvent) evt;            if (event.state() == IdleState.READER_IDLE) {                // Read timeout                System.out.println("READER_IDLE: read timeout from "+ctx.channel().remoteAddress());                //ctx.disconnect(); //Channel disconnect            }        }    }}

上面设置3分钟没有读到数据,则触发一个READER_IDLE事件。

4、RabbitMQ消息接收与发送

NettyMQServer消息发送采用了Spring AMQP,只需要在配置文件中简单配置一下,就可以方便使用。

NettyMQServer消息接收同样可以采用Spring AMQP,但由于对Spring相关的配置不是很熟悉,为了更灵活的使用MQ,这里使用了RabbitMQ Client Java API来实现:

Connection connection = connnectionFactory.newConnection();                    Channel channel = connection.createChannel();                    channel.exchangeDeclare(exchangeName, "direct", true, false, null);                    channel.queueDeclare(queueName, true, false, false, null);                    channel.queueBind(queueName, exchangeName, routeKey);                    // process the message one by one                    channel.basicQos(1);                    QueueingConsumer queueingConsumer = new QueueingConsumer(channel);                    // auto-ack is false                    channel.basicConsume(queueName, false, queueingConsumer);                    while (true) {                        QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();                        String message = new String(delivery.getBody());                        log.debug("Mq Receiver get message");                        // Send the message to all connected clients                        // If you want to send to a specified client, just add                        // your own logic and ack manually                        // Be aware that ChannelGroup is thread safe                        log.info(String.format("Conneted client number: %d",EchoServerHandler.channels.size()));                        for (io.netty.channel.Channel c : EchoServerHandler.channels) {                            ByteBuf msg = Unpooled.copiedBuffer(message.getBytes());                            c.writeAndFlush(msg);                        }                        // manually ack to MQ server the message is consumed.                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

以上代码从一个Queue中读取数据,为了有效处理数据,防止异常数据丢失,使用了手动Ack。

RabbitMQ的使用方式:

 

代码托管在GitHub上:

 

参考:

http://netty.io/

http://netty.io/4.0/api/io/netty/handler/codec/LengthFieldBasedFrameDecoder.html

http://netty.io/4.0/api/io/netty/handler/timeout/IdleStateHandler.html

 

作者:
出处:
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
 
 

转载于:https://www.cnblogs.com/qxoffice2008/p/4257428.html

你可能感兴趣的文章
深入理解Three.js(WebGL)贴图(纹理映射)和UV映射
查看>>
各种音视频编解码学习详解 h264 ,mpeg4 ,aac 等所有音视频格式
查看>>
MongoDB
查看>>
【ACR2015】依那西普按需维持治疗策略有效抑制RA骨破坏进展
查看>>
MVC捕获数据保存时的具体字段验证错误代码
查看>>
XHTML教会我的一些东西-1
查看>>
TypeError: 'MongoClient' object is not callable
查看>>
command not found Operation not permitted
查看>>
计算几何题集
查看>>
《c程序设计语言》-3.2 字符串转换
查看>>
MySQL 删除数据的最好的方式
查看>>
web 后台返回json格式数据的方式(status 406)
查看>>
C++ variable_template
查看>>
三种方式使得iOS应用能够在后台进行数据更新和下载
查看>>
并发编程总结4-JUC-REENTRANTLOCK-2(公平锁)
查看>>
去创业公司不能有一夜暴富的侥幸,更不能指望掉馅饼
查看>>
20161114记录一件工作的事
查看>>
CSharp设计模式读书笔记(10):装饰模式(学习难度:★★★☆☆,使用频率:★★★☆☆)...
查看>>
#、%和$符号在OGNL表达式中的作用
查看>>
java for循环的几种写法
查看>>