Reactor model
Traditional IO model
Characteristics:
- 1) A blocking I/O model is used to read input data;
- 2) Each connection requires a dedicated thread to carry out the full cycle of reading data, business processing, and returning the result.
Problems:
- 1) Under high concurrency, a large number of threads must be created to handle the connections, consuming significant system resources;
- 2) After a connection is established, if there is temporarily no data to read, the thread blocks on the Read operation, wasting thread resources.
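For illustration, a minimal sketch of this thread-per-connection model with plain blocking sockets; the port, buffer size, and echo "business logic" are arbitrary choices for the example:

```java
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class BlockingIoServer {
    public static void main(String[] args) throws Exception {
        ServerSocket serverSocket = new ServerSocket(8080);
        while (true) {
            Socket socket = serverSocket.accept();        // blocks until a client connects
            new Thread(() -> handle(socket)).start();     // one dedicated thread per connection
        }
    }

    private static void handle(Socket socket) {
        try (Socket s = socket;
             InputStream in = s.getInputStream();
             OutputStream out = s.getOutputStream()) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {            // read() blocks while there is no data
                out.write(buf, 0, n);                     // "business processing" + response, all on this thread
            }
        } catch (Exception ignored) {
            // connection closed or failed; the thread simply ends
        }
    }
}
```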
Single Reactor single thread
Here, Select is the standard network programming API from the I/O multiplexing model introduced above; it lets the application listen for multiple connection requests through a single blocking object. The schematic diagrams of the other schemes are similar.
Scheme Description:
- 1) The Reactor object monitors client request events through Select and, after receiving an event, distributes it through Dispatch;
- 2) If it is a connection request event, the Acceptor handles it via Accept and then creates a Handler object to deal with the subsequent business once the connection is established;
- 3) If it is not a connection event, the Reactor dispatches it to the Handler associated with that connection;
- 4) The Handler completes the full flow of Read → business processing → Send.
Advantages: the model is simple; there are no multithreading, inter-process communication, or contention issues, since everything runs in a single thread.
Disadvantages: performance. With only one thread, a multi-core CPU cannot be fully utilized, and while the Handler is processing the business of one connection the whole process cannot handle any other connection events, which easily becomes a performance bottleneck.
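For comparison, a minimal sketch of a single-Reactor, single-thread server built directly on the JDK NIO Selector; the port and the echo "business processing" are illustrative only, and this is not how Netty implements it:

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class SingleThreadReactor {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(8080));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();                                      // Reactor: wait for events
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {                           // Dispatch: connection event -> Acceptor
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {                      // Dispatch: read event -> Handler
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    int n = client.read(buf);
                    if (n > 0) {
                        buf.flip();
                        client.write(buf);                          // Read -> business processing -> Send, all on one thread
                    } else if (n < 0) {
                        client.close();
                    }
                }
            }
        }
    }
}
```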
Single Reactor multithreading
Scheme Description:
- 1) The Reactor object monitors client request events through Select and, after receiving an event, distributes it through Dispatch;
- 2) If it is a connection request event, the Acceptor handles it via Accept and then creates a Handler object to deal with the various subsequent events once the connection is established;
- 3) If it is not a connection event, the Reactor dispatches it to the Handler associated with that connection;
- 4) The Handler is only responsible for responding to events and does no concrete business processing itself; after reading the data via Read, it hands the work to a Worker thread pool for business processing;
- 5) The Worker thread pool allocates an independent thread to carry out the real business processing and then sends the response result back to the Handler;
- 6) After receiving the result, the Handler returns it to the Client via Send.
Business processing is thus separated into different threads (see the sketch below).
Advantages: it can make full use of the processing power of a multi-core CPU.
Disadvantages: sharing and accessing data across threads is complex; the Reactor, which monitors and responds to all events, still runs in a single thread and easily becomes a performance bottleneck under high concurrency.
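A sketch of the same event loop with the business step moved to a worker pool; everything here (port, pool size, the echo "business" step) is illustrative, and a production version would normally hand the write back to the I/O thread rather than writing from the worker:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleReactorWithWorkers {
    // business logic runs here; the single Reactor thread only accepts, reads and dispatches
    private static final ExecutorService WORKERS =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(8080));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    if (client.read(buf) > 0) {
                        buf.flip();
                        WORKERS.submit(() -> {                 // business processing happens off the Reactor thread
                            ByteBuffer reply = buf;            // placeholder "business" step: echo the input
                            try {
                                client.write(reply);           // send the result back to the client
                            } catch (IOException ignored) {
                            }
                        });
                    }
                }
            }
        }
    }
}
```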
Master-slave Reactor multithreading
In the single-Reactor multithreaded model, the Reactor runs in a single thread and easily becomes a performance bottleneck under high concurrency, so the Reactor itself can also be run across multiple threads.
Scheme Description:
- 1) The MainReactor object in the Reactor main thread monitors connection events through Select; after receiving a connection event, it accepts it via the Acceptor and handles the connection establishment;
- 2) After the Acceptor has handled the connection establishment, the MainReactor assigns the connection to a SubReactor for further processing;
- 3) The SubReactor adds the connection to its own connection queue to listen on, and creates a Handler to handle the connection's events;
- 4) When a new event occurs, the SubReactor calls the Handler associated with the connection to respond;
- 5) After the Handler reads the data via Read, it hands the work to a Worker thread pool for business processing;
- 6) The Worker thread pool allocates an independent thread to carry out the real business processing and then sends the response result back to the Handler;
- 7) After receiving the result, the Handler returns it to the Client via Send.
Advantages: the responsibilities of the parent and child threads are clear and simple: the parent thread only needs to accept new connections, and the child threads complete all subsequent business processing.
The data interaction between parent and child threads is simple: the Reactor main thread only has to hand each new connection to a child thread, and the child thread does not need to return any data.
This model is widely used, for example in Nginx's master-slave Reactor multi-process model, Memcached's master-slave multithreading, and Netty's master-slave multithreaded model. A plain-NIO sketch follows.
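A compact plain-NIO sketch of the master-slave arrangement, where a boss Selector only accepts connections and hands them to SubReactors via a queue plus wakeup(); the names, the number of SubReactors, and the echo handler are all illustrative:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class MasterSlaveReactor {

    /** SubReactor: owns its own Selector and thread, handling read/write for the connections assigned to it. */
    static class SubReactor implements Runnable {
        private final Selector selector;
        private final Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();

        SubReactor() throws IOException {
            this.selector = Selector.open();
        }

        void assign(SocketChannel ch) {
            pending.add(ch);
            selector.wakeup();                            // wake select() so the new channel gets registered
        }

        @Override
        public void run() {
            try {
                while (true) {
                    selector.select();
                    SocketChannel ch;
                    while ((ch = pending.poll()) != null) {   // register connections handed over by the MainReactor
                        ch.configureBlocking(false);
                        ch.register(selector, SelectionKey.OP_READ);
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isReadable()) {
                            SocketChannel client = (SocketChannel) key.channel();
                            ByteBuffer buf = ByteBuffer.allocate(1024);
                            if (client.read(buf) > 0) {
                                buf.flip();
                                client.write(buf);            // Handler: echo as the placeholder business step
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        SubReactor[] subs = new SubReactor[2];
        for (int i = 0; i < subs.length; i++) {
            subs[i] = new SubReactor();
            new Thread(subs[i], "sub-reactor-" + i).start();
        }

        Selector bossSelector = Selector.open();              // MainReactor: only handles accept events
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(8080));
        server.configureBlocking(false);
        server.register(bossSelector, SelectionKey.OP_ACCEPT);

        int next = 0;
        while (true) {
            bossSelector.select();
            Iterator<SelectionKey> it = bossSelector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    SocketChannel client = server.accept();
                    subs[next++ % subs.length].assign(client); // round-robin hand-off to a SubReactor
                }
            }
        }
    }
}
```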
Netty
Thread model
(Figure: Netty thread model)
Netty abstracts two groups of thread pools: the BossGroup, which accepts client connections (main Reactor), and the WorkerGroup, which handles network reads and writes (sub Reactor).
Both BossGroup and WorkerGroup are of type NioEventLoopGroup.
The threads in a NioEventLoopGroup are NioEventLoops.
A NioEventLoop is a thread that loops continuously to execute processing tasks; each NioEventLoop has a Selector that listens to the network communication of the sockets bound to it.
The Selector of a BossGroup thread listens on the ServerSocketChannel, i.e. for accept (connection) events.
The Selector of a WorkerGroup thread listens on SocketChannels, i.e. for read/write events.
Each Channel has a Pipeline (the corresponding Channel can be obtained through the Pipeline), and the Pipeline maintains many handlers (processors).
Handlers are responsible for read and write operations; the business part is handed over to a business thread pool rather than processed directly in the handler.
The flow of a message through the model:
- The I/O thread (Worker thread) reads the message from the TCP buffer into the SocketChannel's receive buffer;
- The I/O thread generates the corresponding event and triggers it up into the ChannelPipeline;
- The I/O thread invokes the corresponding methods of the Handler chain in the ChannelPipeline until it reaches the last, business Handler;
- The last Handler wraps the message into a Runnable and puts it into the business thread pool for execution, while the I/O thread returns to continue read/write and other I/O operations;
- The business thread pool takes messages off its task queue and executes the business logic concurrently.
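One way Netty supports this separation is to bind a handler to a separate EventExecutorGroup when adding it to the pipeline, so that the handler's callbacks run off the NioEventLoop. A hedged sketch follows: BusinessHandler is a hypothetical name, NettyServerHandler refers to the handler used in the echo example below, and submitting a Runnable to your own pool from the last handler, as described above, is an equally valid pattern.

```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;

public class BusinessOffloadInitializer extends ChannelInitializer<SocketChannel> {
    // handlers registered with this group run on its threads instead of on the NioEventLoop (I/O) thread
    private static final EventExecutorGroup BUSINESS_GROUP = new DefaultEventExecutorGroup(16);

    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline()
          // ordinary handlers run on the WorkerGroup's NioEventLoop
          .addLast(new NettyServerHandler())
          // this handler's callbacks run on BUSINESS_GROUP, keeping slow work off the I/O thread
          .addLast(BUSINESS_GROUP, "business", new BusinessHandler());
    }
}

// hypothetical slow business handler, just for illustration
class BusinessHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // a time-consuming operation here no longer blocks reads/writes on other connections
        ctx.writeAndFlush(msg);
    }
}
```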
Echo instance
Bootstrap
Boot class
Single-threaded model configuration (client bootstrap)
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
// Set the relevant parameters
bootstrap.group(group)                       // set the thread group
        .channel(NioSocketChannel.class)     // set the client channel implementation class (via reflection)
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new NettyClientHandler()); // add our own handler
            }
        });
Master-slave multithreading model
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
// Use chained calls to configure
bootstrap.group(bossGroup, workerGroup)                 // set the two thread groups
        .channel(NioServerSocketChannel.class)          // NioServerSocketChannel as the server channel implementation
        .option(ChannelOption.SO_BACKLOG, 128)          // queue length for pending connections
        .childOption(ChannelOption.SO_KEEPALIVE, true)  // keep connections alive
        // .handler(null)                               // handler applies to the bossGroup, childHandler to the workerGroup
        .childHandler(new ChannelInitializer<SocketChannel>() { // channel initializer (anonymous object); sets handlers for the pipeline
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                System.out.println("client socketChannel hashcode=" + ch.hashCode());
                // A collection can be used to manage SocketChannels; when pushing messages, tasks can be added
                // to the taskQueue or scheduleTaskQueue of the NioEventLoop bound to each channel
                ch.pipeline().addLast(new NettyServerHandler());
            }
        }); // set the handler for the pipeline of the workerGroup's EventLoop
Attributes
channel(): specifies the ServerSocketChannel implementation used to accept connections
childHandler(): the Handler (processor) for the accepted SocketChannels (read/write)
option(): sets options on the channel that accepts connections
ChannelOption.SO_BACKLOG: the maximum number of connections waiting in the handshake/accept queues
Half-open (SYN) queue: after the first handshake, the connection sits in the server kernel's half-open connection queue
Full (accept) queue: after the handshake completes, the connection sits in the server's full connection queue
The effective value of option(ChannelOption.SO_BACKLOG, 128) is the smaller of this parameter and the kernel limit; it generally bounds the full connection queue
ChannelOption.TCP_NODELAY: defaults to false, i.e. small writes are accumulated before being sent (Nagle's algorithm)
- when set to true, data is sent immediately
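As a concrete (if trivial) example, continuing the ServerBootstrap configuration shown above, Nagle's algorithm can be disabled per accepted connection:

```java
// disable Nagle's algorithm on each accepted connection: small writes are sent immediately instead of being accumulated
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
```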
channel
Channel: the connection channel
handler: handles reads/writes and the related business logic
channelPipeline: an ordered container that manages ChannelHandlers and is associated with a Channel
Handler
A handler can be seen as an interceptor that processes the ByteBuf for reads and writes; multiple handlers form a chain through which the data is passed.
Handlers usually extend an adapter class (adapter pattern): the adapter provides default implementations of all interface methods, whereas implementing the interface directly would force you to implement every method, leading to redundant code.
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    public void channelRegistered(ChannelHandlerContext ctx) {
        System.out.println("registered");
    }
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("active");
    }
    public void channelInactive(ChannelHandlerContext ctx) {
        System.out.println("inactive");
    }
    public void channelUnregistered(ChannelHandlerContext ctx) {
        System.out.println("unregistered");
    }
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("read message");
    }
    public void channelReadComplete(ChannelHandlerContext ctx) {
        System.out.println("message read complete");
    }
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        System.out.println("user event");
    }
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        System.out.println("writability changed to " + ctx.channel().isWritable());
    }
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.out.println("exception occurred");
    }
}
ChannelHandlerContext
The ChannelPipeline actually maintains a doubly linked list of ChannelHandlerContexts.
ChannelPipeline addFirst(ChannelHandler... handlers) adds a business processing class (handler) at the first position of the chain; ChannelPipeline addLast(ChannelHandler... handlers) adds one at the last position of the chain.
Methods called on the Channel or the ChannelPipeline propagate through the whole pipeline (broadcast), while methods called on a ChannelHandlerContext only propagate to the subsequent handler nodes.
Channel channel = ctx.channel();
channel.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<) Meow", CharsetUtil.UTF_8));
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<) Meow", CharsetUtil.UTF_8));
Inbound/outbound order
pipeline.addLast(in1)
        .addLast(out2)
        .addLast(in2)
        .addLast(out1);
Every time a read event occurs, the inbound methods are called from head to tail (inbound processing);
when a write event is triggered, the outbound methods are called from tail to head.
1. Inbound operations mainly refer to reading data; outbound operations mainly refer to writing data.
2. For inbound, the data is read first and then the inbound Handlers are executed; for outbound, the outbound Handlers are executed before the data is written.
So far the inbound and outbound behaviour has been described, but the underlying question remains: why can this be implemented with just one doubly linked list?
Although it looks as if two lists are used, inbound and outbound operations actually traverse the same doubly linked list:
when the operation is a read, findContextInbound(int mask) is called to traverse the inbound handlers from head to tail (only inbound handlers are visited);
when the operation is a write, findContextOutbound(int mask) is called to traverse the outbound handlers from tail to head (only outbound handlers are executed).
in1->in2:ctx.fireChannelRead(data)
Methods whose name contains "Read" pass the data in the receive buffer to the next inbound handler; they never invoke an outbound handler, precisely because they are "read" methods.
in2->out1:ctx.write(data)
Although this call is made from in2, in the order Netty actually executes the handlers, out2 is invoked rather than out1.
out1->out2:ctx.write(data),ctx.writeAndFlush(data)
Nothing further to add here; it is analogous to the read case.
in1->out2,out1->out2:ctx.channel().write(data)
This case is special: when data is written through channel(), the last outbound handler is invoked (to be exact, the outbound handler closest to the tail).
ChannelHandlerContext summary
For reads, the inbound handlers are found and executed in order (head to tail); for writes, the outbound handlers are found and executed in reverse order (tail to head).
Data is passed between InboundHandlers via ctx.fireChannelRead(data).
An InboundHandler passes data to the OutboundHandlers via ctx.write(data).
The InboundHandler should be placed at the end; otherwise, OutboundHandlers placed after it will not be executed.
The server's InboundHandler executes first (it receives the request before the response is written).
ChannelFuture
Client connection server
ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
channel(): returns the channel associated with ChannelFuture;
addListener(): adds the specified listener to the Future; when the Future completes, the listener is notified; if the Future has already completed, the listener is notified immediately;
addListeners(): the same as above, except that a series of listeners can be added;
removeListener(): removes the first occurrence of the specified listener from the Future, so it will no longer be notified when the Future completes; if the listener is not associated with this Future, the method does nothing and returns silently;
removeListeners(): the same as above, except that a series of listeners can be removed;
sync(): waits for the Future to complete; if the Future fails, the cause of the failure is rethrown;
syncUninterruptibly(): a sync() that cannot be interrupted;
await(): waits for the Future to complete;
awaitUninterruptibly(): an await() that cannot be interrupted;
Multi Handler example
package handler;

import echo.NettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class HandlerServer {
    public static void main(String[] args) throws Exception {
        // Create BossGroup and WorkerGroup
        // 1. Two thread groups are created: bossGroup and workerGroup
        // 2. bossGroup only handles connection requests; the real client business is handed to the workerGroup
        // 3. Both run infinite event loops
        // 4. The number of child threads (NioEventLoops) in bossGroup and workerGroup defaults to actual CPU cores * 2
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // 8
        try {
            // Create the server-side startup object and configure its parameters
            ServerBootstrap bootstrap = new ServerBootstrap();
            // Use chained calls to configure
            bootstrap.group(bossGroup, workerGroup)                 // set the two thread groups
                    .channel(NioServerSocketChannel.class)          // NioServerSocketChannel as the server channel implementation
                    .option(ChannelOption.SO_BACKLOG, 128)          // queue length for pending connections
                    .childOption(ChannelOption.SO_KEEPALIVE, true)  // keep connections alive
                    // .handler(null)                               // handler applies to the bossGroup, childHandler to the workerGroup
                    .childHandler(new ChannelInitializer<SocketChannel>() { // channel initializer (anonymous object); sets handlers for the pipeline
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // System.out.println("client socketChannel hashcode=" + ch.hashCode());
                            // A collection can be used to manage SocketChannels; when pushing messages, tasks can be added
                            // to the taskQueue or scheduleTaskQueue of the NioEventLoop bound to each channel
                            ch.pipeline().addLast(new InHandler1());
                            ch.pipeline().addLast(new OutHandler2());
                            ch.pipeline().addLast(new OutHandler1());
                            ch.pipeline().addLast(new InHandler2());
                        }
                    }); // set the handler for the pipeline of the workerGroup's EventLoop
            System.out.println(".....The server is ready...");
            // Bind a port and synchronously return a ChannelFuture object (start the server and bind the port)
            ChannelFuture cf = bootstrap.bind(6669).sync();
            // Register a listener on cf for the events we care about
            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess()) {
                        System.out.println("Listening on port 6669 succeeded");
                    } else {
                        System.out.println("Listening on port 6669 failed");
                    }
                }
            });
            // Listen for the channel close event
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
package handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class InHandler1 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf data = (ByteBuf) msg;
        System.out.println("In1 received " + data.toString(CharsetUtil.UTF_8));
        ctx.fireChannelRead(Unpooled.copiedBuffer("In1" + data.toString(), CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
package handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class InHandler2 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf data = (ByteBuf) msg;
        System.out.println("In2 received " + data.toString(CharsetUtil.UTF_8));
        ctx.writeAndFlush(Unpooled.copiedBuffer("In2" + data.toString(), CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
package handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.CharsetUtil;

public class OutHandler1 extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        ByteBuf data = (ByteBuf) msg;
        System.out.println("write " + data.toString());
        ctx.write(Unpooled.copiedBuffer("In2" + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8));
        ctx.flush();
    }
}
package handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.CharsetUtil;

public class OutHandler2 extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        ByteBuf data = (ByteBuf) msg;
        System.out.println("write " + data.toString());
        ctx.write(Unpooled.copiedBuffer("In2" + data.toString(CharsetUtil.UTF_8), CharsetUtil.UTF_8));
        ctx.flush();
    }
}
Asynchronous model
The concept of asynchrony is relative to synchrony: when an asynchronous call is issued, the caller does not get the result immediately; the component that actually handles the call notifies the caller through status, notifications, or callbacks after it has completed.
I/O operations in Netty are asynchronous: bind, write, connect and similar operations simply return a ChannelFuture first.
The caller cannot obtain the result immediately, but through the Future-Listener mechanism the user can conveniently obtain the I/O result, either actively or via notification.
Netty's asynchronous model is built on Future and callbacks. Callbacks are just callbacks; the focus here is Future. The core idea: suppose a method fun whose computation may be very time-consuming; waiting for fun to return is clearly inappropriate. Instead, the call to fun immediately returns a Future, and the progress of fun can then be monitored through that Future (the Future-Listener mechanism).
Future description
Represents the result of an asynchronous execution; the methods it provides can be used to check whether the execution has completed and to retrieve the result.
ChannelFuture is an interface (public interface ChannelFuture extends Future<Void>); listeners can be added to it, and they are notified when the monitored event occurs.
Future methods
Check whether the current operation has completed through the isDone method;
Check whether the completed operation succeeded through the isSuccess method;
Obtain the reason why the completed operation failed through the getCause method;
Check whether the completed operation was cancelled through the isCancelled method;
Register a listener through the addListener method; when the operation completes (isDone returns completed), the specified listener is notified; if the Future has already completed, the listener is notified immediately.
// Bind a port and synchronously return a ChannelFuture object (start the server and bind the port)
ChannelFuture cf = bootstrap.bind(6668).sync();
// Register a listener on cf for the events we care about
cf.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (cf.isSuccess()) {
            System.out.println("Listening on port 6668 succeeded");
        } else {
            System.out.println("Listening on port 6668 failed");
        }
    }
});
Netty encoding and decoding
Decoder
ByteToMessageDecoder
Converts bytes into messages; you must check yourself whether the buffer contains enough data.
public class ToIntegerDecoder extends ByteToMessageDecoder {                     // 1
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() >= 4) {                                           // 2
            out.add(in.readInt());                                               // 3
        }
    }
}
ReplayingDecoder
With ReplayingDecoder you do not need to do this check yourself: if the ByteBuf contains enough bytes they are read normally; if not, decoding stops (and is replayed once more data arrives).
ReplayingDecoder is slightly slower than ByteToMessageDecoder.
The example below takes the byte array out of the ByteBuf, deserializes it into an object, and adds the object to the output list, which is passed on to the subsequent handlers.
public class ToIntegerDecoder2 extends ReplayingDecoder<Void> {
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int length = in.readInt();                 // no readableBytes() check needed with ReplayingDecoder
        byte[] bytes = new byte[length];
        in.readBytes(bytes);
        Object obj = serializer.deserialize(bytes, packageClass); // serializer / packageClass: fields of the enclosing decoder (not shown)
        out.add(obj);
    }
}
Encoder
MessageToByteEncoder
The encoder receives the msg object, serializes it into binary, and writes it into the ByteBuf; the ByteBuf then moves on to the next ChannelOutboundHandler in the pipeline.
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
    out.writeInt(MAGIC_NUMBER);
    if (msg instanceof RpcRequest) {
        out.writeInt(PackageType.REQUEST_PACK.getCode());
    } else {
        out.writeInt(PackageType.RESPONSE_PACK.getCode());
    }
    out.writeInt(serializer.getCode());
    byte[] bytes = serializer.serialize(msg);
    out.writeInt(bytes.length);
    out.writeBytes(bytes);
}
TCP sticky packet
What is TCP packet sticking problem?
Sticky packets are caused by TCP, but what sticks together is application-layer data (complete requests/responses), not the TCP segments themselves. A single TCP segment may carry one piece of application-layer data or several (Nagle's algorithm). The sticking appears after the TCP headers are removed: application-layer data accumulates in the buffer and its boundaries become unclear.
TCP enables the Nagle algorithm by default (a sender-side cause).
A group of TCP segments is delivered to the receive buffer, the TCP headers are stripped, and the socket reads the buffered data; if the application reads more slowly than data arrives and there is no sticky-packet handling, packets will stick together (a receiver-side cause).
Data belonging to the same application-layer message is allowed to stick under TCP; different application-layer messages must be separated.
Application-layer data (HTTP as an example) versus TCP segments:
How to deal with sticking phenomenon?
(1) Sender
The sticky packets caused by the sender can be avoided by disabling the Nagle algorithm, i.e. by turning it off with the TCP_NODELAY option.
(2) Receiver
The receiver has no way of preventing sticking at the transport layer; the problem has to be handed to the application layer.
Will UDP cause packet sticking problem?
To guarantee reliable transmission while reducing extra overhead (each segment must be checked and acknowledged), TCP uses stream-based transmission. A stream-based transport does not treat messages as discrete units and has no protected message boundary (protected message boundary: the transport protocol sends data as independent messages over the network, and the receiver can only receive one independent message at a time).
UDP is message-oriented and has protected message boundaries: the receiver accepts one independent message at a time, so there is no sticky packet problem.
For example, take three data packets of 2 KB, 4 KB and 6 KB. With UDP, no matter how large the receiver's buffer is, the data must be sent and received as at least three separate datagrams. With TCP, as long as the receiver's buffer is at least 12 KB, all three packets can be delivered in a single receive.
Sticky package instance
Based on the examples in the "Introduction to Netty" series on youthlql's CSDN blog.
public class MyServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new MyServerInitializer()); // custom initializer class
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // pipeline.addLast(new LineBasedFrameDecoder(1024));
        // pipeline.addLast(new StringDecoder());
        pipeline.addLast(new MyServerHandler());
    }
}
public class MyServerHandler extends ChannelInboundHandlerAdapter {
    private int count;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // String body = (String) msg;
        // System.out.println("Server received " + body + " count " + ++count);
        ByteBuf body = (ByteBuf) msg;
        System.out.println("Server received " + body.toString() + " count " + ++count);
        System.out.println("!!!!!!!!!!!!!!!");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // cause.printStackTrace();
        ctx.close();
    }
}
public class MyClientHandler extends ChannelInboundHandlerAdapter {
    private int count;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // The client sends 10 messages once the channel becomes active
        for (int i = 0; i < 10; ++i) {
            System.out.println("send " + i);
            byte[] req = ("In2 received" + System.getProperty("line.separator")).getBytes();
            ByteBuf buffer = Unpooled.buffer(req.length);
            buffer.writeBytes(req);
            ctx.writeAndFlush(buffer);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
result
Server:
Server received PooledUnsafeDirectByteBuf(ridx: 0, widx: 110, cap: 2048) count 1
!!!!!!!!!!!!!!!

Client:
send 0
send 1
send 2
send 3
send 4
send 5
send 6
send 7
send 8
send 9
Message length in the application-layer protocol
The sender's application layer prepends a protocol header (message header) to the actual data (message body). After TCP transport the TCP header is removed, and the receiver obtains the application-layer data (header and body) for processing.
1. The decoder (decoding component) reads the buffered data
2. Read the message header; each field of the header has a fixed length, so it cannot get out of order
3. Create a byte array according to the data length given in the message header
4. Read the message body into the byte array
Protocol:
packageCode (int)
length (int)
data (variable length)
class: Decode
int packageCode = byteBuf.readInt();
Class<?> packageClass;
if (packageCode == 0) {
    packageClass = RpcRequest.class;
} else if (packageCode == 1) {
    packageClass = RpcResponse.class;
} else {
    logger.error("Unrecognized packet: {}", packageCode);
    throw new Exception("Unknown packet");
}
int length = byteBuf.readInt();
byte[] bytes = new byte[length];
byteBuf.readBytes(bytes);
Object obj = deserializer.deserialize(bytes, packageClass);
list.add(obj);
packageCode determines whether the bytes are deserialized into a request or a response object
length: the length of the data body
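Netty also ships a generic decoder for exactly this kind of length-field protocol, LengthFieldBasedFrameDecoder; placed in front of the custom Decode handler, it guarantees that every decode call sees one complete packet. The parameter values below are assumptions matching the packageCode(int) + length(int) + data layout described above:

```java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

public class FrameInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline()
          // maxFrameLength = 65536 (assumed upper bound),
          // lengthFieldOffset = 4 (skip the 4-byte packageCode),
          // lengthFieldLength = 4, lengthAdjustment = 0 (the length field counts only the data body),
          // initialBytesToStrip = 0 (keep the header so Decode can still read packageCode)
          .addLast(new LengthFieldBasedFrameDecoder(65536, 4, 4, 0, 0))
          .addLast(new Decode());   // the custom decoder shown above
    }
}
```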
Newline segmentation
Uses the LineBasedFrameDecoder decoder
Splits on \n
class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LineBasedFrameDecoder(1024));
        pipeline.addLast(new StringDecoder());
        pipeline.addLast(new MyServerHandler());
    }
}
public class MyClientHandler extends ChannelInboundHandlerAdapter {
    private int count;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // The client sends 10 messages, each followed by a line separator (\n)
        for (int i = 0; i < 10; ++i) {
            System.out.println("send " + i);
            byte[] req = ("In2 received" + System.getProperty("line.separator")).getBytes();
            ByteBuf buffer = Unpooled.buffer(req.length);
            buffer.writeBytes(req);
            ctx.writeAndFlush(buffer);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
public class MyServerHandler extends ChannelInboundHandlerAdapter {
    private int count;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("Server received " + body + " count " + ++count);
        // ByteBuf body = (ByteBuf) msg;
        // System.out.println("Server received " + body.toString() + " count " + ++count);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // cause.printStackTrace();
        ctx.close();
    }
}
Separator segmentation
Using the DelimiterBasedFrameDecoder decoder
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    ByteBuf decode = Unpooled.copiedBuffer("$$".getBytes(StandardCharsets.UTF_8));
    pipeline.addLast(new DelimiterBasedFrameDecoder(1024, decode));
    pipeline.addLast(new StringDecoder());
    pipeline.addLast(new MyServerHandler());
}
MyClientHandler
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    String message = "inherit SimpleChannelInboundHandler,because Sim$$"
            + "pleChannelInboundHandler Has helped us put the logic that has nothing to do with business $$"
            + "stay ChannelRead Method is implemented, we only need to implement its chan$$"
            + "nelRead0 Methods to complete our logic is enough: $$";
    ByteBuf mes = Unpooled.buffer(message.getBytes(CharsetUtil.UTF_8).length);
    System.out.println("send");
    mes.writeBytes(message.getBytes());
    ctx.writeAndFlush(mes);
}
ByteBuf
A byte container used to transfer data between handlers.
ByteBuf is an optimized, easy-to-use data container: byte data can be efficiently appended to it or read from it. A ByteBuf has two indexes, one for reading and one for writing, which make it convenient to operate on. Data can be read sequentially, re-read by adjusting the read index, or accessed directly by passing a position index to a get method.
It is an optimization of NIO's ByteBuffer, for example through the two separate indexes.
A handler can be regarded as an interceptor that processes the ByteBuf.
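A small sketch showing how the two indexes behave (the values are arbitrary):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class ByteBufIndexDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(16);        // readerIndex = 0, writerIndex = 0
        buf.writeInt(42);                         // writing moves writerIndex to 4
        buf.writeInt(7);                          // writerIndex = 8

        System.out.println(buf.readInt());        // 42; reading moves readerIndex to 4
        System.out.println(buf.getInt(0));        // 42 again: get(index) does NOT move readerIndex
        System.out.println(buf.readableBytes());  // 4 = writerIndex - readerIndex

        buf.readerIndex(0);                       // rewind the read index to re-read the data
        System.out.println(buf.readInt());        // 42 once more
    }
}
```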
Heartbeat mechanism
Scenario
The heartbeat mechanism checks whether a long-lived connection is still usable and whether the peer has gone down; if the connection is dead, it is closed to save resources, or re-established.
Optimization
Normally the client sends heartbeat packets and the server replies to them. However, the server's concurrency is far greater than any single client's, so what matters is that the server learns quickly when a client has died so it can release resources; a client keeping a few redundant idle long connections has little impact. Therefore the server does not need to send heartbeat packets to large numbers of clients, and does not even need to reply to them. Compared with the master-slave heartbeats in a database, Netty's heartbeat is mostly about closing invalid long connections and reducing resource consumption.
Flow
1) The client connects to the server.
2) An IdleStateHandler is added to the client's ChannelPipeline, with the client's write-idle time set to, say, 5 s.
3) If no write event occurs in any of the client's ChannelHandlers within those 5 s, the userEventTriggered method (described above) is triggered.
4) In the client's userEventTriggered, under the corresponding trigger event, we send a heartbeat packet to the server to check whether the server is still alive, so the client notices if the server has gone down.
5) Likewise the server should react to heartbeat packets. In fact, the best reply to the client is no reply at all, which reduces the load on the server: with 100,000 idle connections, sending heartbeat replies would itself be a burden on the server. How, then, does anyone know the other side is alive? Simply: every 5 s the server receives a heartbeat from the client; if no heartbeat arrives within 10 s, the server can assume the client has died and close the link.
6) If the server shuts down for any reason, all links are closed, so what the client needs to do is reconnect after a short delay.
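A hedged sketch of this flow using Netty's IdleStateHandler; the class names, the 5 s / 10 s timeouts, and the "PING" payload are illustrative choices, not fixed by Netty:

```java
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;

// Client side: if nothing has been written for 5 s, a WRITER_IDLE event fires and we send a heartbeat.
public class HeartbeatClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline()
          .addLast(new IdleStateHandler(0, 5, 0))   // readerIdle = 0 (off), writerIdle = 5 s, allIdle = 0 (off)
          .addLast(new HeartbeatClientHandler());
    }
}

class HeartbeatClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.WRITER_IDLE) {
            // no write for 5 s: send a heartbeat so the server knows this client is still alive
            ctx.writeAndFlush(Unpooled.copiedBuffer("PING", CharsetUtil.UTF_8));
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

// Server side: do not reply to heartbeats; simply close the connection if no data arrives for 10 s.
// Add this behind: new IdleStateHandler(10, 0, 0) in the server pipeline.
class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
            ctx.close();   // assume the client is dead and release the connection
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
```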