Author: freewind

Bytom project repositories:

GitHub address: https://github.com/Bytom/bytom

Gitee address: https://gitee.com/BytomBlockchain/bytom

In the previous article we saw how a BlockRequestMessage is sent to a peer node. This article looks at the other side: when the peer node receives this message, how does it respond?

This question can be split into three smaller ones:

  1. How does a Bytom node receive a message from another node?
  2. After receiving a BlockRequestMessage, what kind of message is sent back to the other party?
  3. How is that message sent?

Let’s start with the first question.

How does a Bytom node receive a message from another node?

If we search the code for BlockRequestMessage, we find that only the ProtocolReactor.Receive method handles this message. So the key is how Bytom receives the message from the network and forwards it to ProtocolReactor.Receive.

Recall from the previous article, on how Bytom sends a request for block data, that an outgoing message ends up being written to MConnection.bufWriter. Correspondingly, MConnection also has a bufReader for reading data, and it is bound to the same net.Conn:

p2p/connection.go#L114-L118

func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {
    mconn := &MConnection{
        conn:        conn,
        bufReader:   bufio.NewReaderSize(conn, minReadBufferSize),
        bufWriter:   bufio.NewWriterSize(conn, minWriteBufferSize),
        // ...
    }
    // ...
}

(minReadBufferSize is a constant with the value 1024.)
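To make the pairing of a buffered reader and a buffered writer on one connection concrete, here is a minimal sketch. It is not Bytom code: roundTrip is a hypothetical helper, net.Pipe stands in for a real TCP connection, and the 1024-byte buffer size simply echoes the constant mentioned above.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
)

// roundTrip writes msg through a buffered writer on one end of an in-memory
// connection and reads it back through a buffered reader on the other end.
func roundTrip(msg string) (string, error) {
	a, b := net.Pipe() // in-memory stand-in for a real TCP connection
	defer a.Close()
	defer b.Close()

	w := bufio.NewWriterSize(a, 1024)
	r := bufio.NewReaderSize(b, 1024)

	errCh := make(chan error, 1)
	go func() {
		if _, err := w.WriteString(msg + "\n"); err != nil {
			errCh <- err
			return
		}
		// Nothing reaches the other end until the buffer is flushed.
		errCh <- w.Flush()
	}()

	line, err := r.ReadString('\n')
	if err != nil {
		return "", err
	}
	if err := <-errCh; err != nil {
		return "", err
	}
	return line[:len(line)-1], nil // strip the trailing newline
}

func main() {
	got, err := roundTrip("block request")
	fmt.Println(got, err)
}
```

The point to take away is that the sending side writes into its buffer and flushes, while the receiving side only ever reads from its own buffered reader.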

So, to read the message sent by the other party, something must read from bufReader. A brief search shows that this reading is also started in MConnection.OnStart:

p2p/connection.go#L152-L159

func (c *MConnection) OnStart() error {
    // ...
    go c.sendRoutine()
    go c.recvRoutine()
    // ...
}

c.recvRoutine() is the one we focus on this time. The c.sendRoutine() above it is for sending and was the focus of the previous article.

Continuing into c.recvRoutine():

p2p/connection.go#L403-L502

func (c *MConnection) recvRoutine() {
    // ...
    for {
        c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)

        // ...

        pktType := wire.ReadByte(c.bufReader, &n, &err)
        c.recvMonitor.Update(int(n))
        // ...

        switch pktType {
        // ...
        case packetTypeMsg:
            pkt, n, err := msgPacket{}, int(0), error(nil)
            wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
            c.recvMonitor.Update(int(n))
            // ...
            channel, ok := c.channelsIdx[pkt.ChannelID]
            // ...
            msgBytes, err := channel.recvMsgPacket(pkt)
            // ...
            if msgBytes != nil {
                // ...
                c.onReceive(pkt.ChannelID, msgBytes)
            }
            // ...
        }
    }
    // ...
}

After simplification, the method is broken down into three parts:

  1. The first part limits the receive rate, so that a malicious node cannot overwhelm this node by suddenly sending a large amount of data. As with sending, the limit is 500K/s.
  2. The second part reads the type of the next packet from c.bufReader. There are currently three values. Two are heartbeat-related, packetTypePing and packetTypePong; the third, packetTypeMsg, marks an ordinary message packet and is the one we need to focus on.
  3. The third part continues reading from c.bufReader to get the complete packet, then uses its ChannelID to find the channel that should handle it. ChannelID has two values, BlockchainChannel and PexChannel. We only need the former, whose corresponding reactor is ProtocolReactor. When c.onReceive(pkt.ChannelID, msgBytes) is finally called, the binary data msgBytes that was read is handed to ProtocolReactor.Receive for processing.

Our focus is the third part, starting with channel.recvMsgPacket(pkt): how does the channel read binary data out of a packet?

p2p/connection.go#L667-L682

func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
    // ...
    ch.recving = append(ch.recving, packet.Bytes...)
    if packet.EOF == byte(0x01) {
        msgBytes := ch.recving
        // ...
        ch.recving = ch.recving[:0]
        return msgBytes, nil
    }
    return nil, nil
}

I have removed some error checking and performance-related comments from this method. Interested readers can follow the source reference above to see the full version.

This code relies on a buffer called recving (a byte slice, as the append call shows). It appends the bytes held in the packet to that buffer and then checks whether the packet marks the end of the whole message. If so, the full contents of ch.recving are returned for the caller to process; otherwise nil is returned, meaning the message is not yet complete and cannot be handled. This mirrors the sending side described in the previous article, but the sender is considerably more involved, using three fields (sendQueue, sending and send), while the receiver is simpler.
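The reassembly idea can be tried in isolation with a minimal sketch. The fragment and assembler types are hypothetical stand-ins for msgPacket and Channel. Unlike the snippet above, this version copies the buffer before resetting it, which is a choice made here so that the returned message stays valid after later fragments arrive.

```go
package main

import "fmt"

// fragment is a simplified stand-in for msgPacket: a piece of a message plus
// a flag marking the last piece.
type fragment struct {
	Bytes []byte
	EOF   bool
}

// assembler accumulates fragments until a complete message is available.
type assembler struct {
	recving []byte
}

// add appends one fragment and returns the whole message once the final
// fragment has arrived; otherwise it returns nil.
func (a *assembler) add(f fragment) []byte {
	a.recving = append(a.recving, f.Bytes...)
	if !f.EOF {
		return nil
	}
	// Copy before resetting so later fragments cannot overwrite the result.
	msg := make([]byte, len(a.recving))
	copy(msg, a.recving)
	a.recving = a.recving[:0]
	return msg
}

func main() {
	var a assembler
	fmt.Println(a.add(fragment{Bytes: []byte("block-")}) == nil)
	fmt.Printf("%s\n", a.add(fragment{Bytes: []byte("data"), EOF: true}))
}
```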

Returning to MConnection.recvRoutine, we continue with the final c.onReceive call. onReceive is a function supplied to the connection from outside, at the place where the MConnection is created:

p2p/peer.go#L292-L310

func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {
    onReceive := func(chID byte, msgBytes []byte) {
        reactor := reactorsByCh[chID]
        if reactor == nil {
            if chID == PexChannel {
                return
            } else {
                cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
            }
        }
        reactor.Receive(chID, p, msgBytes)
    }

    onError := func(r interface{}) {
        onPeerError(p, r)
    }

    return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
}

The logic is simple: when the earlier c.onReceive(pkt.ChannelID, msgBytes) is called, it looks up the Reactor registered for the given chID and calls its Receive method. For this article, that means entering ProtocolReactor.Receive.
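The dispatch-by-channel-ID pattern is easy to reproduce on its own. In the sketch below, receiver is a cut-down, hypothetical version of the Reactor interface and recorder is a test double. Unknown channels are reported as an error rather than with a panic, purely to keep the example easy to run.

```go
package main

import "fmt"

// receiver is a cut-down, hypothetical version of the Reactor interface.
type receiver interface {
	Receive(chID byte, msgBytes []byte)
}

// recorder remembers what it received so the dispatch can be observed.
type recorder struct{ got []string }

func (r *recorder) Receive(chID byte, msgBytes []byte) {
	r.got = append(r.got, fmt.Sprintf("%#x:%s", chID, msgBytes))
}

// newOnReceive returns a callback that routes messages to the receiver
// registered for the channel ID and reports unknown channels as errors.
func newOnReceive(byCh map[byte]receiver) func(chID byte, msgBytes []byte) error {
	return func(chID byte, msgBytes []byte) error {
		r, ok := byCh[chID]
		if !ok {
			return fmt.Errorf("unknown channel %X", chID)
		}
		r.Receive(chID, msgBytes)
		return nil
	}
}

func main() {
	rec := &recorder{}
	onReceive := newOnReceive(map[byte]receiver{0x40: rec})
	fmt.Println(onReceive(0x40, []byte("request")), rec.got)
	fmt.Println(onReceive(0x99, []byte("stray")))
}
```

Building the callback as a closure means the connection code only needs a function value and never has to know which reactors exist.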

Now let’s look at ProtocolReactor.Receive:

netsync/protocol_reactor.go#L179-L247

func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
    _, msg, err := DecodeMessage(msgBytes)
    // ...
    switch msg := msg.(type) {
    case *BlockRequestMessage:
        // ...
    // ...
    }
}

DecodeMessage(...) deserializes the incoming binary data into a BlockchainMessage object, an interface with no methods that has several implementing types. The type switch that follows inspects the concrete type, and if it is a BlockRequestMessage, the corresponding handling continues. I have left that code out here because it belongs to the next question.
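For readers less familiar with this Go idiom, here is a small sketch of decoding into an empty interface and then branching with a type switch. Everything in it is invented for illustration: the message types, their fields, and the byte layout (one tag byte followed by an optional 8-byte big-endian height) are assumptions and do not reflect Bytom's actual encoding.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// message plays the role of the empty BlockchainMessage interface.
type message interface{}

// Two illustrative message types; the fields are assumptions of this sketch.
type blockRequest struct{ Height uint64 }
type statusRequest struct{}

// decode turns raw bytes into a typed message. The layout (one tag byte, then
// an optional 8-byte big-endian height) is invented for this example.
func decode(b []byte) (message, error) {
	if len(b) == 0 {
		return nil, errors.New("empty message")
	}
	switch b[0] {
	case 0x10:
		if len(b) < 9 {
			return nil, errors.New("short block request")
		}
		return &blockRequest{Height: binary.BigEndian.Uint64(b[1:9])}, nil
	case 0x20:
		return &statusRequest{}, nil
	}
	return nil, fmt.Errorf("unknown message tag %#x", b[0])
}

// describe branches on the concrete type, as Receive does with its type switch.
func describe(m message) string {
	switch m := m.(type) {
	case *blockRequest:
		return fmt.Sprintf("block request for height %d", m.Height)
	case *statusRequest:
		return "status request"
	default:
		return "unhandled message"
	}
}

func main() {
	raw := []byte{0x10, 0, 0, 0, 0, 0, 0, 0, 42}
	msg, err := decode(raw)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(describe(msg))
}
```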

Without noticing it, we have pretty much worked through the second half of the first question. So what is the first half? We know that reading from bufReader is started in MConnection.OnStart, but where does MConnection.OnStart itself get started from?

Fortunately, that first half was covered in detail in the previous article, on how the request for block data is sent, so we will not repeat it here. Go back to it if needed (the “summary” section at the end is a good place to start).

Which brings us to the second question:

After receiving a BlockRequestMessage, what kind of message is sent back to the other party?

This continues from ProtocolReactor.Receive above. First, here is a more complete version of the code:

netsync/protocol_reactor.go#L179-L247

func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
    _, msg, err := DecodeMessage(msgBytes)
    // ...

    switch msg := msg.(type) {
    case *BlockRequestMessage:
        var block *types.Block
        var err error
        if msg.Height != 0 {
            block, err = pr.chain.GetBlockByHeight(msg.Height)
        } else {
            block, err = pr.chain.GetBlockByHash(msg.GetHash())
        }
        // ...
        response, err := NewBlockResponseMessage(block)
        // ...
        src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
    // ...
    }
}

As the code shows, the logic is simple: using the height or hash specified in the BlockRequestMessage sent by the other party, the node looks up the matching block and wraps it in a BlockResponseMessage.

Explaining chain.GetBlockByHeight(...) and chain.GetBlockByHash(...) in detail requires a deeper understanding of how blockchain data is stored in a Bytom node. We will not cover that in this article and will come back to it later.

For now, it is enough to know that the node queries the block data, constructs a BlockResponseMessage, and sends it out through the BlockchainChannel.
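The "height if non-zero, otherwise hash" rule can be sketched against an in-memory store. The chain, block and request types below are hypothetical and only mimic the shape of the branch in Receive. They say nothing about how Bytom actually stores blocks.

```go
package main

import (
	"errors"
	"fmt"
)

type block struct {
	Height uint64
	Hash   string
}

// chain is an in-memory stand-in for the node's block store.
type chain struct {
	byHeight map[uint64]*block
	byHash   map[string]*block
}

func newChain(blocks ...*block) *chain {
	c := &chain{byHeight: map[uint64]*block{}, byHash: map[string]*block{}}
	for _, b := range blocks {
		c.byHeight[b.Height] = b
		c.byHash[b.Hash] = b
	}
	return c
}

// request mirrors the idea of BlockRequestMessage: a non-zero Height wins,
// otherwise the hash is used.
type request struct {
	Height uint64
	Hash   string
}

var errNotFound = errors.New("block not found")

// lookup picks the query the same way the Receive branch does.
func (c *chain) lookup(req request) (*block, error) {
	var b *block
	if req.Height != 0 {
		b = c.byHeight[req.Height]
	} else {
		b = c.byHash[req.Hash]
	}
	if b == nil {
		return nil, errNotFound
	}
	return b, nil
}

func main() {
	c := newChain(&block{Height: 1, Hash: "aa"}, &block{Height: 2, Hash: "bb"})
	fmt.Println(c.lookup(request{Height: 2}))
	fmt.Println(c.lookup(request{Hash: "aa"}))
	fmt.Println(c.lookup(request{Hash: "zz"}))
}
```

One consequence of this rule is that height 0 can never be requested by height, so in this sketch a block at height 0 would have to be requested by hash.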

The last line of code calls src.TrySend, which sends the message to the peer (src refers to the peer).

So how does it get sent? That brings us to the final question:

How is this BlockResponseMessage sent out?

Let’s start with the peer.TrySend code:

p2p/peer.go#L242-L247

func (p *Peer) TrySend(chID byte, msg interface{}) bool {
    if !p.IsRunning() {
        return false
    }
    return p.mconn.TrySend(chID, msg)
}

Internally it calls MConnection.TrySend, where chID is BlockchainChannel, whose corresponding reactor is ProtocolReactor.
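The bool return value suggests a "send if possible, otherwise give up" contract. The sketch below shows one common way to express that in Go, with a bounded queue and a select that has a default branch. This is an assumption about the general pattern, not a copy of MConnection.TrySend, and the sender type is hypothetical.

```go
package main

import "fmt"

// sender is a hypothetical peer with a bounded outgoing queue.
type sender struct {
	running bool
	queue   chan []byte
}

// trySend enqueues msg without blocking. It reports false when the sender is
// stopped or the queue is full.
func (s *sender) trySend(msg []byte) bool {
	if !s.running {
		return false
	}
	select {
	case s.queue <- msg:
		return true
	default:
		return false
	}
}

func main() {
	s := &sender{running: true, queue: make(chan []byte, 1)}
	fmt.Println(s.trySend([]byte("response 1"))) // queue has room
	fmt.Println(s.trySend([]byte("response 2"))) // queue is full
	s.running = false
	fmt.Println(s.trySend([]byte("response 3"))) // sender stopped
}
```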

From there we are in the familiar MConnection.TrySend, which was fully explained in the previous article and is not repeated here; refer back to it if needed.

That answers today’s question.

At this point we have a complete picture of what our node needs to do, and what the other node needs to do, when we request block data from it.