1. What are broken packets and sticky packets?

Before discussing broken packets and sticky packets, we need to distinguish protocols that preserve message boundaries from protocols that do not. A protocol that preserves message boundaries transmits each piece of data over the network as an independent message, and the receiver can only receive that message as a whole. In other words, each receive call returns exactly one packet, as sent by the sender. A stream-oriented protocol has no message boundaries. If the sender sends data continuously, a single receive call may return two or more packets, or only part of one.

TCP is stream-oriented, so message boundaries have to be handled at the receiving end.

Suppose the sender sends two packets, dataA and then dataB. The receiver may encounter any of the following four situations:

A. dataA is received first, and then dataB.
B. Part of dataA is received first, and then the rest of dataA together with all of dataB.
C. All of dataA and part of dataB are received first, and then the rest of dataB.
D. All of dataA and dataB are received at once.

A is the normal case, with no sticky or broken packets. B is a broken packet followed by a sticky packet. C is a sticky packet followed by a broken packet. D is a sticky packet.
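The four situations can be reproduced without a network. The following is a minimal sketch, assuming a simplified framing (a 4-byte length followed by the payload) rather than the packet format used later in this article; the class and method names are illustrative only. It cuts the same byte stream at different points and shows that a receiver that buffers leftover bytes recovers both messages in every case.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StickyPacketDemo {

    /** Frames a message as a 4-byte big-endian length followed by the UTF-8 payload. */
    public static byte[] frame(String msg) {
        byte[] body = msg.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    /** Feeds the chunks to a receiver that keeps leftover bytes, and returns the recovered messages. */
    public static List<String> receive(byte[]... chunks) {
        List<String> messages = new ArrayList<>();
        ByteArrayOutputStream pending = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            pending.write(chunk, 0, chunk.length);
            ByteBuffer buf = ByteBuffer.wrap(pending.toByteArray());
            // Extract every complete frame that is currently available (sticky packets)
            while (buf.remaining() >= 4) {
                buf.mark();
                int len = buf.getInt();
                if (buf.remaining() < len) {
                    buf.reset(); // broken packet: wait for the next chunk
                    break;
                }
                byte[] body = new byte[len];
                buf.get(body);
                messages.add(new String(body, StandardCharsets.UTF_8));
            }
            // Keep the unread bytes for the next chunk
            byte[] rest = new byte[buf.remaining()];
            buf.get(rest);
            pending.reset();
            pending.write(rest, 0, rest.length);
        }
        return messages;
    }

    public static void main(String[] args) {
        byte[] a = frame("dataA");
        byte[] b = frame("dataB");
        byte[] stream = ByteBuffer.allocate(a.length + b.length).put(a).put(b).array();
        int cut = a.length + 2; // a cut point inside dataB

        // Case A: one packet per read
        System.out.println(receive(a, b));
        // Case B: part of dataA, then the rest of dataA plus all of dataB
        System.out.println(receive(Arrays.copyOfRange(stream, 0, 3),
                Arrays.copyOfRange(stream, 3, stream.length)));
        // Case C: all of dataA plus part of dataB, then the rest of dataB
        System.out.println(receive(Arrays.copyOfRange(stream, 0, cut),
                Arrays.copyOfRange(stream, cut, stream.length)));
        // Case D: everything in one read
        System.out.println(receive(stream));
    }
}
```

Each of the four calls prints [dataA, dataB], because the receiver never assumes that one read equals one message.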

2. How to deal with sticky and broken packets in Mina

The Mina framework provides CumulativeProtocolDecoder, a cumulative protocol decoder designed specifically to handle sticky and broken packets. You extend it and implement doDecode(), and the return value of doDecode() is important.

A. When your doDecode() method returns true, the decode() method of CumulativeProtocolDecoder first checks whether doDecode() actually read data from the internal IoBuffer. If it did not, an IllegalStateException is thrown. Returning true means that you have consumed data (for example, one complete message in a chat application), so you must have consumed at least one byte of the internal IoBuffer. If that check passes, CumulativeProtocolDecoder checks whether the buffer still contains unread data. If it does, doDecode() is called again. If it does not, the calls to doDecode() stop until new data arrives.

B. When your doDecode() method returns false, CumulativeProtocolDecoder stops calling doDecode(). If unread data remains at that point, the IoBuffer containing it is saved in the IoSession, so that it can be retrieved and merged with the next batch of data when it arrives. If all of the data has been read, the IoBuffer is cleared, and the parent class waits for the next packet.

In short, return true when you have read enough data to decode a complete message, and false otherwise. The most important job of CumulativeProtocolDecoder is to accumulate data for you, which is tedious to do by hand. That is to say: if doDecode() returns true, CumulativeProtocolDecoder calls doDecode() again with the rest of the data. If doDecode() returns false, the remaining data is concatenated with the next packet when it arrives, and doDecode() is then called again.
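The contract described above can be modeled in plain Java. The following is a simplified sketch of the accumulate-and-loop behavior, not Mina's actual source: the class CumulativeLoopSketch, its Decoder interface, and the toy one-byte-length protocol are all assumptions made for illustration, and java.nio.ByteBuffer stands in for IoBuffer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class CumulativeLoopSketch {

    /** Stand-in for doDecode(): true after consuming one message, false if more data is needed. */
    public interface Decoder {
        boolean doDecode(ByteBuffer in, List<String> out);
    }

    /** Toy protocol for the demo: a 1-byte length followed by that many ASCII bytes. */
    public static final Decoder ONE_BYTE_LENGTH = (in, out) -> {
        if (!in.hasRemaining()) {
            return false;
        }
        in.mark();
        int len = in.get();
        if (in.remaining() < len) {
            in.reset(); // incomplete message: leave the bytes untouched
            return false;
        }
        byte[] body = new byte[len];
        in.get(body);
        out.add(new String(body, StandardCharsets.US_ASCII));
        return true;
    };

    /** Unread bytes kept between calls (Mina keeps them in the IoSession). */
    private ByteBuffer saved = ByteBuffer.allocate(0);

    public void decode(byte[] incoming, Decoder decoder, List<String> out) {
        // Merge the saved leftovers with the newly arrived bytes
        ByteBuffer buf = ByteBuffer.allocate(saved.remaining() + incoming.length);
        buf.put(saved).put(incoming);
        buf.flip();
        while (true) {
            int oldPos = buf.position();
            if (!decoder.doDecode(buf, out)) {
                break; // false: stop and wait for more data
            }
            if (buf.position() == oldPos) {
                throw new IllegalStateException("doDecode() returned true without consuming data");
            }
            if (!buf.hasRemaining()) {
                break; // true, but nothing left to decode
            }
        }
        saved = buf.slice(); // whatever is unread is kept for the next call
    }

    public static void main(String[] args) {
        CumulativeLoopSketch loop = new CumulativeLoopSketch();
        List<String> out = new ArrayList<>();
        // First read: one complete message plus the start of a second one
        loop.decode(new byte[] {2, 'h', 'i', 3, 'a'}, ONE_BYTE_LENGTH, out);
        System.out.println(out); // [hi]
        // Second read: the rest of the second message
        loop.decode(new byte[] {'b', 'c'}, ONE_BYTE_LENGTH, out);
        System.out.println(out); // [hi, abc]
    }
}
```

The decoder itself never stores anything between calls. It only decides whether a complete message is present, and the surrounding loop does the accumulation.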

The example uses the following packet format: packet header + message length (int) + message content (JSON string) + packet tail. The packet tail is the hexadecimal string 00 AA BB CC, which as a byte array is the four bytes 0, -86, -69, -52. The complete example below includes a client and a server. The received data is parsed, the message content (the JSON string) is extracted, and the message is printed. Messages are passed between the server and the client as byte arrays.
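To make the layout concrete, here is a small sketch that builds one packet with java.nio.ByteBuffer and prints it as hexadecimal. The class FrameLayoutSketch and its methods are illustrative names, and the sketch assumes that the header uses the same four marker bytes as the tail and that the length is a 4-byte big-endian integer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class FrameLayoutSketch {

    /** 00 AA BB CC; as signed Java bytes these are 0, -86, -69, -52. */
    public static final byte[] MARKER = {0x00, (byte) 0xAA, (byte) 0xBB, (byte) 0xCC};

    /** Builds header (4 bytes) + length (4 bytes) + content + tail (4 bytes). */
    public static byte[] buildFrame(String json) {
        byte[] content = json.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + content.length + 4);
        buf.put(MARKER).putInt(content.length).put(content).put(MARKER);
        return buf.array();
    }

    /** Formats bytes as space-separated two-digit hexadecimal values. */
    public static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02X ", b));
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(MARKER));
        // prints [0, -86, -69, -52]
        System.out.println(toHex(buildFrame("{\"id\":1}")));
        // prints 00 AA BB CC 00 00 00 08 7B 22 69 64 22 3A 31 7D 00 AA BB CC
    }
}
```

The negative values appear because Java bytes are signed: 0xAA is 170, which wraps to 170 - 256 = -86.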

Server code

package com.my.mina;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

public class MinaService {

	public static void main(String[] args) {
		IoAcceptor acceptor = new NioSocketAcceptor();
		// Add a logging filter
		acceptor.getFilterChain().addLast("logger", new LoggingFilter());
		// Add the custom codec filter
		acceptor.getFilterChain().addLast("codec",
				new ProtocolCodecFilter(new ByteArrayCodecFactory(Charset.forName("UTF-8"))));
		// Set the handler
		acceptor.setHandler(new DemoServerHandler());
		// Set the read buffer size
		acceptor.getSessionConfig().setReadBufferSize(2048);
		// Enter the idle state after 10 seconds without any read or write
		acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
		try {
			// Bind the port
			acceptor.bind(new InetSocketAddress(20000));
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.out.println("Start service");
	}

	/**
	 * @ClassName: DemoServerHandler
	 * @Description:
	 * @author Chenzheng
	 * @date 2016-12-9 PM 3:57:11
	 */
	private static class DemoServerHandler extends IoHandlerAdapter {

		@Override
		public void sessionCreated(IoSession session) throws Exception {
			System.out.println("Server creates connection with client...");
			super.sessionCreated(session);
		}

		@Override
		public void sessionOpened(IoSession session) throws Exception {
			System.out.println("Server/client connection open...");
			super.sessionOpened(session);
		}

		// Message received: print it and echo it back to the client
		@Override
		public void messageReceived(IoSession session, Object message) throws Exception {
			super.messageReceived(session, message);
			String a = (String) message;
			System.out.println("Data received:" + a);
			session.write(a);
		}

		@Override
		public void messageSent(IoSession session, Object message) throws Exception {
			super.messageSent(session, message);
			System.out.println("Server sent message successfully...");
		}

		// Session closed
		@Override
		public void sessionClosed(IoSession session) throws Exception {
			super.sessionClosed(session);
			System.out.println("Disconnect:");
		}
	}
}

The encoder

package com.my.mina;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;

import java.nio.charset.Charset;

public class ByteArrayEncoder extends ProtocolEncoderAdapter {

	private final Charset charset;

	public ByteArrayEncoder(Charset charset) {
		this.charset = charset;
	}

	/**
	 * Sends data in the format: packet header + message length (int)
	 * + message content (JSON string) + packet tail. The header and tail are the
	 * hexadecimal string 00 AA BB CC, which as a byte array is the four bytes
	 * 0, -86, -69, -52.
	 *
	 * @param session
	 * @param message
	 * @param out
	 * @throws Exception
	 */
	@Override
	public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
		String value = (message == null ? "" : message.toString());
		byte[] content = value.getBytes(charset);

		// Buffer size: 38 bytes plus the content length (auto-expanding)
		IoBuffer buf = IoBuffer.allocate(38 + content.length).setAutoExpand(true);
		// Packet header: the fixed value 00 AA BB CC as a byte array
		buf.put(new byte[] { 0, -86, -69, -52 });
		// Content length: an int is 4 bytes (8 hexadecimal digits), e.g. 00 00 00 0C for 12 bytes
		buf.putUnsignedInt(content.length);
		// Message content
		buf.put(content);
		// Packet tail
		buf.put(new byte[] { 0, -86, -69, -52 });
		buf.flip();
		out.write(buf); // Write the packet
	}
}

Decoder (the key part, which handles sticky and broken packets in Mina)

package com.my.mina;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

import java.nio.charset.Charset;

/**
 * Custom decoder that makes sure a complete packet is read.
 */
public class ByteArrayDecoder extends CumulativeProtocolDecoder {

	private final Charset charset;

	public ByteArrayDecoder(Charset charset) {
		this.charset = charset;
	}

	@Override
	protected boolean doDecode(IoSession ioSession, IoBuffer ioBuffer,
			ProtocolDecoderOutput protocolDecoderOutput) throws Exception {
		// Sticky and broken packet handling
		if (ioBuffer.remaining() > 4) {
			// Mark the current position so that reset() can restore it
			ioBuffer.mark();
			byte[] l = new byte[4];
			ioBuffer.get(l); // Read the 4-byte packet header
			if (ioBuffer.remaining() < 4) {
				// Not enough bytes for the 4-byte content length: broken packet
				ioBuffer.reset();
				return false;
			}
			// Content length, read as a 4-byte big-endian int
			int len = ioBuffer.getInt();
			if (ioBuffer.remaining() < len + 4) {
				// The content or the 4-byte packet tail is incomplete: broken packet
				ioBuffer.reset();
				return false;
			}
			byte[] bytes = new byte[len];
			ioBuffer.get(bytes, 0, len); // Read the message content
			byte[] tails = new byte[4];
			ioBuffer.get(tails); // Read the packet tail
			// Pass the message on only after the whole packet has been consumed,
			// so that a packet whose tail arrives later is not delivered twice
			protocolDecoderOutput.write(new String(bytes, charset));
			// If data remains (sticky packet), CumulativeProtocolDecoder calls
			// doDecode() again and hands it the rest of the data
			return true;
		}
		return false; // Broken packet: wait for more data
	}
}
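Decoding the 4-byte length field amounts to a big-endian byte-to-int conversion. The following sketch shows that step in isolation; the class name LengthFieldSketch and this particular implementation of byteArrayToInt are assumptions for illustration, compared against java.nio.ByteBuffer.getInt(), which performs the same conversion.

```java
import java.nio.ByteBuffer;

public class LengthFieldSketch {

    /** Converts 4 bytes, most significant byte first, to an int. */
    public static int byteArrayToInt(byte[] b) {
        // Mask each byte with 0xFF so negative bytes do not sign-extend
        return ((b[0] & 0xFF) << 24)
                | ((b[1] & 0xFF) << 16)
                | ((b[2] & 0xFF) << 8)
                | (b[3] & 0xFF);
    }

    public static void main(String[] args) {
        byte[] lengthField = {0x00, 0x00, 0x00, 0x0C};
        System.out.println(byteArrayToInt(lengthField));           // 12
        System.out.println(ByteBuffer.wrap(lengthField).getInt()); // 12
    }
}
```

The 0xFF mask matters for lengths of 128 bytes or more, where at least one byte of the field is negative as a signed Java byte.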

Codec factory

package com.my.mina;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

import java.nio.charset.Charset;

/**
 * Codec factory
 */
public class ByteArrayCodecFactory implements ProtocolCodecFactory {

	private ByteArrayDecoder decoder;
	private ByteArrayEncoder encoder;

	public ByteArrayCodecFactory() {
		this(Charset.defaultCharset());
	}

	public ByteArrayCodecFactory(Charset charSet) {
		encoder = new ByteArrayEncoder(charSet);
		decoder = new ByteArrayDecoder(charSet);
	}

	@Override
	public ProtocolDecoder getDecoder(IoSession session) throws Exception {
		return decoder;
	}

	@Override
	public ProtocolEncoder getEncoder(IoSession session) throws Exception {
		return encoder;
	}
}

Note: the client needs the same decoder, encoder, and codec factory classes as the server.

Client core code

package com.example.mina.minaapplication.view;

import android.app.Activity;
import android.os.Bundle;
import android.os.Handler;
import android.os.Message;
import android.util.Log;
import android.view.View;
import android.widget.TextView;
import android.widget.Toast;

import com.example.mina.minaapplication.R;
import com.example.mina.minaapplication.mina.ByteArrayCodecFactory;

import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * Mina client
 */
public class MainActivity extends Activity {
    /** Single-thread pool that runs the connection task */
    private static ExecutorService executorService = Executors.newSingleThreadExecutor();
    /** Connection object */
    private NioSocketConnector mConnection;
    /** Session object */
    private IoSession mSession;
    /** Server address */
    private InetSocketAddress mAddress;
    private ConnectFuture mConnectFuture;
    public static final int UPDATE_TEXT = 1;
    /** Text view that shows the received message */
    private TextView tvShow;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        tvShow = findViewById(R.id.tv_show);
        initConfig();
        connect();
        findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                // Only send when connected to the server
                if (mConnectFuture != null && mConnectFuture.isConnected()) {
                    // Send a JSON string
                    mConnectFuture.getSession().write("{\"id\":11,\"name\":\"ccc\"}");
                }
            }
        });
    }

    /**
     * Initializes the Mina configuration
     */
    private void initConfig() {
        mAddress = new InetSocketAddress("192.168.0.1", 20000);
        mConnection = new NioSocketConnector(); // Create the connector
        // Set the read buffer size
        SocketSessionConfig socketSessionConfig = mConnection.getSessionConfig();
        socketSessionConfig.setReadBufferSize(2048);
        // Enter the idle state after 4 seconds without any read or write
        socketSessionConfig.setIdleTime(IdleStatus.BOTH_IDLE, 4);
        // Logging filter
        mConnection.getFilterChain().addLast("logging", new LoggingFilter());
        // Custom codec filter
        mConnection.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new ByteArrayCodecFactory(Charset.forName("UTF-8"))));
        mConnection.setHandler(new DefaultHandler()); // Set the handler
        mConnection.setDefaultRemoteAddress(mAddress); // Set the address
    }

    /**
     * Creates the connection
     */
    private void connect() {
        FutureTask<Void> futureTask = new FutureTask<>(new Callable<Void>() {
            @Override
            public Void call() {
                try {
                    while (true) {
                        mConnectFuture = mConnection.connect();
                        mConnectFuture.awaitUninterruptibly(); // Wait until the connection attempt completes
                        // getSession() throws if the attempt failed, so check isConnected() first
                        if (mConnectFuture.isConnected()) {
                            mSession = mConnectFuture.getSession(); // Get the session object
                            // A Toast must be shown from the UI thread
                            runOnUiThread(new Runnable() {
                                @Override
                                public void run() {
                                    Toast.makeText(MainActivity.this, "Connection successful",
                                            Toast.LENGTH_SHORT).show();
                                }
                            });
                            break;
                        }
                        Thread.sleep(3000); // Retry every 3 seconds
                    }
                } catch (Exception e) {
                    return null;
                }
                return null;
            }
        });
        executorService.execute(futureTask);
    }

    /**
     * Mina handler for session events
     */
    private class DefaultHandler extends IoHandlerAdapter {

        @Override
        public void sessionOpened(IoSession session) throws Exception {
            super.sessionOpened(session);
        }

        /**
         * A message from the server is received
         *
         * @param session
         * @param message
         * @throws Exception
         */
        @Override
        public void messageReceived(IoSession session, Object message) throws Exception {
            Log.e("tag", "Server side message received:" + message.toString());
            Message message1 = new Message();
            message1.what = UPDATE_TEXT;
            message1.obj = message;
            handler.sendMessage(message1);
        }

        @Override
        public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
            // The client enters the idle state
            super.sessionIdle(session, status);
        }
    }

    /**
     * Updates the UI
     */
    private Handler handler = new Handler() {
        @Override
        public void handleMessage(Message msg) {
            super.handleMessage(msg);
            switch (msg.what) {
                case UPDATE_TEXT:
                    String message = (String) msg.obj;
                    tvShow.setText(message);
                    break;
            }
        }
    };
}

Client result:

Server result:

The complete project code for this article is available at: https://download.csdn.net/download/lb1207087645/10314510
