Author: freewind

Bytom project repositories:

GitHub address: https://github.com/Bytom/bytom

Gitee address: https://gitee.com/BytomBlockchain/bytom

In the previous article, we saw how to connect to a peer node through the P2P port and authenticate with it. At this point the two nodes trust each other and the connection stays open, so the next step is for them to exchange data.

So my first thought was: how can I get the other side to send me all the block data it already has?

This can actually be broken down into three questions:

  1. What kind of data do I need to send it?
  2. How does it respond internally?
  3. Once I get the data, what do I do with it?

Since the logic here is complicated, this article answers only the first question:

What request do we have to send to get a Bytom node to send us the block data it holds?

Find the code that sends the request

First we need to figure out where in the code the request is actually sent to the other node.

In the previous article, we saw how to establish a connection and verify identity, so the data request must be sent after that code runs. Following this line of thought, we found that after the Switch is started, the SyncManager holds a BlockKeeper, which performs the relevant operations.

As usual, we start from program startup, but more briefly this time:

cmd/bytomd/main.go#L54

func main() {
    cmd := cli.PrepareBaseCmd(commands.RootCmd, "TM", os.ExpandEnv(config.DefaultDataDir()))
    cmd.Execute()
}

cmd/bytomd/commands/run_node.go#L41

func runNode(cmd *cobra.Command, args []string) error {
    n := node.NewNode(config)
    if _, err := n.Start(); err != nil {
        // ...
    }
    // ...
}

node/node.go#L169

func (n *Node) OnStart() error {
    // ...
    n.syncManager.Start()
    // ...
}

netsync/handle.go#L141

func (sm *SyncManager) Start() {
    go sm.netStart()
    // ...
    go sm.syncer()
}

Note that sm.netStart() is where the connection and authentication from the previous article take place. The question we care about here is handled in sm.syncer().

Also note that both calls are started with the go keyword as goroutines, so they run concurrently.

The code for sm.syncer() is as follows:

netsync/sync.go#L46

func (sm *SyncManager) syncer() {
    sm.fetcher.Start()
    defer sm.fetcher.Stop()

    // ...
    for {
        select {
        case <-sm.newPeerCh:
            log.Info("New peer connected.")
            // Make sure we have peers to select from, then sync
            if sm.sw.Peers().Size() < minDesiredPeerCount {
                break
            }
            go sm.synchronise()
            // ...
        }
    }
}

There is also a fetcher mixed in, and its name suggests it is there to fetch data. Is that what we’re looking for?

Unfortunately, no. The fetcher takes block data from multiple peers, organizes it, and adds the useful blocks to the local chain. We’ll look at it in the future, so we won’t go into it here.

This is followed by a for loop that waits on the newPeerCh channel. Each time a new peer connects, it checks whether the Switch has at least minDesiredPeerCount (5) peers. If not, the break leaves only the select statement and the loop keeps waiting; otherwise it calls sm.synchronise() in a new goroutine to synchronize data.

Why wait for several peers instead of synchronizing with the first one right away? I think the idea is to have more candidates, so that a node with enough data can be found.
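The peer-count gate in syncer() can be tried out in isolation. This is a minimal, hypothetical sketch rather than Bytom’s code: syncerLoop, peerCount and synchronise are names introduced here, the loop stops when newPeerCh is closed (an assumption added so the example terminates), and synchronise is called directly instead of with go so the result is deterministic. It also demonstrates that break inside a select only leaves the select, not the surrounding for loop.

```go
package main

import "fmt"

// minDesiredPeerCount uses the value mentioned in the article (5).
const minDesiredPeerCount = 5

// syncerLoop is a hypothetical, simplified version of SyncManager.syncer.
// It waits for "new peer" notifications and triggers a sync only once
// peerCount() reports at least minDesiredPeerCount peers.
func syncerLoop(newPeerCh <-chan struct{}, peerCount func() int, synchronise func()) {
	for {
		select {
		case _, ok := <-newPeerCh:
			if !ok {
				return // channel closed: stop (added so the sketch terminates)
			}
			if peerCount() < minDesiredPeerCount {
				break // leaves the select only; the for loop keeps waiting
			}
			synchronise() // the real code uses: go sm.synchronise()
		}
	}
}

func main() {
	newPeerCh := make(chan struct{}, 6)
	for i := 0; i < 6; i++ {
		newPeerCh <- struct{}{} // six peers connect one after another
	}
	close(newPeerCh)

	peers, syncs := 0, 0
	syncerLoop(newPeerCh,
		func() int { peers++; return peers }, // one more peer per notification
		func() { syncs++ })
	fmt.Println("syncs:", syncs) // syncs: 2 (at the 5th and 6th peer)
}
```

Because the real code starts each sync with go, several synchronise() calls could be in flight at once; that is one reason synchronise() itself has to be careful about which peer it picks.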

synchronise() is also a method of SyncManager. Before it actually calls into BlockKeeper, it does some other work, such as cleaning up disconnected peers and finding the most suitable peer to synchronize data from. Cleaning up peers involves keeping the peer sets held by different objects in sync, which is a bit tricky but does not help with the current question, so I plan to cover it in a future question (for example, “how does Bytom handle a node disconnecting?”) and omit it here.

The sm.synchronise() code looks like this:

netsync/sync.go#L77

func (sm *SyncManager) synchronise() {
    log.Info("bk peer num:", sm.blockKeeper.peers.Len(), " sw peer num:", sm.sw.Peers().Size(), "", sm.sw.Peers().List())
    // ...
    peer, bestHeight := sm.peers.BestPeer()
    // ...
    if bestHeight > sm.chain.BestBlockHeight() {
        // ...
        sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight)
    }
}

As you can see, the first thing it does is find the most suitable peer. What counts as Best? Take a look at the definition of BestPeer():

netsync/peer.go#L266

func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) {
    // ...
    for _, p := range ps.peers {
        if bestPeer == nil || p.height > bestHeight {
            bestPeer, bestHeight = p.swPeer, p.height
        }
    }
    return bestPeer, bestHeight
}

In other words, it is the peer reporting the greatest block height, that is, the one holding the most blockchain data.
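The selection rule in BestPeer() boils down to a max-by-height scan. The sketch below uses a hypothetical peerInfo type in place of netsync’s peer and keeps the first peer with the greatest height. One detail worth noting, assuming ps.peers is a map keyed by peer ID: Go map iteration order is unspecified, so when two peers report the same height, which one the real code picks is not deterministic. The slice used here makes it deterministic.

```go
package main

import "fmt"

// peerInfo is a hypothetical stand-in for netsync's peer: an ID and its reported height.
type peerInfo struct {
	id     string
	height uint64
}

// bestPeer mirrors the loop in peerSet.BestPeer: keep the peer with the
// greatest height. Because the comparison is strictly greater-than, the
// first peer seen wins a tie. ok is false when there are no peers.
func bestPeer(peers []peerInfo) (id string, height uint64, ok bool) {
	var best *peerInfo
	for i := range peers {
		if best == nil || peers[i].height > best.height {
			best = &peers[i]
		}
	}
	if best == nil {
		return "", 0, false
	}
	return best.id, best.height, true
}

func main() {
	peers := []peerInfo{{"a", 100}, {"b", 120}, {"c", 120}}
	fmt.Println(bestPeer(peers)) // b 120 true
}
```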

After finding the best peer, it calls sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight). From here we officially enter the world of blockKeeper, the protagonist of this article.

BlockKeeper

The logic of blockKeeper.BlockRequestWorker is complicated. It includes:

  1. Working out which blocks to synchronize, based on the block data it holds locally
  2. Sending a data request to the best peer found earlier
  3. Receiving the block data the peer sends back
  4. Processing the data
  5. Broadcasting the new state
  6. Handling the various things that can go wrong, etc.

Since this article focuses only on “sending the request”, I’ll skip the less relevant logic and leave it for another time.

In “send a request”, there are actually two cases, one simple and one complex:

  1. Simple: Assuming no forks, simply check the local block with the highest height and request the next block
  2. Complex: When forks are taken into account, the current local block may be on a fork, so which block to request must be worked out carefully

Since the second case is too complex for this article (it requires a deep understanding of how Bytom handles forks), the problem is simplified here and only the first case is considered. Fork handling will be explained later.

Below is blockKeeper.BlockRequestWorker, simplified to cover only the first case:
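In the simple, fork-free case, the height to request combines two checks seen in the code: synchronise() only proceeds when the peer’s bestHeight is greater than the local BestBlockHeight(), and BlockRequestWorker() then asks for BestBlockHeight() + 1. A minimal sketch, with nextRequestHeight as a hypothetical helper:

```go
package main

import "fmt"

// nextRequestHeight is a hypothetical helper for the fork-free case.
// It returns the height to request and whether a request is needed at all.
func nextRequestHeight(localBest, peerBest uint64) (uint64, bool) {
	// synchronise() only proceeds when the peer is ahead of us.
	if peerBest <= localBest {
		return 0, false
	}
	// BlockRequestWorker() then asks for the block right after our best one.
	return localBest + 1, true
}

func main() {
	fmt.Println(nextRequestHeight(10, 15)) // 11 true
	fmt.Println(nextRequestHeight(15, 15)) // 0 false
}
```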

netsync/block_keeper.go#L72

func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error {
    num := bk.chain.BestBlockHeight() + 1
    reqNum := uint64(0)
    reqNum = num
    // ...
    bkPeer, ok := bk.peers.Peer(peerID)
    swPeer := bkPeer.getPeer()
    // ...
    block, err := bk.BlockRequest(peerID, reqNum)
    // ...
}

In this case, we can assume that Best in bk.chain.BestBlockHeight() refers to the height of the highest block in the locally held chain, since there are no forks. (Note that when forks exist, Best is not necessarily the highest.)

Then we can directly request the block at the next height from the best peer, which is done by bk.BlockRequest(peerID, reqNum):

netsync/block_keeper.go#L152

func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) {
    var block *types.Block

    if err := bk.blockRequest(peerID, height); err != nil {
        return nil, errReqBlock
    }

    // ...

    for {
        select {
        case pendingResponse := <-bk.pendingProcessCh:
            block = pendingResponse.block
            // ...
            return block, nil
        // ...
        }
    }
}

In the simplified code above, there are two main parts. The first sends the request, bk.blockRequest(peerID, height), which is the focus of this article. The for-select part below it waits for and processes the data returned by the other node, which we skip today.

bk.blockRequest(peerID, height) does two things:

  1. Construct the request message
  2. Send the message to the peer node

Construct the request message

After a series of method calls, bk.blockRequest(peerID, height) constructs a BlockRequestMessage object from height, as follows:
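The send-then-wait shape of BlockRequest() can be sketched with a channel and a timer. This is an illustration under assumptions: block, blockRequest, errReqTimeout and the timeout parameter are hypothetical (the original’s other select cases are elided), errReqBlock reuses the original’s name with a made-up message, and skipping responses of the wrong height is only a guess at what the elided code might check. The timer is created once outside the loop, so skipping an unrelated response does not reset the deadline.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// block is a hypothetical stand-in for types.Block.
type block struct{ Height uint64 }

var (
	errReqBlock   = errors.New("request block error")
	errReqTimeout = errors.New("request block timeout") // hypothetical
)

// blockRequest sketches the shape of blockKeeper.BlockRequest: send the
// request first, then wait for a matching response or give up on timeout.
func blockRequest(send func(height uint64) error, responses <-chan *block,
	height uint64, timeout time.Duration) (*block, error) {
	if err := send(height); err != nil {
		return nil, errReqBlock
	}
	timer := time.NewTimer(timeout) // one deadline for the whole wait
	defer timer.Stop()
	for {
		select {
		case b := <-responses:
			if b.Height != height {
				continue // not the block we asked for (assumed check)
			}
			return b, nil
		case <-timer.C:
			return nil, errReqTimeout
		}
	}
}

func main() {
	responses := make(chan *block, 1)
	send := func(h uint64) error {
		responses <- &block{Height: h} // pretend the peer answers at once
		return nil
	}
	b, err := blockRequest(send, responses, 11, time.Second)
	fmt.Println(b.Height, err) // 11 <nil>
}
```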

netsync/block_keeper.go#L148

func (bk *blockKeeper) blockRequest(peerID string, height uint64) error {
    return bk.peers.requestBlockByHeight(peerID, height)
}

netsync/peer.go#L332

func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
    peer, ok := ps.Peer(peerID)
    // ...
    return peer.requestBlockByHeight(height)
}

netsync/peer.go#L73

func (p *peer) requestBlockByHeight(height uint64) error {
    msg := &BlockRequestMessage{Height: height}
    p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
    return nil
}

At this point the required BlockRequestMessage has been constructed; essentially, it just tells the peer which height we want.

The message is then sent through the Peer’s TrySend().
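A BlockRequestMessage carries nothing but a height, so its encoded form is tiny. The sketch below only illustrates the general idea of a type byte that identifies the message, followed by its fields; it does not use go-wire and is not go-wire’s exact wire format. blockRequestType and its value 0x01 are hypothetical, and the 8-byte big-endian height is a choice made for this example.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// blockRequestType is a HYPOTHETICAL type byte; the real one is defined by Bytom's netsync code.
const blockRequestType byte = 0x01

// BlockRequestMessage mirrors the article's message: it only carries a height.
type BlockRequestMessage struct {
	Height uint64
}

// encode writes the type byte followed by the height as 8 big-endian bytes.
func (m *BlockRequestMessage) encode() []byte {
	buf := make([]byte, 9)
	buf[0] = blockRequestType
	binary.BigEndian.PutUint64(buf[1:], m.Height)
	return buf
}

// decode reverses encode and rejects unknown or malformed input.
func decode(b []byte) (*BlockRequestMessage, error) {
	if len(b) != 9 || b[0] != blockRequestType {
		return nil, errors.New("unknown or malformed message")
	}
	return &BlockRequestMessage{Height: binary.BigEndian.Uint64(b[1:])}, nil
}

func main() {
	raw := (&BlockRequestMessage{Height: 11}).encode()
	fmt.Printf("% x\n", raw) // 01 00 00 00 00 00 00 00 0b
	msg, err := decode(raw)
	fmt.Println(msg.Height, err) // 11 <nil>
}
```

The leading type byte is what lets the receiver tell a block request apart from other message kinds arriving on the same channel.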

Send the request

In TrySend, the message is serialized mainly with the github.com/tendermint/go-wire library and then sent to the peer. This looks like a simple operation, but be warned: the path is quite roundabout.

Let’s step into TrySend():

p2p/peer.go#L242

func (p *Peer) TrySend(chID byte, msg interface{}) bool {
    if !p.IsRunning() {
        return false
    }
    return p.mconn.TrySend(chID, msg)
}

It hands the work off to p.mconn.TrySend. So what is mconn, and what is chID?

mconn is an instance of MConnection. Where does it come from? It must have been initialized somewhere earlier, otherwise it couldn’t be called directly here. So let’s first find where it is initialized.

After some searching, it turns out that it is created right after the step covered in the previous article, once the Bytom node has authenticated with the other node. The code path starts where the Switch starts up.

Let’s start with the Switch’s OnStart:

p2p/switch.go#L186

func (sw *Switch) OnStart() error {
    // ...
    // Start listeners
    for _, listener := range sw.listeners {
        go sw.listenerRoutine(listener)
    }
    return nil
}

p2p/switch.go#L498

func (sw *Switch) listenerRoutine(l Listener) {
    for {
        inConn, ok := <-l.Connections()
        // ...
        err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
        // ...
    }
}

p2p/switch.go#L645

func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
    // ...
    peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
    // ...
}

p2p/peer.go#L87

func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
    return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
}

p2p/peer.go#L91

func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
    conn := rawConn
    // ...
    if config.AuthEnc {
        // ...
        conn, err = MakeSecretConnection(conn, ourNodePrivKey)
        // ...
    }

    // Key and NodeInfo are set after Handshake
    p := &Peer{
        outbound: outbound,
        conn:     conn,
        config:   config,
        Data:     cmn.NewCMap(),
    }

    p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config.MConfig)

    p.BaseService = *cmn.NewBaseService(nil, "Peer", p)

    return p, nil
}

Finally found it: right after MakeSecretConnection sets up the encrypted connection, p.mconn = createMConnection(…) is where mconn is created.

Keep going:

p2p/peer.go#L292

func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {
    onReceive := func(chID byte, msgBytes []byte) {
        reactor := reactorsByCh[chID]
        if reactor == nil {
            if chID == PexChannel {
                return
            } else {
                cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID))
            }
        }
        reactor.Receive(chID, p, msgBytes)
    }

    onError := func(r interface{}) {
        onPeerError(p, r)
    }

    return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
}

It turns out that mconn is an instance of MConnection, created with NewMConnectionWithConfig.

The code above shows that an MConnection is not very different from a net.Conn, except that when it receives data from the other side, it calls the Receive method of the Reactor registered for the given chID. So it serves to dispatch incoming data to the right Reactor.

Why is this dispatch needed? Because nodes exchange data in several different ways:

  1. One defines a detailed data-exchange protocol (which message types exist, what they mean, when each is sent, how to respond, etc.). It is implemented in ProtocolReactor, whose chID is BlockchainChannel, with the value byte(0x40)
  2. The other is PEX, a peer-exchange protocol similar to the one used in BitTorrent file sharing. It is implemented in PEXReactor, whose chID is PexChannel, with the value byte(0x00)

Therefore, when a node receives data from another node, it needs to know which mode the data belongs to, and then forward it to the corresponding Reactor for processing.

Of the two, the former is the main mode and the latter plays a supporting role. The former is what this article deals with; the latter will be covered in the future.
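The dispatch done by the onReceive closure in createMConnection can be reproduced with a plain map from chID to handler. In this sketch, Reactor is a trimmed-down, hypothetical interface (the real Receive also gets the *Peer), recordingReactor is a test double, and cmn.PanicSanity is replaced by a plain panic; the channel values are the ones given above.

```go
package main

import "fmt"

// Channel IDs as given in the article.
const (
	PexChannel        byte = 0x00
	BlockchainChannel byte = 0x40
)

// Reactor is a trimmed-down, hypothetical interface; the real Receive also gets the *Peer.
type Reactor interface {
	Receive(chID byte, msgBytes []byte)
}

// recordingReactor just remembers what it received.
type recordingReactor struct{ got [][]byte }

func (r *recordingReactor) Receive(chID byte, msgBytes []byte) {
	r.got = append(r.got, msgBytes)
}

// makeOnReceive mimics the onReceive closure in createMConnection: look up the
// reactor registered for chID, silently drop PEX traffic when no PEX reactor is
// registered, and panic on any other unknown channel.
func makeOnReceive(reactorsByCh map[byte]Reactor) func(chID byte, msgBytes []byte) {
	return func(chID byte, msgBytes []byte) {
		reactor := reactorsByCh[chID]
		if reactor == nil {
			if chID == PexChannel {
				return
			}
			panic(fmt.Sprintf("Unknown channel %X", chID))
		}
		reactor.Receive(chID, msgBytes)
	}
}

func main() {
	protocol := &recordingReactor{}
	onReceive := makeOnReceive(map[byte]Reactor{BlockchainChannel: protocol})
	onReceive(BlockchainChannel, []byte{0x01}) // delivered to the protocol reactor
	onReceive(PexChannel, []byte{0x02})        // no PEX reactor registered: dropped
	fmt.Println(len(protocol.got))             // 1
}
```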

p.mconn.TrySend

Now that we know what mconn in p.mconn.TrySend is and when it is initialized, we can step into its TrySend method.

p2p/connection.go#L243

func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
    // ...
    channel, ok := c.channelsIdx[chID]
    // ...
    ok = channel.trySendBytes(wire.BinaryBytes(msg))
    if ok {
        // Wake up sendRoutine if necessary
        select {
        case c.send <- struct{}{}:
        default:
        }
    }
    return ok
}

As you can see, once it finds the matching channel (here, the ProtocolReactor’s channel), it calls the channel’s trySendBytes method. Before that, wire.BinaryBytes(msg) from the github.com/tendermint/go-wire library serializes msg into a byte slice.
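The select with an empty default at the end of TrySend is a non-blocking notification: it wakes sendRoutine if no wake-up is already pending, and otherwise does nothing. The stripped-down conn type below is hypothetical; it assumes c.send is a buffered channel of capacity 1 and takes already-serialized bytes instead of calling wire.BinaryBytes. As a result, several quick sends collapse into a single pending wake-up.

```go
package main

import "fmt"

// channel keeps only the send queue of the real Channel.
type channel struct{ sendQueue chan []byte }

// trySendBytes enqueues without blocking, like Channel.trySendBytes.
func (ch *channel) trySendBytes(b []byte) bool {
	select {
	case ch.sendQueue <- b:
		return true
	default:
		return false
	}
}

// conn is a hypothetical, stripped-down MConnection.
type conn struct {
	channelsIdx map[byte]*channel
	send        chan struct{} // assumed capacity 1: "there is data to send"
}

// TrySend takes already-serialized bytes (the real one calls wire.BinaryBytes).
func (c *conn) TrySend(chID byte, msgBytes []byte) bool {
	ch, ok := c.channelsIdx[chID]
	if !ok {
		return false // unknown channel (assumed behaviour of the elided code)
	}
	ok = ch.trySendBytes(msgBytes)
	if ok {
		// Wake the sending goroutine without ever blocking the caller:
		// if a wake-up is already pending, this one is simply dropped.
		select {
		case c.send <- struct{}{}:
		default:
		}
	}
	return ok
}

func main() {
	c := &conn{
		channelsIdx: map[byte]*channel{0x40: {sendQueue: make(chan []byte, 2)}},
		send:        make(chan struct{}, 1),
	}
	fmt.Println(c.TrySend(0x40, []byte("a"))) // true
	fmt.Println(c.TrySend(0x40, []byte("b"))) // true
	fmt.Println(c.TrySend(0x40, []byte("c"))) // false: queue is full
	fmt.Println(len(c.send))                  // 1: wake-ups were coalesced
}
```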

p2p/connection.go#L602

func (ch *Channel) trySendBytes(bytes []byte) bool {
    select {
    case ch.sendQueue <- bytes:
        atomic.AddInt32(&ch.sendQueueSize, 1)
        return true
    default:
        return false
    }
}

It puts the data into the channel’s sendQueue, from which some other code takes it and actually sends it. Who exactly does the sending? We will have to find out.

Careful readers will notice that, besides trySendBytes, a Channel also has a sendBytes method (not used in this article):

p2p/connection.go#L589

func (ch *Channel) sendBytes(bytes []byte) bool {
    select {
    case ch.sendQueue <- bytes:
        atomic.AddInt32(&ch.sendQueueSize, 1)
        return true
    case <-time.After(defaultSendTimeout):
        return false
    }
}

The difference between the two: trySendBytes tries to put the bytes into ch.sendQueue and returns true if it succeeds, or false immediately otherwise, so it is non-blocking. sendBytes, if it cannot put them in (because sendQueue is full and the earlier data has not been sent yet), waits up to defaultSendTimeout (10 seconds) before failing. Also, sendQueue has a default capacity of 1.

At this point, we know how Bytom requests block data from other nodes and when that request is sent out.
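The two enqueue strategies can be compared side by side. This sketch assumes a sendQueue of capacity 1, as stated above; it drops the sendQueueSize counter and passes the timeout as a parameter (instead of the fixed defaultSendTimeout of 10 seconds) so the demo runs quickly.

```go
package main

import (
	"fmt"
	"time"
)

// channel keeps only the send queue of the real Channel (sendQueueSize is omitted).
type channel struct{ sendQueue chan []byte }

func newChannel() *channel {
	return &channel{sendQueue: make(chan []byte, 1)} // capacity 1, as the article states
}

// trySendBytes never blocks: it fails at once if the queue is full.
func (ch *channel) trySendBytes(b []byte) bool {
	select {
	case ch.sendQueue <- b:
		return true
	default:
		return false
	}
}

// sendBytes waits up to timeout for room in the queue (the real code uses defaultSendTimeout).
func (ch *channel) sendBytes(b []byte, timeout time.Duration) bool {
	select {
	case ch.sendQueue <- b:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	ch := newChannel()
	fmt.Println(ch.trySendBytes([]byte("a")))                   // true: queue was empty
	fmt.Println(ch.trySendBytes([]byte("b")))                   // false: queue is full
	fmt.Println(ch.sendBytes([]byte("c"), 50*time.Millisecond)) // false, after about 50ms
	<-ch.sendQueue                                              // simulate the sender draining the queue
	fmt.Println(ch.sendBytes([]byte("d"), 50*time.Millisecond)) // true
}
```

With a queue of capacity 1, trySendBytes fails as soon as one message is waiting, which is why TrySend’s boolean result matters to callers that cannot afford to block.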

I wanted to cover the code that actually sends the data in this article, but I found that the logic is quite complicated, so I’ll cover it in another article.

Returning to the question of this article, let me stress again that, as mentioned earlier, there are two cases when requesting block data from a peer: a simple one that ignores forks and a complex one that takes them into account. This article only considered the simple case, in which the local best height is the height of the highest block; in the complex case this is not necessarily so. That will be discussed in detail later. With that, the question of this article is answered.