>>>> 😜😜😜 Github: 👉 github.com/black-ant CASE Backup: 👉 gitee.com/antblack/ca…

A. The preface

Just out of curiosity, this article will take a look at the main flow of MySQL Client calls as a starting point for the upcoming MySQL documentation series

2. Create Connect

In the case of getting a connection, Spring’s DataSourceUtils # getConnection is called in a number of ways when getting a connection

You’re still in the Spring business architecture. The flow of Connect created at startup and invoked at run time are two completely different flows, so let’s take a look at the main flow of CreateConnect

// Step 1: create entry of Connect
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
    
    // ...
    Connection con = fetchConnection(dataSource);
    // ...

}


// Step 2: Connection pool processing
// If a connection pool is used, the connection will be handled by the corresponding connection pool
pool = result = new HikariPool(this); > Skip connection pooling to see the core processing flow of com.mysql.cj.jdbc.driver// Step 3: MySQL driver entry
public java.sql.Connection connect(String url, Properties info) throws SQLException {

    try {
        if(! ConnectionUrl.acceptsUrl(url)) {/* * According to JDBC spec: * The driver should return "null" if it realizes it is the wrong kind of driver to connect to the given URL. This will be common, as when the * JDBC driver manager is asked to connect to a given URL it passes the URL to each loaded driver in turn. */
            return null;
        }

        ConnectionUrl conStr = ConnectionUrl.getConnectionUrlInstance(url, info);
        // Create different connections based on the type
        switch (conStr.getType()) {
            case SINGLE_CONNECTION:
                return com.mysql.cj.jdbc.ConnectionImpl.getInstance(conStr.getMainHost());

            case LOADBALANCE_CONNECTION:
                return LoadBalancedConnectionProxy.createProxyInstance((LoadbalanceConnectionUrl) conStr);

            case FAILOVER_CONNECTION:
                return FailoverConnectionProxy.createProxyInstance(conStr);

            case REPLICATION_CONNECTION:
                return ReplicationConnectionProxy.createProxyInstance((ReplicationConnectionUrl) conStr);

            default:
                return null; }}catch (UnsupportedConnectionStringException e) {
        // when Connector/J can't handle this connection string the Driver must return null
        return null;

    } catch (CJException ex) {
        throw ExceptionFactory.createException(UnableToConnectException.class,
                Messages.getString("NonRegisteringDriver.17".newObject[] { ex.toString() }), ex); }}Copy the code

3. Run the SQL process

This section takes a look at the call flow when executing SQL

In the life cycle of an SQL, there are two main flows:

  • Process 1: SET autocommit based on transaction start
  • Flow two: The real core SQL execution statement

3.1 Transaction entry

Spring starts a transaction with TransactionAspectSupport, and after entering a series of processes, it enters the connection pool processing, which involves the Process of SpringTransaction, Take a look at this article on Spring-based transaction management, which is briefly covered here

  • Step 1: TransactionImpl # begin: Begin starts the transaction process
  • Step 2: Connect Impl # setAutoCommit: Start the automatic commit process
  • Step 3: NativeSession # execSQL: Enter SQL to execute the core process
public void begin(a) {
    if ( !doConnectionsFromProviderHaveAutoCommitDisabled() ) {
        getConnectionForTransactionManagement().setAutoCommit( false );
    }
    status = TransactionStatus.ACTIVE;
}
Copy the code

The transaction calls setAutoCommit, which essentially calls a execSQL statement to control the transaction

this.session.execSQL(null, autoCommitFlag ? "SET autocommit=1" : "SET autocommit=0", -1.null.false.this.nullStatementResultSetFactory, this.database, null.false);
Copy the code

ExecSQL is called directly from here, whereas the normal SQL statement in flow two is initiated by the corresponding executeUpdate/executeQuery /executeInternal process

3.2 Common Service execution process

  • Step 1: AbstractEntityPersister # insert: Hibernate/JPA
  • ExecuteUpdate: Execute the Update statement (or Query -> executeQuery).
  • Step 3: HikariProxyPreparedStatement # executeUpdate: connection pool in the middle of the handle, the can specially to see see
  • Step 4: ClientPreparedStatement # executeUpdate: The mysql driver takes over
  • Step 5 : ClientPreparedStatement # executeUpdateInternal :
  • ExecuteInternal: The abstract class is called by the underlying method, and execSQL is called
// Process the Update statement as follows:
protected long executeUpdateInternal(QueryBindings<? > bindings,boolean isReallyBatch) throws SQLException {

    // 1. Obtain the JDBC connection
    JdbcConnection locallyScopedConn = this.connection;

    // 2. Parse the statement package to be sent to MySQLMessage sendPacket = ((PreparedQuery<? >)this.query).fillSendPacket(bindings);

    // 3. Call to execute SQL through processing methods
    rs = executeInternal(-1, sendPacket, false.false.null, isReallyBatch);

    // 4. Set the result
    this.results = rs;
    
    // Set the number of updates, where repeated statements are counted
    this.updateCount = rs.getUpdateCount();

    // 5. Get the ID of the last insertion
    this.lastInsertId = rs.getUpdateID();
        
    // 6. Return the number of updates
    return this.updateCount;

}
Copy the code

ExecuteInternal main process

protected <M extends Message> ResultSetInternalMethods executeInternal(int maxRowsToRetrieve, M sendPacket, boolean createStreamingResultSet,
        boolean queryIsSelectOnly, ColumnDefinition metadata, boolean isBatch) throws SQLException {

    // Step 1: Get the connection
    JdbcConnection locallyScopedConnection = this.connection;

    // Step 2: Get the insertion worth binding((PreparedQuery<? >)this.query).getQueryBindings() .setNumberOfExecutions(((PreparedQuery<? >)this.query).getQueryBindings().getNumberOfExecutions() + 1);

    // Step 3: Set the return result setting method
    ResultSetInternalMethods rs;

    // Step 4: Set timeout method
    CancelQueryTask timeoutTask = startQueryTimer(this, getTimeoutInMillis());

    // Step 5: Invoke specific SQL execution statements
    rs = ((NativeSession) locallyScopedConnection.getSession()).execSQL(this.null, maxRowsToRetrieve, (NativePacketPayload) sendPacket,
                        createStreamingResultSet, getResultSetFactory(), this.getCurrentCatalog(), metadata, isBatch);

    // Step 6: Handle timeout, omit
}
Copy the code

3.3 General Service processing process

Having looked at the entry method, let’s look at the execQuery process:

// C- NativeSession
public <T extends Resultset> T execSQL(Query callingQuery, String query, int maxRows, NativePacketPayload packet, boolean streamResults,
        ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory, String catalog, ColumnDefinition cachedMetadata, boolean isBatch) {
    
    //  
    int endOfQueryPacketPosition = endOfQueryPacketPosition = packet.getPosition();

    // 
    long queryStartTime = System.currentTimeMillis();

    // If packet == null, the following processing is called
    return ((NativeProtocol) this.protocol).sendQueryString(callingQuery, query, encoding, maxRows, streamResults, catalog, cachedMetadata,
                    this::getProfilerEventHandlerInstanceFunction, resultSetFactory);

    // 
    return ((NativeProtocol) this.protocol).sendQueryPacket(callingQuery, packet, maxRows, streamResults, catalog, cachedMetadata,
                this::getProfilerEventHandlerInstanceFunction, resultSetFactory);

}
Copy the code

The call to sendQueryPacket above initiated the SQL execution operation

// C- NativeProtocol
public final <T extends Resultset> T sendQueryPacket(Query callingQuery, NativePacketPayload queryPacket, int maxRows, boolean streamResults,
        String catalog, ColumnDefinition cachedMetadata, GetProfilerEventHandlerInstanceFunction getProfilerEventHandlerInstanceFunction,
        ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory) throws IOException {
    this.statementExecutionDepth++;

    byte[] queryBuf = null;
    int oldPacketPosition = 0;
    long queryStartTime = 0;
    long queryEndTime = 0;

    queryBuf = queryPacket.getByteBuffer();
    oldPacketPosition = queryPacket.getPosition(); // save the packet position
    
    // Query the startup time
    queryStartTime = getCurrentTimeNanosOrMillis();
    
    // Query statement
    LazyString query = new LazyString(queryBuf, 1, (oldPacketPosition - 1));

    // Send the command
    NativePacketPayload resultPacket = sendCommand(queryPacket, false.0);

    // Get all results
    T rs = readAllResults(maxRows, streamResults, resultPacket, false, cachedMetadata, resultSetFactory);

    // Reflection interceptor
    T interceptedResults = invokeQueryInterceptorsPost(query, callingQuery, rs, false);

    // Return the result
    return rs;

}
Copy the code

3.5 Sending Commands

// C- NativeProtocol
public final NativePacketPayload sendCommand(Message queryPacket, boolean skipCheck, int timeoutMillis) {

    int command = queryPacket.getByteBuffer()[0];
    this.commandCount++;

    if (this.queryInterceptors ! =null) {
        NativePacketPayload interceptedPacketPayload = (NativePacketPayload) invokeQueryInterceptorsPre(queryPacket, false);

        if(interceptedPacketPayload ! =null) {
            returninterceptedPacketPayload; }}this.packetReader.resetMessageSequence();
    
    // Get the old timeout and set the new timeout
    // PS: oldTimeout in finally sets soTimeout again
    int oldTimeout = 0;
    oldTimeout = this.socketConnection.getMysqlSocket().getSoTimeout();
    this.socketConnection.getMysqlSocket().setSoTimeout(timeoutMillis);

    // 
    checkForOutstandingStreamingData();
    
    // Set the mutex
    this.serverSession.setStatusFlags(0.true);

    // Empty the input stream
    clearInputStream();
    this.packetSequence = -1;
    
    / / a package
    send(queryPacket, queryPacket.getPosition());

    // Get the Return result
    // 1. resultPacket = readMessage(this.reusablePacket)
    // 2. checkErrorMessage(resultPacket)
    NativePacketPayload returnPacket = checkErrorMessage(command);

    return returnPacket;
}
Copy the code
// C- NativeProtocol
public final void send(Message packet, int packetLen) {
        //....
        
        // Send the remote packet through the Sender
        this.packetSender.send(packet.getByteBuffer(), packetLen, this.packetSequence);
}
Copy the code

C- SimplePacketSender

public void send(byte[] packet, int packetLen, byte packetSequence) throws IOException {
    PacketSplitter packetSplitter = new PacketSplitter(packetLen);
    
    // Keep reading packages from the remote
    while (packetSplitter.nextPacket()) {
        this.outputStream.write(NativeUtils.encodeMysqlThreeByteInteger(packetSplitter.getPacketLen()));
        this.outputStream.write(packetSequence++);
        this.outputStream.write(packet, packetSplitter.getOffset(), packetSplitter.getPacketLen());
    }
    this.outputStream.flush();
}
Copy the code

3.6 analytical Result

Step 1 : packetReader.readMessage

PacketReader includes the following MultiPacketReader implementations:

public NativePacketPayload readMessage(Optional<NativePacketPayload> reuse, NativePacketHeader header) throws IOException {

    // Get the length and Message implementation
    int packetLength = header.getMessageSize();
    NativePacketPayload buf = this.packetReader.readMessage(reuse, header);

    // this is done through a do-while loop
    do {
            
            / /...
            this.packetReader.readMessage(Optional.of(multiPacket), hdr);
            // Write byte data
            buf.writeBytes(StringLengthDataType.STRING_FIXED, multiPacket.getByteBuffer(), 0, multiPacketLength);
    // loop until maximum length -> MAX_PACKET_SIZE = 256 * 256 * 256-1;
    } while (multiPacketLength == NativeConstants.MAX_PACKET_SIZE);

    return buf;
}
Copy the code

CheckErrorMessage determines whether an error is returned

public void checkErrorMessage(NativePacketPayload resultPacket) {

    resultPacket.setPosition(0);
    
    // Get the state
    byte statusCode = (byte) resultPacket.readInteger(IntegerDataType.INT1);

    // Error handling
    // Check whether the result is abnormal based on the status code
    if (statusCode == (byte) 0xff) {
        // omit the error handler


        // The corresponding exception is thrown by the status and the result of the exception handling. This method has no concrete return value
        if(xOpen ! =null) {
            if (xOpen.startsWith("22")) {
                throw new DataTruncationException(errorBuf.toString(), 0.true.false.0.0, errno);
            }

            if (errno == MysqlErrorNumbers.ER_MUST_CHANGE_PASSWORD) {
                throw ExceptionFactory.createException(PasswordExpiredException.class, errorBuf.toString(), getExceptionInterceptor());

            } else if (errno == MysqlErrorNumbers.ER_MUST_CHANGE_PASSWORD_LOGIN) {
                throwExceptionFactory.createException(ClosedOnExpiredPasswordException.class, errorBuf.toString(), getExceptionInterceptor()); }}throw ExceptionFactory.createException(errorBuf.toString(), xOpen, errno, false.null, getExceptionInterceptor()); }}Copy the code

Gets the final return result

public <T extends Resultset> T readAllResults(int maxRows, boolean streamResults, NativePacketPayload resultPacket, boolean isBinaryEncoded,
        ColumnDefinition metadata, ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory) throws IOException {

    // Call the concrete implementation class to get the final result, which will be placed in rowData
    T topLevelResultSet = read(Resultset.class, maxRows, streamResults, resultPacket, isBinaryEncoded, metadata, resultSetFactory);


}

// There are several types of Read

Copy the code

conclusion

There are not many things, mainly some main process codes, many of which are vague, mainly to series the whole process, I will take time to have an in-depth understanding of small details.

Compared to other framework code, Mysql code seems to be very complicated, with many auxiliary properties mixed in between each main flow. If there is a business problem and you are not sure of the final execution statement, you can consider adding breakpoints in methods such as sendCommand

The use of connection pools is usually handled between a ResultSetReturnImpl and a ClientPreparedStatement, which you can focus on later