NETTY-WebSocket

WebSocket协议简介
WebSocket
是html5规范新引入的功能,用于解决浏览器与后台服务器双向通讯的问题,使用WebSocket技术,后台可以随时向前端推送消息,以保证前后台状态统一,在传统的无状态HTTP协议中,这是“无法做到”的。
在WebSocket出现之前,传统的服务端向浏览器推送消息的技术包括:ajax、flash、comet、java applet等。无一例外,这些技术使用的都是长轮循,即每隔一段时间去请求后台,以获取最新状态。长轮询方式容易实现,但效果也差,频繁盲目的调用后台,带来不必要的开销,且实时性无法保障,后台出现更新,前端需要在下一次轮询时才知道。
WebSocket协议支持服务端与浏览器建立长连接,双方可以随时发送数据给对方,不再是由客户端控制的一问一答的方式。在实现推送功能的时候,主要是由服务端给客户端发送数据。
基于长轮循(polling)和websocket推送的浏览器(browser)和服务端(Server)的交互对比图如下所示:

由于WebSocket协议建立在http协议的基础之上,因此二者有很多的类似之处。事实上,在使用websocket协议时,浏览器与服务端最开始建立的还是http连接,之后再将协议从http转换成websocket,协议转换的过程称之为握手(handshake),表示服务端与客户端都同意建立websocket协议。
需要注意的是,由于websocket是新的协议,需要浏览器和web服务端都支持的情况下,才能建立连接成功。
请求报文
正常情况下,连接在建立的时候,浏览器向服务端发送一个HTTP请求,通过包含一些额外信息,表明其希望将协议从HTTP转换成WebSocket。这个额外信息实际上就是增加了一个请求头Update,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| GET ws://echo.websocket.org/?encoding=text HTTP/1.1
Origin: http://websocket.org
Cookie: __utma=99as
Connection: Upgrade
Host: echo.websocket.org
Sec-WebSocket-Key: uRovscZjNol/umbTt5uKmw==
Upgrade: websocket
Sec-WebSocket-Version: 13
|
可以看到Update
请求头的值为websocket,表示希望将Http协议转换成websocket协议。
Origin
表示的是,连接开始时发送http请求的地址。
注意GET的路径是以ws开头,这是因为WebSocket是一种全新的协议,不属于http无状态协议,协议名为”ws”,与http协议使用相同的80端口,类似的,”wss”和https协议使用相同的443端口。
客户端握手请求中的 Sec-WebSocket-Key
头字段中的内容是随机的,采用 base64 编码。服务端会接收到这个值之后,会将其与一个魔幻数字258EAFA5-E914-47DA-95CA-C5AB0DC85B11进行连接,使用SHA-1加密后,采用base64编码,以Sec-WebSocket-Accept
响应头返回。
如果服务端接受了客户端将http协议转换成websocket协议连接的请求,会返回类似如下响应,表示协议切换成功:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| HTTP/1.1 101 WebSocket Protocol Handshake
Date: Fri, 10 Feb 2012 17:38:18 GMT
Connection: Upgrade
Server: Kaazing Gateway
Upgrade: WebSocket
Access-Control-Allow-Origin: http://websocket.org
Access-Control-Allow-Credentials: true
Sec-WebSocket-Accept: rLHCkw/SKsO9GAH/ZSFhBATDKrU=
Access-Control-Allow-Headers: content-type
|
浏览器支持情况
可以在以下网址看到目前支持webscoket协议的主流浏览器和版本:http://caniuse.com/#feat=websockets

WebSocket客户端开发
支持html5的浏览器,一般都会提供一个内置的js对象Websocket
,开发者利用这个对象就可以与服务端建立websocket连接。特别的在FireFox中,这个对象为在Firefox中为MozWebSocket
。
浏览器支持检测
可以通过以下js代码检测一个浏览器是否支持websocket
1 2 3 4 5
| window.WebSocket = window.WebSocket || window.MozWebSocket; if (!window.WebSocket){ alert("WebSocket not supported by this browser"); return; }
|
建立连接
websocket协议连接的创建
1
| var myWebSocket = new WebSocket("ws://www.websockets.org");
|
关闭连接
关闭连接使用Websocket对象的close方法
发送消息
发送消息使用Websocket对象的send方法
1
| myWebSocket.send("Hello WebSockets!");
|
其他方法
Websocket对象还提供了几个回调方法,在适当的实际会被回调:
1 2 3 4 5 6 7 8 9 10 11
|
myWebSocket.onopen = function(evt) { alert("Connection open ..."); };
myWebSocket.onmessage = function(evt) { alert( "Received Message: " + evt.data); };
myWebSocket.onclose = function(evt) { alert("Connection closed."); };
|
完整的客户端案例

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| <!DOCTYPE html> <html> <head> <meta charset="UTF-8"> </head> <body> <form onsubmit="return false;"> <h1> Netty WebSocket 协议 </h1> <h3>客户端请求消息</h3> <textarea id="requestText" style="width:200px;height:100px;"></textarea> <input type="button" value="发送WebSocket请求消息" onclick="send(document.getElementById('requestText').value)"/> <h3>服务端返回的应答消息</h3> <textarea id="responseText" style="width:200px;height:100px;"></textarea> </form> <script type="text/javascript"> window.WebSocket = window.WebSocket || window.MozWebSocket; if (!window.WebSocket){ alert("你的浏览器不支持websocket协议"); }else{ var socket = new WebSocket("ws://localhost:8080/websocket"); socket.onmessage = function (event) { var ta = document.getElementById('responseText'); ta.value = event.data }; socket.onopen = function (event) { alert("websocket连接建立成功..."); }; socket.onclose = function (event) { alert("连接关闭"); }; function send(message) { if (!window.WebSocket) { return; } if (socket.readyState == WebSocket.OPEN) { socket.send(message); } else { alert("WebSocket not supported by this browser"); } } } </script> </body> </html>
|
Netty Websocket服务端开发
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| public class WebSocketServer { public void run(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("http-codec", new HttpServerCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); pipeline.addLast("handler", new WebSocketServerHandler()); } }); Channel ch = b.bind(port).sync().channel(); System.out.println("Web socket server started at port " + port + '.'); System.out .println("Open your browser and navigate to http://localhost:" + port + '/'); ch.closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args.length > 0) { try { port = Integer.parseInt(args[0]); } catch (NumberFormatException e) { e.printStackTrace(); } } new WebSocketServer().run(port); } }
|
上述代码中的HttpServerCodec、HttpObjectAggregator、ChunkedWriteHandler 都是Netty提供的对tttp或webscoket协议支持的ChannelHandler:
HttpServerCodec
HttpServerCodec
的作用是将请求或者应答消息按照HTTP协议的格式进行解码或者编码。
HttpObjectAggregator
HttpObjectAggregator
的目的是将HTTP消息的多个部分组合成一条完整的HTTP消息,也就是处理粘包与解包问题。
ChunkedWriteHandler
ChunkedWriteHandler
用来向客户单发送HTML文件,它主要用于支持浏览器和服务端进行WebSocket通信。
WebSocketServerHandler
WebSocketServerHandler是
我们自己编写的处理webscoket请求的ChannelHandler。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
| public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> { private static final Logger logger = Logger .getLogger(WebSocketServerHandler.class.getName()); private WebSocketServerHandshaker handshaker; @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { handleHttpRequest(ctx, (FullHttpRequest) msg); } else if (msg instanceof WebSocketFrame) { handleWebSocketFrame(ctx, (WebSocketFrame) msg); } } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { if (!req.getDecoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST)); return; } WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( "ws://localhost:8080/websocket", null, false); handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory .sendUnsupportedVersionResponse(ctx.channel()); } else { handshaker.handshake(ctx.channel(), req); } } private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } if (frame instanceof PingWebSocketFrame) { ctx.channel().write( new PongWebSocketFrame(frame.content().retain())); return; } if (!(frame instanceof TextWebSocketFrame)) { throw new UnsupportedOperationException(String.format( "%s frame types not supported", frame.getClass().getName())); } String request = ((TextWebSocketFrame) frame).text(); if (logger.isLoggable(Level.FINE)) { logger.fine(String.format("%s received %s", ctx.channel(), request)); } ctx.channel().write(new TextWebSocketFrame(" 收到客户端请求:"+request)); } private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) { if (res.getStatus().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); setContentLength(res, res.content().readableBytes()); } ChannelFuture f = ctx.channel().writeAndFlush(res); if (!isKeepAlive(req) || res.getStatus().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
|
在上述代码中,当接收到客户端发送的请求消息时,首先通过msg是FullHttpRequest
还是WebSocketFrame
来判断判断是http请求还是websocket请求。然后分别调用handleHttpRequest、handleWebSocketFrame方法来处理。
建立连接
对于handleHttpRequest方法,仅仅在第一次请求的时候被调用一次
连接建立时,第一次发送的肯定是一个http请求,因为将http升级websocket协议需要服务端支持,但是并不是所有的服务端都支持websocket协议,因此服务端需要在handleHttpRequest方法中,通知客户端,服务端是否同意将HTTP协议升级成WebSocket协议。在我们的案例中,通过WebSocketServerHandshaker
的handshake
方法,告诉客户端,服务端同意协议升级,此时websocket连接建立完成。
执行流程
一旦websocket协议建立完成,之后的所有的请求都会走handleWebSocketFrame
方法,这个方法的处理顺序如下:
判断是否是CloseWebSocketFrame
请求,如果是,关闭连接
判断是否是PingWebSocketFrame
消息,如果是维持链路的Ping消息,则构造Pong消息返回。Ping和Pong是websocket里的心跳,用来保证客户端是在线的。
判断消息是否是TextWebSocketFrame
文本消息。因为websocket消息是基于HTTP协议的,http除了支持文本消息,还是支持图片上传等复杂的二进制消息。按我们的案例中,只对文本消息进行处理。
返回应答消息
WebSocket帧
在handleWebSocketFrame方法中,我们使用到了webscocket中不同的帧类型,CloseWebSocketFrame、PingWebSocketFrame、TextWebSocketFrame。帧类型实际上就是WebSocket协议定义的消息类型。在一个完整的WebSocket帧中(如下图所示),包含了一个4位的二进制Opcode
字段,其用来表示消息类型。

Opcode的取值和消息类型的对应关系如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| OPCODE:4位
0x0表示附加数据帧
0x1表示文本数据帧
0x2表示二进制数据帧
0x3-7暂时无定义,为以后的非控制帧保留
0x8表示连接关闭
0x9表示ping
0xA表示pong
0xB-F暂时无定义,为以后的控制帧保留
|
可以看到,虽然4位2进制最多支持16种消息类型,但是目前实际上只定义了六种消息类型。在Netty中,针对这每一种帧类型,都有一个java类与之对应,如下所示。

其中Close、Ping以及Pong称之为控制帧,Close关闭帧很容易理解,客户端如果接受到了就关闭连接,客户端也可以发送关闭帧给服务端。Ping和Pong是websocket里的心跳,用来保证客户端是在线的。
Text和Binary表示这个传输的帧是文本类型还是二进制类型,二进制类型传输的数据可以是图片或者语音之类的。
如何实现消息推送
之前的案例,还是通过客户端发送请求,服务端进行响应的方式进行交互。WebSocket协议还可以实现服务端主动向客户端推送消息的功能。
主动推送需要考虑的问题包括:
服务端要保存用户标识
服务端要保存用户唯一标记(假设为userId)与其对应的SocketChannel的对应关系,以便之后根据这个唯一标记给用户推送消息。
对应关系的保存很简单,用一个Map来记录即可。保存这种对应关系的最佳时机是websocket连接刚刚建立的时候,因为这种对应关系只需要保存一次 ,之后就可以拿着这个userId找到对应SocketChannel给用户推送消息。以之前的代码为例,很明显这个操作,应该放在handleHttpRequest方法中处理,因为这个方法在一个websocket连接的生命周期只会调用一次。
我们知道,当连接建立时,我们在服务端就能获取到客户端对应的SocketChannel,我们如果获取到userId呢?WebSocket协议与HTTP协议一样,都支持在url中添加参数。如:
我们可以在handleHttpRequest
中添加到以下代码获取到这个参数
1 2 3 4 5 6 7 8 9 10 11 12 13
| public static Map<String,Channel> map=new ConcurrentHashMap(); private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { ....
QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.getUri()); Map<String, List<String>> params = queryStringDecoder.parameters(); String userId=params.get("userId").get(0);
map.put(userId,ctx.channel()); }
|
此外,当用户断开与服务端的连接的时候,也就是当服务端接收到CloseWebSocketFrame
帧的时候, 需要从Map中移除对应的userId和SocketChannel,以免Map中存储过多的无效用户数据导致内存溢出。
通过userId给用户推送消息
通常消息推送是由运营人员根据业务需要,从一个后台管理界面,过滤出符合特定条件的用户来推送消息,因此需要推送的userId和推送内容都是运营指定的。推送服务端只需要根据userId找到对应的Channel,将推送消息推送给浏览器即可。代码片段如下:
1 2 3 4 5 6 7
| String userId="123456"; String msg="xxxxxxxxxxxxxxxxxxxxxx";
Channel channel = map.get(userId); channel.writeAndFlush(new TextWebSocketFrame(msg));
|
一般公司的推送的消息量都比较大,所以需要推送的消息一般会放到一个外部消息队列中,如rocketmq,kafka。推送服务端通过消费这些队列,来给用户推送消息。
客户端ack接受到的消息
一般情况下,我们会每一个推送的消息都设置一个唯一的msgId。客户端在接受推送的消息后,需要对这条消息进行ack,也就是告诉服务端自己接收到了这条消息,ack时需要将msgId带回来。
如果我们只使用websocket做推送服务,那么之前代码中的handleWebSocketFrame方法,要处理的主要就是这类ack消息。
离线消息
如果当给某个用户推送消息的时候,其并不在线,可以将消息保存下来,一般都是保存到一个缓存服务器中(如redis),每次当一个用户与服务端建立连接的时候,首先检查缓存服务器中有没有其对应的离线消息,如果有直接取出来发送给用户。
消息推送记录的存储
服务端推送完成消息记录之后,还需要将这些消息存储下来,以支持一个用户查看自己曾经受到过的推送消息。
疲劳度控制
在实现推送功能时,我们通常还会做一个疲劳度控制的功能,也就是限制给一个用户每天推送消息的数量,以免频繁的推送消息给用户造成不好的体验。疲劳度控制可以从多个维度进行设计。例如用户一天可以接受到消息总数,根据业务划分的某种特定类型的消息的一天可以接受到的消息总数等。