A deep dive into the Ethereum source code: TxPool

Read alongside the following code: github.com/blockchainG…

Writing articles is not easy. Please point out any problems you find, and feel free to get in touch and exchange ideas.

Concepts and principles of the transaction pool

Overview of the transaction pool:

  1. Transactions enter the pool from two sources:
    • Local submission: a third-party application submits the transaction through the RPC service of the local Ethereum node;
    • Remote synchronization: transactions broadcast by other Ethereum nodes are synchronized to the local node.
  2. The Miner module fetches and verifies transactions from the pool for mining. Once a block is mined successfully, the transactions are written into it and the block is broadcast.
  3. When the Miner fetches transactions it copies them, so the number of transactions in the pool does not decrease. A transaction is removed from the pool only after it has been written into the canonical chain.
  4. If a transaction is written into a block on a fork, it is not removed from the pool. It stays there, waiting to be packaged again.

Key data structures

TxPoolConfig

type TxPoolConfig struct {
	Locals    []common.Address // Addresses that are treated as local by default
	NoLocals  bool             // Whether local transaction handling is disabled
	Journal   string           // Path of the journal file that persists local transactions
	Rejournal time.Duration    // Interval at which the local transaction journal is regenerated

	PriceLimit uint64 // Minimum gas price required for acceptance into the pool
	PriceBump  uint64 // Minimum price increase (in percent) needed to replace an existing transaction with the same nonce

	AccountSlots uint64 // Number of executable transaction slots guaranteed per account
	GlobalSlots  uint64 // Maximum number of executable transaction slots for all accounts
	AccountQueue uint64 // Maximum number of non-executable transactions per account
	GlobalQueue  uint64 // Maximum number of non-executable transactions for all accounts

	Lifetime time.Duration // Maximum time a non-executable transaction may stay in the queue
}

Default configuration:

Journal:   "transactions.rlp",
Rejournal: time.Hour,

PriceLimit: 1,
PriceBump:  10,

AccountSlots: 16,
GlobalSlots:  4096,
AccountQueue: 64,
GlobalQueue:  1024,

Lifetime: 3 * time.Hour

TxPool

type TxPool struct {
	config      TxPoolConfig            // Transaction pool configuration
	chainconfig *params.ChainConfig     // Blockchain configuration
	chain       blockChain              // Blockchain interface
	gasPrice    *big.Int                // Minimum gas price accepted by the pool
	txFeed      event.Feed              // Event feed for new transactions
	scope       event.SubscriptionScope // Subscription scope
	signer      types.Signer            // Transaction signer
	mu          sync.RWMutex

	istanbul bool // Fork indicator whether we are in the istanbul stage.

	currentState  *state.StateDB // The state of the current header block
	pendingNonces *txNoncer      // Pending state tracking virtual nonces
	currentMaxGas uint64         // Current gas limit for transaction caps

	locals  *accountSet // Set of local transaction to exempt from eviction rules
	journal *txJournal  // Journal of local transaction to back up to disk

	pending map[common.Address]*txList   // All currently processable transactions
	queue   map[common.Address]*txList   // Queued but non-processable transactions
	beats   map[common.Address]time.Time // Last heartbeat from each known account
	all     *txLookup                    // All transactions to allow lookups
	priced  *txPricedList                // All transactions sorted by price

	chainHeadCh     chan ChainHeadEvent
	chainHeadSub    event.Subscription
	reqResetCh      chan *txpoolResetRequest
	reqPromoteCh    chan *accountSet
	queueTxEventCh  chan *types.Transaction
	reorgDoneCh     chan chan struct{}
	reorgShutdownCh chan struct{}  // requests shutdown of scheduleReorgLoop
	wg              sync.WaitGroup // tracks loop, scheduleReorgLoop
}


TxPool initialization

TxPool initialization does the following:

① Check the configuration. Any invalid value is replaced with its default

   config = (&config).sanitize()

For what is checked, see the TxPoolConfig fields above.

② Initialize the set of local accounts

   pool.locals = newAccountSet(pool.signer)

③ Add the configured local account addresses to the transaction pool

   pool.locals.add(addr)

When installing the Ethereum client you can specify a data directory, which holds the keystore files of all accounts imported or created through the local client. Loading local accounts reads the account data from that directory.

④ Reset the transaction pool against the current chain head

   pool.reset(nil, chain.CurrentBlock().Header())

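⑤ Create the price-sorted list over all stored transactions. The prices are kept in a min-heap

   pool.priced = newTxPricedList(pool.all)

Because the heap keeps the cheapest transaction on top, the lowest-priced transactions are the first to be evicted when the pool is full, so transactions with a higher gas price are the ones that remain to be processed.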

⑥ If local transactions are enabled, load them from the journal on local disk

   if !config.NoLocals && config.Journal != "" {
   	pool.journal = newTxJournal(config.Journal)

   	if err := pool.journal.load(pool.AddLocals); err != nil {
   		log.Warn("Failed to load transaction journal", "err", err)
   	}
   	if err := pool.journal.rotate(pool.local()); err != nil {
   		log.Warn("Failed to rotate transaction journal", "err", err)
   	}
   }

⑦ Subscribe to chain head events

   pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)

⑧ Start the main loop

   go pool.loop()

Note: Local transactions have higher privileges than remote ones. First, they are not easily evicted or replaced. Second, they are persisted: local transactions that have not yet been packaged are kept in the local journal file, so when the node starts, local transactions are loaded from disk first.

Local addresses act as a whitelist: every transaction sent from such an address is treated as local, whether it was submitted locally or received from a remote peer.
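The whitelist rule can be sketched in a few lines. The `accountSet` below is a hypothetical simplification keyed by address strings (the real one works on `common.Address` and a signer), and `isLocal` is an illustrative helper name, not a function from the source.

```go
package main

import "fmt"

// accountSet is a hypothetical, simplified set of addresses (hex strings here).
type accountSet map[string]struct{}

func (s accountSet) add(addr string) { s[addr] = struct{}{} }

func (s accountSet) contains(addr string) bool {
	_, ok := s[addr]
	return ok
}

// isLocal reports whether a transaction counts as local: either it was
// submitted through the local node, or its sender is whitelisted.
func isLocal(submittedLocally bool, sender string, locals accountSet) bool {
	return submittedLocally || locals.contains(sender)
}

func main() {
	locals := accountSet{}
	locals.add("0xaaa")
	fmt.Println(isLocal(false, "0xaaa", locals)) // arrived from a peer, whitelisted sender
	fmt.Println(isLocal(false, "0xbbb", locals)) // arrived from a peer, unknown sender
	fmt.Println(isLocal(true, "0xbbb", locals))  // submitted locally
}
```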

This completes the loading process of the transaction pool.

Adding transactions to the TxPool

As mentioned earlier, transactions reach the pool either by being broadcast from other nodes or by being submitted locally. In the source code these paths correspond to AddRemote and AddLocal, and both end up calling addTxs. Our discussion of adding transactions therefore starts with this function, which does the following:

  1. Filter out transactions that are already known

    if pool.all.Get(tx.Hash()) != nil {
    	errs[i] = fmt.Errorf("known transaction: %x", tx.Hash())
    	knownTxMeter.Mark(1)
    	continue
    }
  2. Add the transactions to the queue

    newErrs, dirtyAddrs := pool.addTxsLocked(news, local)

    addTxsLocked calls pool.add(tx, local) for each transaction.

    The pool.add function places the transaction in queue, where it waits to be promoted to pending later. If a transaction with the same nonce already exists in queue or pending and the new one offers a sufficiently higher gas price, the previous transaction is overwritten. Let's break down the add function.

    ① Check whether the transaction is already known. If so, discard it

    if pool.all.Get(hash) != nil {
    	log.Trace("Discarding already known transaction", "hash", hash)
    	knownTxMeter.Mark(1)
    	return false, fmt.Errorf("known transaction: %x", hash)
    }

    ② If the transaction fails validation, it is discarded as well.

    validateTx mainly checks the following:
    • The transaction size cannot exceed 32KB
    • The transaction value cannot be negative
    • The transaction gas cannot exceed the gas limit currently set for the pool
    • The transaction signature must be correct
    • For a remote transaction, the gas price must not be lower than the pool's minimum gas price. Local transactions are exempt and are packaged preferentially
    • The sender's balance must cover the transaction cost, where cost == V + GP * GL
    • The gas limit must not be lower than the intrinsic gas the transaction requires

    ③ If the pool is full, discard underpriced transactions

    if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
    	if !local && pool.priced.Underpriced(tx, pool.locals) {
    		...
    	}
    	drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
    	for _, tx := range drop {
    		...
    		pool.removeTx(tx.Hash(), false)
    	}
    }

    Note GlobalSlots and GlobalQueue: they are the maximum capacities of pending and queue respectively. Once the number of transactions in the pool reaches the sum of the two, the lowest-priced transactions are discarded to make room.
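The argument passed to Discard is easy to misread, so here is the arithmetic in isolation. `toDiscard` is a hypothetical helper written for this illustration; it computes how many transactions have to go so that exactly one slot is free for the newcomer.

```go
package main

import "fmt"

// toDiscard returns how many transactions must be evicted to make room
// for one new transaction, given the current count and the two limits.
func toDiscard(count int, globalSlots, globalQueue uint64) int {
	capacity := int(globalSlots + globalQueue)
	if count < capacity {
		return 0 // the pool still has room
	}
	// Shrink to capacity-1 so that the newcomer fits.
	return count - (capacity - 1)
}

func main() {
	// With the default limits the total capacity is 4096 + 1024 = 5120.
	fmt.Println(toDiscard(5119, 4096, 1024)) // room left, nothing to evict
	fmt.Println(toDiscard(5120, 4096, 1024)) // exactly full
	fmt.Println(toDiscard(5125, 4096, 1024)) // over capacity
}
```

With the default configuration a pool holding exactly 5120 transactions evicts one, and a pool holding 5125 evicts six.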

④ Check whether pending already holds a transaction with the same nonce. If it does, check whether the gas price of the new transaction exceeds the old one by at least the configured PriceBump percentage. If so, the new transaction replaces the existing one. Otherwise ErrReplaceUnderpriced is returned because the gas price of the replacement is too low. If no pending transaction has the same nonce, the transaction is pushed into queue (enqueueTx).

   if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
   	// Nonce already pending, check if required price bump is met
   	inserted, old := list.Add(tx, pool.config.PriceBump)
   	if !inserted {
   		pendingDiscardMeter.Mark(1)
   		return false, ErrReplaceUnderpriced
   	}
   	// New transaction is better, replace old one
   	if old != nil {
   		pool.all.Remove(old.Hash())
   		pool.priced.Removed(1)
   		pendingReplaceMeter.Mark(1)
   	}
   	pool.all.Add(tx)
   	pool.priced.Put(tx)
   	pool.journalTx(from, tx)
   	pool.queueTxEvent(tx)
   	log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
   	return old != nil, nil
   }
   // New transaction isn't replacing a pending one, push into queue
   replaced, err = pool.enqueueTx(hash, tx)

That covers the process of adding a transaction. Next comes promoting transactions that were added to queue into pending.

  3. Promote transactions

    Promotion essentially moves transactions from queue to pending. The next section covers it in detail

    done := pool.requestPromoteExecutables(dirtyAddrs)

Transaction promotion

promoteExecutables moves transactions from the future queue to pending and, along the way, removes invalid transactions, such as those with a nonce that is too low or a sender balance that is too low. It works in the following steps:

① Remove from all every queued transaction whose nonce is lower than the account's current nonce

forwards := list.Forward(pool.currentState.GetNonce(addr))
for _, tx := range forwards {
	hash := tx.Hash()
	pool.all.Remove(hash)
	log.Trace("Removed old queued transaction", "hash", hash)
}
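What Forward does to a per-account list can be shown on a plain map. This is a sketch under simplifying assumptions: the list is modelled as a hypothetical nonce-to-hash map, and `forward` is an illustrative function, not the real `txList.Forward`.

```go
package main

import (
	"fmt"
	"sort"
)

// forward deletes every entry of txs (a hypothetical nonce -> hash map)
// whose nonce is below threshold and returns the deleted nonces, sorted.
func forward(txs map[uint64]string, threshold uint64) []uint64 {
	var removed []uint64
	for nonce := range txs {
		if nonce < threshold {
			removed = append(removed, nonce)
			delete(txs, nonce) // deleting during range is safe in Go
		}
	}
	sort.Slice(removed, func(i, j int) bool { return removed[i] < removed[j] })
	return removed
}

func main() {
	queued := map[uint64]string{3: "0xa", 4: "0xb", 7: "0xc"}
	// The account nonce is 5, so nonces 3 and 4 are stale.
	fmt.Println(forward(queued, 5), len(queued))
}
```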

② Remove every queued transaction whose cost exceeds the account balance or whose gas exceeds the limit

drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
	hash := tx.Hash()
	pool.all.Remove(hash)
	log.Trace("Removed unpayable queued transaction", "hash", hash)
}

③ Move all executable transactions from queue to pending (promoteTx)

Note: executable transactions are the consecutive queued transactions whose nonces start from the account's next pending nonce. These are regarded as ready.

readies := list.Ready(pool.pendingNonces.get(addr))
for _, tx := range readies {
	hash := tx.Hash()
	if pool.promoteTx(addr, hash, tx) {
		log.Trace("Promoting queued transaction", "hash", hash)
		promoted = append(promoted, tx)
	}
}
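The notion of "ready" transactions, a run of consecutive nonces, can be sketched as follows. The map-based list and the `ready` function are hypothetical simplifications: this version requires the run to begin exactly at `start`, which the real `Ready` may handle slightly differently.

```go
package main

import "fmt"

// ready removes and returns the run of consecutive nonces that begins
// at start. It stops at the first missing nonce.
func ready(txs map[uint64]string, start uint64) []uint64 {
	var out []uint64
	for next := start; ; next++ {
		if _, ok := txs[next]; !ok {
			break
		}
		out = append(out, next)
		delete(txs, next)
	}
	return out
}

func main() {
	queued := map[uint64]string{5: "0xa", 6: "0xb", 8: "0xc"}
	// Nonces 5 and 6 are consecutive; 8 stays queued because 7 is missing.
	fmt.Println(ready(queued, 5), len(queued))
}
```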

promoteTx differs from add in that add inserts a newly received transaction into pending, while promoteTx inserts transactions taken from queue into pending.

inserted, old := list.Add(tx, pool.config.PriceBump)
if !inserted {
	// An older transaction was better, discard this
	pool.all.Remove(hash)
	pool.priced.Removed(1)

	pendingDiscardMeter.Mark(1)
	return false
}
// Otherwise discard any previous transaction and mark this
if old != nil {
	pool.all.Remove(old.Hash())
	pool.priced.Removed(1)

	pendingReplaceMeter.Mark(1)
} else {
	...
}

It mainly does the following:

  1. Insert the transaction into pending. If the nonce of the transaction to be inserted already exists in the pending list, the new transaction replaces the original only if its gas price is at least 110% of the original's (the percentage is determined by PriceBump)
  2. If the new transaction replaces an existing one, delete the old transaction from the all list
  3. Finally, update the all list

After promoteTx, every transaction moved to pending is collected in promoted []*types.Transaction. Back in promoteExecutables, the remaining steps are:

④ For non-local accounts, if the number of queued transactions exceeds AccountQueue, remove the transactions with the highest nonces from the end

if !pool.locals.contains(addr) {
	caps = list.Cap(int(pool.config.AccountQueue))
	for _, tx := range caps {
		hash := tx.Hash()
		pool.all.Remove(hash)
		...
	}
}
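Capping a per-account list means keeping the lowest nonces and dropping the rest. A minimal sketch, again on a hypothetical nonce-to-hash map with an illustrative `capList` function:

```go
package main

import (
	"fmt"
	"sort"
)

// capList keeps the limit lowest nonces in txs and returns the dropped
// (highest) nonces in ascending order.
func capList(txs map[uint64]string, limit int) []uint64 {
	if limit < 0 || len(txs) <= limit {
		return nil
	}
	nonces := make([]uint64, 0, len(txs))
	for n := range txs {
		nonces = append(nonces, n)
	}
	sort.Slice(nonces, func(i, j int) bool { return nonces[i] < nonces[j] })
	dropped := nonces[limit:]
	for _, n := range dropped {
		delete(txs, n)
	}
	return dropped
}

func main() {
	queued := map[uint64]string{1: "a", 2: "b", 3: "c", 4: "d", 5: "e"}
	// With a limit of 3 the two highest nonces are dropped.
	fmt.Println(capList(queued, 3), len(queued))
}
```

Dropping from the high end is the sensible choice because the highest nonces are the furthest from becoming executable.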

⑤ If the account has no transactions left in queue, delete its entry

if list.Empty() {
	delete(pool.queue, addr)
}

That is all there is to transaction promotion.


Transaction demotion

Transactions are demoted in a few scenarios:

  1. When a new block appears, the transactions included in that block are removed from pending into queue
  2. When another transaction (with a higher gas price) arrives, the existing one is removed from pending into queue

Key function: demoteUnexecutables, which does the following:

① Iterate over the transaction list of every address in pending

for addr, list := range pool.pending {
	...
}

② Delete all transactions considered too old (nonce too low)

olds := list.Forward(nonce)
for _, tx := range olds {
	hash := tx.Hash()
	pool.all.Remove(hash)
	log.Trace("Removed old pending transaction", "hash", hash)
}

③ Delete all transactions that have become too costly (balance too low or gas limit exceeded), and move the transactions they invalidate back to queue for later

drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
	hash := tx.Hash()
	log.Trace("Removed unpayable pending transaction", "hash", hash)
	pool.all.Remove(hash)
}
pool.priced.Removed(len(olds) + len(drops))
pendingNofundsMeter.Mark(int64(len(drops)))

for _, tx := range invalids {
	hash := tx.Hash()
	log.Trace("Demoting pending transaction", "hash", hash)
	pool.enqueueTx(hash, tx)
}
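The difference between drops and invalids is worth a sketch. The assumption here is that the pending list is processed in nonce order and that any transaction following a dropped nonce can no longer execute in sequence. `pendingTx` and `filterStrict` are hypothetical, and costs are plain `uint64` values for brevity.

```go
package main

import "fmt"

// pendingTx is a hypothetical, simplified pending transaction.
type pendingTx struct {
	Nonce uint64
	Cost  uint64 // value + gas price * gas limit, simplified to uint64
	Gas   uint64
}

// filterStrict splits a nonce-sorted list into three groups:
//   - kept:     still executable
//   - drops:    unpayable or above the gas limit, removed for good
//   - invalids: fine on their own, but they follow a dropped nonce
func filterStrict(list []pendingTx, balance, gasLimit uint64) (kept, drops, invalids []pendingTx) {
	gapped := false
	for _, tx := range list {
		switch {
		case tx.Cost > balance || tx.Gas > gasLimit:
			drops = append(drops, tx)
			gapped = true
		case gapped:
			invalids = append(invalids, tx)
		default:
			kept = append(kept, tx)
		}
	}
	return kept, drops, invalids
}

func main() {
	list := []pendingTx{{1, 5, 21000}, {2, 50, 21000}, {3, 5, 21000}}
	kept, drops, invalids := filterStrict(list, 10, 8000000)
	fmt.Println(len(kept), len(drops), len(invalids))
}
```

In the example, nonce 2 costs more than the balance of 10 and is dropped, which pushes nonce 3 into the invalids group even though it is affordable.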

④ If there is a nonce gap in front of the remaining transactions, move all of them to queue

if list.Len() > 0 && list.txs.Get(nonce) == nil {
	gapped := list.Cap(0)
	for _, tx := range gapped {
		hash := tx.Hash()
		log.Error("Demoting invalidated transaction", "hash", hash)
		pool.enqueueTx(hash, tx)
	}
	pendingGauge.Dec(int64(len(gapped)))
}

Note: gaps are usually caused by balance problems after a fork. Suppose that on the original canonical chain A, transaction M costs the account 10, while on fork chain B the same account sent a transaction M that costs 20. A balance that was enough to pay for a later transaction on chain A may then be insufficient on chain B. If the transaction with insufficient balance has nonce n+3 on chain B, a gap opens between the n+2 and n+4 transactions of chain A, and all transactions from n+3 onwards are demoted.
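The gap check itself is short. In this sketch both pending and queue are hypothetical nonce-to-hash maps for a single account, and `demoteIfGapped` is an illustrative name: if the account's next expected nonce is missing from pending, everything that is left gets moved to queue.

```go
package main

import "fmt"

// demoteIfGapped moves every pending transaction to queue when the
// account's next expected nonce is missing from pending. It returns the
// number of transactions moved.
func demoteIfGapped(pending, queue map[uint64]string, nonce uint64) int {
	if len(pending) == 0 {
		return 0
	}
	if _, ok := pending[nonce]; ok {
		return 0 // the first expected nonce is present, no gap
	}
	moved := 0
	for n, hash := range pending {
		queue[n] = hash
		delete(pending, n)
		moved++
	}
	return moved
}

func main() {
	pending := map[uint64]string{4: "0xa", 5: "0xb"}
	queue := map[uint64]string{}
	// The account nonce is 3 but pending starts at 4, so there is a gap.
	fmt.Println(demoteIfGapped(pending, queue, 3), len(pending), len(queue))
}
```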

This completes the demotion process.


Resetting the transaction pool


Resetting the transaction pool retrieves the current state of the blockchain (the chain state changes mainly because the chain is updated) and ensures that the contents of the pool are valid with respect to that state.

reset is called in the following cases:

  1. During TxPool initialization: NewTxPool;
  2. When the TxPool event-listening goroutine receives a canonical chain update event

The flow chart is as follows:

As the flow chart shows, the main job is to reorganize the transaction pool in response to an update of the canonical chain:

① If the old head block is not nil and is not the parent of the new head block, the old and new heads are not on the same chain

if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
	...
}

② If the height difference between the new and old head blocks is greater than 64, the reorg is considered too deep and no transactions are returned to the pool

if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 {
	log.Debug("Skipping deep transaction reorg", "depth", depth)
}

③ If the old head is higher than the new head, walk back along the old chain and collect all transactions from the blocks that are rolled back

for rem.NumberU64() > add.NumberU64() {
	discarded = append(discarded, rem.Transactions()...)
	if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
		log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
		return
	}
}

④ If the new head is higher than the old head, walk back along the new chain and collect the transactions it has included

for add.NumberU64() > rem.NumberU64() {
	included = append(included, add.Transactions()...)
	if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
		log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
		return
	}
}

⑤ Once both chains are at the same height, walk both back together until the common ancestor is found

for rem.Hash() != add.Hash() {
	discarded = append(discarded, rem.Transactions()...)
	if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
		log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
		return
	}
	included = append(included, add.Transactions()...)
	if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
		log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
		return
	}
}

⑥ Set the pool to the latest world state

statedb, err := pool.chain.StateAt(newHead.Root)
if err != nil {
	log.Error("Failed to reset txpool state", "err", err)
	return
}
pool.currentState = statedb
pool.pendingNonces = newTxNoncer(statedb)
pool.currentMaxGas = newHead.GasLimit

⑦ Re-inject the transactions dropped from the old chain into the pool

senderCacher.recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)
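Steps ③ to ⑤ and the re-injection in step ⑦ can be condensed into one sketch. Everything here is a simplified assumption: `block` is a hypothetical in-memory type linked by parent pointers, transactions are plain strings, and `reinject` is taken to be the transactions found on the old branch that the new branch has not included.

```go
package main

import "fmt"

// block is a hypothetical, simplified block: a height, a parent pointer
// and the hashes of the transactions it contains.
type block struct {
	number uint64
	parent *block
	txs    []string
}

// reorgDiff walks both heads back to their common ancestor and returns
// the transactions that exist only on the old branch. It returns nil if
// either chain turns out to be unrooted.
func reorgDiff(oldHead, newHead *block) []string {
	var discarded, included []string
	rem, add := oldHead, newHead
	// The old chain is longer: roll it back first.
	for rem != nil && add != nil && rem.number > add.number {
		discarded = append(discarded, rem.txs...)
		rem = rem.parent
	}
	// The new chain is longer: roll it back first.
	for rem != nil && add != nil && add.number > rem.number {
		included = append(included, add.txs...)
		add = add.parent
	}
	// Same height: step back on both until they meet.
	for rem != add {
		if rem == nil || add == nil {
			return nil
		}
		discarded = append(discarded, rem.txs...)
		included = append(included, add.txs...)
		rem, add = rem.parent, add.parent
	}
	// reinject = discarded minus included.
	seen := make(map[string]struct{}, len(included))
	for _, tx := range included {
		seen[tx] = struct{}{}
	}
	var reinject []string
	for _, tx := range discarded {
		if _, ok := seen[tx]; !ok {
			reinject = append(reinject, tx)
		}
	}
	return reinject
}

func main() {
	g := &block{number: 0}
	a1 := &block{number: 1, parent: g, txs: []string{"t1", "t2"}}
	a2 := &block{number: 2, parent: a1, txs: []string{"t3"}}
	b1 := &block{number: 1, parent: g, txs: []string{"t2"}}
	b2 := &block{number: 2, parent: b1, txs: []string{"t4"}}
	b3 := &block{number: 3, parent: b2}
	// t2 is on both branches, so only t3 and t1 need to be re-injected.
	fmt.Println(reorgDiff(a2, b3))
}
```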

At this point, the entire reset process is complete.


References:

mindcarver.cn/

github.com/mindcarver/…

learnblockchain.cn/2019/06/03/…

blog.csdn.net/lj900911/ar…