Preface

I forgot to say a few words in the preface of the last article, so here they are.

Reading the Kafka source code requires some familiarity with Java NIO. If you already know this part of Java well, treat this article as an interlude and skip it. Nothing here is heavy or difficult; it mainly goes over the basics so that the source code reading that follows goes more smoothly.

1. NIO foundation

Java NIO (New IO) is an IO API that can replace the standard IO API. It serves the same purpose as the original IO but is used in a completely different way: NIO is buffer-oriented and performs IO through channels, which in many cases lets it read and write more efficiently than traditional IO.

1.1 Main differences between IO and NIO

IO                  NIO
Stream-oriented     Buffer-oriented
Blocking IO         Non-blocking IO
No selectors        Selectors

1.1.1 Traditional IO streams

A simple picture helps here. In traditional IO, when an app reads from or writes to the network or a file on disk, a connection must be established between them. What kind of concept is a stream? Think of running water: to use tap water at home you need a water pipe that carries the water into the house. The stream plays that transport role.

So when the data in a file needs to go into the app, an input pipe is created. When the app has data to write to the file system, an output pipe is created. These two pipes are the input stream and the output stream. Water never flows backwards, so each of them is a one-way pipe. Put that way, it is easy to understand 😁

1.1.2 NIO

Take the same file system and app, but now replace the stream with a channel, and think of the channel as a railway. A railway by itself cannot deliver goods, so we need a vehicle, the train, which is the buffer: the app's data has to travel in this vehicle called a buffer. A train can go out and come back, so NIO is two-way.
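To make the "two-way" point concrete, here is a minimal sketch that writes through a FileChannel and then reads the same bytes back through that same channel, with a ByteBuffer carrying the data both times. The class name ChannelRoundTrip and the temporary file are my own illustrative choices, not part of the original example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ChannelRoundTrip {

    // Writes the text through a channel, then reads it back through the same channel.
    static String roundTrip(String text) {
        try {
            Path file = Files.createTempFile("nio-demo", ".txt"); // hypothetical temp file
            try (FileChannel channel = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Outbound trip: buffer -> channel
                ByteBuffer out = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
                while (out.hasRemaining()) {
                    channel.write(out);
                }
                // Return trip on the same channel: channel -> buffer
                channel.position(0);
                ByteBuffer in = ByteBuffer.allocate((int) channel.size());
                while (in.hasRemaining() && channel.read(in) != -1) {
                    // keep reading until the buffer is full or end of file
                }
                in.flip();
                return StandardCharsets.UTF_8.decode(in).toString();
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello nio"));
    }
}
```

A stream-based version would need a separate FileOutputStream and FileInputStream for the two directions.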

1.2 Buffer

At the heart of NIO are channels and buffers. A channel represents an open connection to an IO device. To use NIO, you obtain a channel connected to the IO device and a buffer to hold the data, then process the data by operating on the buffer. (This is the picture above again: one is responsible for transport, the other for storage.)

Buffers are defined in the java.nio package, and all of them are subclasses of the abstract class Buffer. There is one common subclass for each primitive type except boolean, named XxxBuffer (IntBuffer, DoubleBuffer, and so on). The different buffer classes are managed the same way, and each is obtained through the same kind of method:

static XxxBuffer allocate(int capacity)

A buffer provides two core methods, get() and put(): put() stores data in the buffer, and get() fetches data from it.

Let's look at this in code:

import java.nio.ByteBuffer;
import org.junit.Test;

public class BufferTest {
    @Test
    public void testBuffer() {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    }
}
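The same allocate/put/get pattern applies to the typed buffers as well. As a rough sketch (the class name IntBufferDemo and the values are made up for illustration), here is an IntBuffer that stores three ints and reads them back. It uses flip(), which is explained in the code section further down, to switch from writing to reading.

```java
import java.nio.IntBuffer;

final class IntBufferDemo {

    // Puts three ints into a buffer, flips it, and returns the sum of what is read back.
    static int putThenSum() {
        IntBuffer buffer = IntBuffer.allocate(4);
        buffer.put(10).put(20).put(30); // relative put: position advances to 3
        buffer.flip();                  // limit becomes 3, position goes back to 0
        int sum = 0;
        while (buffer.hasRemaining()) {
            sum += buffer.get();        // relative get: reads 10, 20, 30 in order
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(putThenSum()); // prints 60
    }
}
```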

If you click through to ByteBuffer, you will see that it extends the Buffer class:

public abstract class ByteBuffer extends Buffer implements Comparable<ByteBuffer>

If you click through to the Buffer class, the first thing you see is that it has several built-in properties.

1.2.1 Basic properties of a buffer

(1) capacity

The maximum number of elements the buffer can hold. It cannot be negative and cannot be changed after creation.

(2) limit

The index of the first element that must not be read or written; data at or beyond this index is not accessible. It cannot be negative and cannot exceed capacity. For example, in the third buffer in the figure above, nothing from index 5 onward can be read or written, so limit is 5.

(3) position

The index of the next element to be read or written. It cannot be negative and cannot exceed limit. In the second buffer in the figure, the next data block (the sixth) has index 5, so position is 5.

(4) mark and reset

mark is an index. After you record a position in the buffer with the mark() method, calling reset() returns position to it. This is best explained in code.

1.2.2 Code Section (very simple)

1. First we create a buffer object and print its properties

ByteBuffer byteBuffer = ByteBuffer.allocate(10);
System.out.println(byteBuffer.position());
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());
// Result: 0, 10, 10

2. Call put() to write a string into the buffer

String str = "abcde";
byteBuffer.put(str.getBytes());
System.out.println(byteBuffer.position());
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());
// Result: 5, 10, 10
// "abcde" has length 5, so position has changed; the others are unchanged

3. Switch to read mode using flip()

byteBuffer.flip();
System.out.println(byteBuffer.position());
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());
// Result: 0, 10, 5

position is now 0: the earlier value 5 pointed at the slot with index 5, the next slot to be written, whereas in read mode the first slot to read is obviously index 0. limit has also changed to 5, because there is no readable data from index 5 onward.

4. Read the data out of the buffer

byte[] array = new byte[byteBuffer.limit()];
byteBuffer.get(array);
System.out.println(new String(array, 0, array.length));
// Result: abcde

5. mark() & reset()

// Assumes a freshly flipped buffer (position 0, limit 5), not one already drained by step 4
byte[] array = new byte[byteBuffer.limit()];
byteBuffer.get(array, 0, 2);
System.out.println(new String(array, 0, 2));
System.out.println(byteBuffer.position());
byteBuffer.mark();
byteBuffer.get(array, 2, 2);
System.out.println(new String(array, 2, 2));
System.out.println(byteBuffer.position());
byteBuffer.reset();
System.out.println(byteBuffer.position());
// Result: ab, 2, cd, 4, 2

After reading the first two characters, position is 2; after reading the next two, position is 4. Because I called mark() when position was 2, reset() moves position back to that marked value. It's that simple.

6. Other methods

rewind() sets position back to 0 so the data can be read again. clear() empties the buffer, but only in a "forgotten" sense: the data is still in the buffer and is simply ignored, so you can still get() it if you read again. hasRemaining() tells you whether there are elements left to operate on, and remaining() tells you how many. For example, in the mark example, after reset() the remaining count is 3, because only "ab" has been read and "cde" is left.
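Since this step has no code of its own, here is a small sketch of the behaviour just described. The class name RewindClearDemo and the returned array layout are my own illustration; the buffer calls themselves are the standard java.nio.ByteBuffer methods.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class RewindClearDemo {

    // Returns {remaining after reading 2 bytes, remaining after rewind(),
    //          remaining after clear(), byte at index 0 after clear()}.
    static int[] run() {
        ByteBuffer buffer = ByteBuffer.allocate(10);
        buffer.put("abcde".getBytes(StandardCharsets.US_ASCII));
        buffer.flip();                          // position 0, limit 5

        buffer.get();                           // reads 'a'
        buffer.get();                           // reads 'b'
        int afterTwoReads = buffer.remaining(); // 3 elements ("cde") left

        buffer.rewind();                        // position back to 0, limit still 5
        int afterRewind = buffer.remaining();   // all 5 readable again

        buffer.clear();                         // position 0, limit = capacity
        int afterClear = buffer.remaining();    // 10: the whole capacity is writable
        int firstByte = buffer.get(0);          // absolute get: the old 'a' is still stored

        return new int[] {afterTwoReads, afterRewind, afterClear, firstByte};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [3, 5, 10, 97]
    }
}
```

The last value, 97, is the ASCII code of 'a', which shows that clear() resets the indices without erasing the bytes.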

1.2.3 Direct and Non-direct Buffers

Non-direct buffer: allocated with the allocate() method. The buffer lives in the JVM's heap memory.

Direct buffer: allocated with the allocateDirect() method. The buffer lives in native memory outside the JVM heap, which can make IO more efficient.

① Non-direct buffer

When an application wants to read data from disk, the data is first read from the physical disk into the kernel address space, and the kernel then copies it into the user address space, where read() hands it to the application. Writing works the same way in reverse: the application writes data into the user address space, it is copied into the kernel address space, and then it is written to disk. It is not hard to see that this copy step is redundant, which is why non-direct buffers are relatively inefficient.

② Direct buffer

A direct buffer is as straightforward as its name suggests. When writing, the application writes into a region mapped onto physical memory, and from there the data goes to the physical disk. When reading, the disk data lands in that region and the application reads it from there. The intermediate copy is gone.
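In code, the two kinds of buffer differ only in how they are allocated. The sketch below (the class name DirectBufferDemo is illustrative) allocates one of each and asks the buffer what it is, using isDirect() and hasArray() from the standard ByteBuffer API.

```java
import java.nio.ByteBuffer;

final class DirectBufferDemo {

    // Returns {heap.isDirect(), heap.hasArray(), direct.isDirect()}.
    static boolean[] describe() {
        ByteBuffer heap = ByteBuffer.allocate(1024);         // non-direct: backed by a byte[] on the JVM heap
        ByteBuffer direct = ByteBuffer.allocateDirect(1024); // direct: memory outside the JVM heap
        return new boolean[] {heap.isDirect(), heap.hasArray(), direct.isDirect()};
    }

    public static void main(String[] args) {
        boolean[] d = describe();
        System.out.println("heap isDirect=" + d[0] + ", heap hasArray=" + d[1]
                + ", direct isDirect=" + d[2]);
    }
}
```

Both buffers are then used through the same put()/get()/flip() calls. Direct buffers generally cost more to allocate and release, so as a rule of thumb they suit large, long-lived buffers rather than many small short-lived ones.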

1.3 Channel

1.3.1 A bit of conceptual background

Channels are defined in the java.nio.channels package. A channel represents an open connection between an IO source and a target. It cannot access data directly and can only interact with buffers.

Traditionally, the CPU was fully in charge of IO. With that design, the CPU is used very inefficiently when there are many file reads and writes, because the IO ties up the CPU.

Against this background, an optimization took the CPU out of the data path and switched to DMA (direct memory access). The DMA transfer itself is still scheduled by the CPU, but the cost is much smaller than having the CPU handle all of the IO.

This leads to the concept of a channel: a completely independent processor dedicated to IO operations.

1.3.2 Common Channels

The main implementation classes Java provides for the Channel interface:

FileChannel: a channel for reading, writing, mapping, and manipulating files
DatagramChannel: a channel for reading and writing data over the network via UDP
SocketChannel: a channel for reading and writing data over the network via TCP
ServerSocketChannel: listens for incoming TCP connections and creates a SocketChannel for each one

One way to get a channel is to call getChannel() on an object that supports channels. The supporting classes are:

FileInputStream
FileOutputStream
RandomAccessFile
DatagramSocket
Socket
ServerSocket

Another way is the static method newByteChannel() of the Files class, which returns a byte channel. A third is the static open() method of the channel class itself, which opens and returns the specified channel.
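These last two ways can be sketched as follows. This is only an illustration under my own assumptions (the class name OpenChannelDemo and the temporary file are made up): it writes through a channel from Files.newByteChannel() and then checks the file size through a channel from FileChannel.open().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class OpenChannelDemo {

    // Writes the text with a channel from Files.newByteChannel(), then returns
    // the file size seen by a channel from FileChannel.open().
    static long writeAndMeasure(String text) {
        try {
            Path file = Files.createTempFile("nio-open", ".txt"); // hypothetical temp file
            try {
                // Way 2: static factory on the Files class
                try (SeekableByteChannel out =
                             Files.newByteChannel(file, StandardOpenOption.WRITE)) {
                    ByteBuffer data = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
                    while (data.hasRemaining()) {
                        out.write(data);
                    }
                }
                // Way 3: static open() on the channel class itself
                try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
                    return in.size();
                }
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeAndMeasure("abcde")); // prints 5
    }
}
```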

1.3.3 Common methods and Simple usage

① Use a non-direct buffer to copy files

FileInputStream fileInputStream = new FileInputStream("testPic.jpg");
FileOutputStream fileOutputStream = new FileOutputStream("testPic2.jpg");

// Get the channels from the stream objects
FileChannel inChannel = fileInputStream.getChannel();
FileChannel outChannel = fileOutputStream.getChannel();

// Allocate a non-direct buffer of the given size
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

// Read the data from the input channel into the buffer
while (inChannel.read(byteBuffer) != -1) {
    // Switch to read mode
    byteBuffer.flip();
    // Write the buffer to the output channel
    outChannel.write(byteBuffer);
    // Clear the buffer
    byteBuffer.clear();
}

// Close the channels and streams
outChannel.close();
inChannel.close();
fileInputStream.close();
fileOutputStream.close();
// Result: testPic2.jpg is a copy of testPic.jpg

The code itself is not difficult and the comments are fairly detailed, so I will not expand on it.


② Use a direct buffer to copy files

FileChannel inChannel = FileChannel.open(Paths.get("testPic.jpg"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("testPic2.jpg"),
        StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);

// Memory-mapped files
MappedByteBuffer inMappedBuffer = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outMapBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());

// Read and write directly on the mapped buffers
byte[] array = new byte[inMappedBuffer.limit()];
inMappedBuffer.get(array);
outMapBuffer.put(array);

// Close the channels
inChannel.close();
outChannel.close();

If you want to compare how long the two approaches take, measure it yourself with plain system time; I won't add that here.
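For readers who do want to try the measurement, the following is one possible sketch, not a rigorous benchmark. The class name CopyTiming, the temporary files, and the 1 MB test size are all my own assumptions. A single System.nanoTime() measurement is noisy (JIT warm-up, OS page cache), so treat the numbers as a rough indication only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

final class CopyTiming {

    // Copy through a 1 KB non-direct buffer, as in example ①.
    static void copyWithHeapBuffer(Path src, Path dst) throws IOException {
        try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(dst, StandardOpenOption.WRITE,
                     StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while (in.read(buffer) != -1) {
                buffer.flip();
                while (buffer.hasRemaining()) {
                    out.write(buffer);
                }
                buffer.clear();
            }
        }
    }

    // Copy through memory-mapped buffers, as in example ②.
    static void copyWithMappedBuffer(Path src, Path dst) throws IOException {
        try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(dst, StandardOpenOption.READ,
                     StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
            MappedByteBuffer from = in.map(FileChannel.MapMode.READ_ONLY, 0, in.size());
            MappedByteBuffer to = out.map(FileChannel.MapMode.READ_WRITE, 0, in.size());
            to.put(from);
        }
    }

    // Times both copies on a temporary file and returns {heap nanos, mapped nanos}.
    // Throws if either copy differs from the source.
    static long[] timeBoth(int sizeBytes) {
        try {
            Path src = Files.createTempFile("copy-src", ".bin");
            Path heapDst = Files.createTempFile("copy-heap", ".bin");
            Path mappedDst = Files.createTempFile("copy-mapped", ".bin");
            try {
                byte[] data = new byte[sizeBytes];
                Arrays.fill(data, (byte) 7);
                Files.write(src, data);

                long t0 = System.nanoTime();
                copyWithHeapBuffer(src, heapDst);
                long t1 = System.nanoTime();
                copyWithMappedBuffer(src, mappedDst);
                long t2 = System.nanoTime();

                if (!Arrays.equals(data, Files.readAllBytes(heapDst))
                        || !Arrays.equals(data, Files.readAllBytes(mappedDst))) {
                    throw new IllegalStateException("copy does not match the source");
                }
                return new long[] {t1 - t0, t2 - t1};
            } finally {
                Files.deleteIfExists(src);
                Files.deleteIfExists(heapDst);
                Files.deleteIfExists(mappedDst);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] nanos = timeBoth(1 << 20); // 1 MB test file
        System.out.println("heap buffer:   " + nanos[0] / 1_000 + " us");
        System.out.println("mapped buffer: " + nanos[1] / 1_000 + " us");
    }
}
```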

2. NIO non-blocking network communication

With traditional IO, when a thread makes a read or write call, it is blocked until the data has been read or written, and it cannot perform other tasks in the meantime. So during network communication the thread blocks on every IO operation, the server has to provide a separate thread for each client, and performance deteriorates sharply when the server needs to handle a large number of clients.

NIO is non-blocking. When a thread reads from or writes to a channel and no data is available, the thread can go on to other tasks. Threads typically spend the idle time of non-blocking IO performing IO on other channels, so a single thread can manage multiple input and output channels. NIO therefore lets the server handle all connected clients with one thread or a small, fixed number of threads.
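The "no data is available, so the thread moves on" behaviour can be seen without any network at all. The sketch below is only an illustration (the class name NonBlockingReadDemo is made up, and it borrows Pipe from section 2.4 purely because it needs no server): a non-blocking read() on an empty channel returns 0 immediately instead of waiting.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

final class NonBlockingReadDemo {

    // Returns {bytes read while the pipe is empty, bytes read after data was written}.
    static int[] run() {
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                source.configureBlocking(false);
                ByteBuffer buffer = ByteBuffer.allocate(16);

                // Nothing has been written yet: a blocking read would wait here,
                // a non-blocking read returns 0 right away.
                int before = source.read(buffer);

                sink.write(ByteBuffer.wrap(new byte[] {1, 2, 3}));

                // Poll for up to one second until the data shows up.
                int after = 0;
                long deadline = System.nanoTime() + 1_000_000_000L;
                while (after == 0 && System.nanoTime() < deadline) {
                    after = source.read(buffer);
                }
                return new int[] {before, after};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("read before write: " + r[0] + ", read after write: " + r[1]);
    }
}
```

The polling loop here is deliberately crude; avoiding this kind of busy waiting is exactly what a Selector is for.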

2.1 Selector

Think of the Selector as a registrar placed between the client and the server. When a client wants to send data to the server, the channel for that client is registered with the Selector. Once registered, the Selector monitors the IO state of the channel (read, write, connect, accept). Only when a channel is actually ready does the Selector hand it to a server thread for processing.

This non-blocking approach makes better use of CPU resources. Picking up a parcel is a good illustration. In the blocking version, you tell me the parcel will arrive in half an hour, I am already at the pickup point, and I stand there for half an hour unable to go anywhere. In the non-blocking version, you only call me once the parcel has actually arrived, so I have that time free for other things.

2.2 Code (Blocking IO network communication)

Now let's demonstrate network communication with blocking IO.

2.2.1 Client (Blocking IO)

Try deleting the sChannel.shutdownOutput() call below: when you start the server and run the client, the program hangs. The server cannot tell whether the client has finished sending, so it keeps waiting in read(), and the client in turn blocks waiting for the server's reply. The two sides are deadlocked.

The other way to avoid this is to switch to non-blocking mode, which I'll explain later.

// 1. Get the channel
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("your IP address", 9898));

// 2. Create the file channel
FileChannel inChannel = FileChannel.open(Paths.get("C:/Users/Administrator/Desktop/testPic.jpg"), StandardOpenOption.READ);

// 3. Allocate a buffer of the given size
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

// 4. Read the local file and send it to the server
while (inChannel.read(byteBuffer) != -1) {
    byteBuffer.flip();
    // Write the buffer data to the channel
    sChannel.write(byteBuffer);
    byteBuffer.clear();
}

// Actively tell the server that the data has been sent
sChannel.shutdownOutput();

// Receive the server's feedback
while (sChannel.read(byteBuffer) != -1) {
    byteBuffer.flip();
    System.out.println("Received server data successfully ···");
    byteBuffer.clear();
}

// 5. Close the channels
inChannel.close();
sChannel.close();

2.2.2 Server (Blocking IO)

// 1. Get the channel
ServerSocketChannel ssChannel = ServerSocketChannel.open();

// Create an output channel; the data read is written to it and saved as testPic2.jpg
FileChannel outChannel = FileChannel.open(Paths.get("testPic2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);

// 2. Bind the port
ssChannel.bind(new InetSocketAddress(9898));

// 3. Get the channel of the client connection
SocketChannel sChannel = ssChannel.accept();

// 4. Allocate a buffer of the given size
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

// 5. Receive the client's data and save it locally
while (sChannel.read(byteBuffer) != -1) {
    byteBuffer.flip();
    outChannel.write(byteBuffer);
    byteBuffer.clear();
}

// Send feedback to the client: put the response into the buffer first
byteBuffer.put("server received data successfully".getBytes());
byteBuffer.flip();
sChannel.write(byteBuffer);
byteBuffer.clear();

// Close the channels
sChannel.close();
outChannel.close();

Now, when the client runs, the file is copied over to the server side as testPic2.jpg.

2.3 Non-blocking IO with a Selector

Network communication using NIO requires three core objects:

Channel: the java.nio.channels.Channel interface and its implementations SocketChannel, ServerSocketChannel, and DatagramChannel, plus the pipe-related Pipe.SinkChannel and Pipe.SourceChannel

Buffer: stores the data

Selector: a multiplexer of SelectableChannel objects, used to monitor the IO state of each SelectableChannel

2.3.1 Client (Non-blocking)

// 1. Get the channel
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("192.168.80.1", 9898));

// 1.1 Switch the blocking socket to non-blocking
sChannel.configureBlocking(false);

// 2. Allocate a buffer of the given size
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

// 3. Send data to the server: put it into the buffer first
byteBuffer.put(new Date().toString().getBytes());

// 4. Write the buffer data to sChannel
byteBuffer.flip();
sChannel.write(byteBuffer);
byteBuffer.clear();

// Close
sChannel.close();

2.3.2 Server (Non-blocking)

The whole process is explained in the code comments, so I won't go through it again here.

// 1. Get the channel
ServerSocketChannel ssChannel = ServerSocketChannel.open();

// 2. Set the blocking socket to non-blocking
ssChannel.configureBlocking(false);

// 3. Bind the port
ssChannel.bind(new InetSocketAddress(9898));

// 4. Create the selector
Selector selector = Selector.open();

// 5. Register the channel with the selector. The second argument is a SelectionKey
//    constant that tells the selector which state to monitor; here it is accept
ssChannel.register(selector, SelectionKey.OP_ACCEPT);

// 6. Poll the selector for ready events; a value greater than 0 means
//    at least one SelectionKey is ready
while (selector.select() > 0) {
    // 7. Get all registered selection keys that are ready
    Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();

    // Iterate over the ready selection keys
    while (selectionKeyIterator.hasNext()) {
        // 8. Get a ready event
        SelectionKey selectionKey = selectionKeyIterator.next();

        if (selectionKey.isAcceptable()) {
            // 9. Accept is ready: get the client connection
            SocketChannel sChannel = ssChannel.accept();
            // Set sChannel to non-blocking; again, nothing in the whole process may block
            sChannel.configureBlocking(false);
            // Register this channel with the selector to monitor its read state
            sChannel.register(selector, SelectionKey.OP_READ);
        } else if (selectionKey.isReadable()) {
            // 10. Read is ready: get the channel and start reading
            SocketChannel sChannel = (SocketChannel) selectionKey.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            int len;
            while ((len = sChannel.read(byteBuffer)) > 0) {
                byteBuffer.flip();
                // len is the number of bytes just read; do not call read() a second time here
                System.out.println(new String(byteBuffer.array(), 0, len));
                // Clear the buffer
                byteBuffer.clear();
            }
            if (len == -1) {
                // The client closed the connection
                sChannel.close();
            }
        }
        // A selection key must be removed after use, otherwise it will be handled again
        selectionKeyIterator.remove();
    }
}

When register() is called to register a channel with a selector, the second parameter, ops, specifies which events the selector listens for on that channel:

SelectionKey.OP_READ (1)
SelectionKey.OP_WRITE (4)
SelectionKey.OP_CONNECT (8)
SelectionKey.OP_ACCEPT (16)

To listen for more than one event at registration, combine the constants with the bitwise OR operator:

int selectionKeySet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

A SelectionKey represents the registration relationship between a SelectableChannel and a Selector. It also has a set of corresponding methods.
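A few of those SelectionKey methods can be tried out locally. The sketch below is an illustration under my own assumptions (the class name SelectionKeyDemo is made up, and a Pipe from section 2.4 stands in for a socket so that no server is needed). It combines two constants with bitwise OR, registers a channel for OP_READ, and then inspects the key with interestOps() and readyOps().

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class SelectionKeyDemo {

    // Returns {combined mask, key.interestOps(), keys reported ready, key.readyOps()}.
    static int[] run() {
        // Combining two interest ops with bitwise OR: 1 | 4 == 5.
        int readAndWrite = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                // A channel must be non-blocking before it can be registered.
                source.configureBlocking(false);
                // A pipe's source channel only supports OP_READ.
                SelectionKey key = source.register(selector, SelectionKey.OP_READ);

                // Make the source readable, then wait up to one second for the selector.
                sink.write(ByteBuffer.wrap(new byte[] {42}));
                int ready = selector.select(1000);

                return new int[] {readAndWrite, key.interestOps(), ready, key.readyOps()};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("mask=" + r[0] + " interestOps=" + r[1] + " ready=" + r[2]
                + " readable=" + ((r[3] & SelectionKey.OP_READ) != 0));
    }
}
```

isReadable(), isAcceptable() and the other is...() helpers used in the server code are shorthand for testing readyOps() against the matching constant.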

2.3.3 Client Modification

The client now uses a Scanner to receive input. Note that typing into the console while running test code in IDEA requires some setup: add the following line under Help > Edit Custom VM Options

-Deditable.java.test.console=true

After that you can type input.

// 1. Get the channel
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("192.168.80.1", 9898));

// 1.1 Switch the blocking socket to non-blocking
sChannel.configureBlocking(false);

// 2. Allocate a buffer of the given size
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
    String str = scanner.next();
    // 3. Send data to the server: put it into the buffer first
    byteBuffer.put((new Date().toString() + str).getBytes());
    // 4. Write the buffer data to sChannel
    byteBuffer.flip();
    sChannel.write(byteBuffer);
    byteBuffer.clear();
}

// Close
sChannel.close();

This completes a simple question-and-answer style of network communication.

2.4 Pipe

A Pipe in Java NIO is a one-way data connection between two threads. A Pipe has a source channel and a sink channel: data is written to the sink channel and read from the source channel.

// 1. Get the pipe
Pipe pipe = Pipe.open();

// 2. Allocate a buffer of the given size
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

// 3. Get the sink channel and put the data into the buffer
Pipe.SinkChannel sinkChannel = pipe.sink();
byteBuffer.put("Transmitting data through a one-way pipe".getBytes());

// 4. Write the data to sinkChannel
byteBuffer.flip();
sinkChannel.write(byteBuffer);

// 5. Get the source channel
Pipe.SourceChannel sourceChannel = pipe.source();

// 6. Read the data from sourceChannel into the buffer
//    (clear() makes the buffer writable again; read() is called only once)
byteBuffer.clear();
int len = sourceChannel.read(byteBuffer);
System.out.println(new String(byteBuffer.array(), 0, len));

sourceChannel.close();
sinkChannel.close();
// Result: prints our string "Transmitting data through a one-way pipe" and nothing else
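The example above writes and reads on the same thread. Since a Pipe is meant to connect two threads, here is a minimal two-thread sketch (the class name PipeThreadsDemo and the send() helper are illustrative, not from the original): one thread writes to the sink channel while the calling thread reads from the source channel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

final class PipeThreadsDemo {

    // Sends the message from a writer thread to the calling thread through a Pipe.
    static String send(String message) {
        try {
            Pipe pipe = Pipe.open();
            byte[] payload = message.getBytes(StandardCharsets.UTF_8);

            // Writer thread: writes the payload to the sink, then closes it.
            Thread writer = new Thread(() -> {
                try (Pipe.SinkChannel sink = pipe.sink()) {
                    ByteBuffer out = ByteBuffer.wrap(payload);
                    while (out.hasRemaining()) {
                        sink.write(out);
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();

            // Reader (this thread): blocking reads until the buffer is full or the sink is closed.
            ByteBuffer in = ByteBuffer.allocate(payload.length);
            try (Pipe.SourceChannel source = pipe.source()) {
                while (in.hasRemaining() && source.read(in) != -1) {
                    // keep reading
                }
            }
            writer.join();

            in.flip();
            return StandardCharsets.UTF_8.decode(in).toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(send("Transmitting data through a one-way pipe"));
    }
}
```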

Finally

I have roughly listed the basics of NIO. There seems to be a lot of content, but none of the points is very difficult; they build up step by step. If you want to dig deeper there is much more to learn, such as Path, Paths, and Files in NIO.2. I won't enumerate them here; interested readers can look into them.