Author: Lin Guanhong (The Ghost at My Fingertips). If you repost this article, please credit the source.

Juejin: juejin.cn/user/178526…

Blog: www.cnblogs.com/linguanh/

GitHub: github.com/af913337456…

Published books:

  • 1.0: “Blockchain Ethereum DApp Development in Action”
  • 2.0: “Blockchain DApp Development: Based on Ethereum and Bitcoin Public Chain”

Contents

  • Preface
  • Ethereum transaction pool: knowledge summary
  • Source code analysis
    • Local transactions
      • Initialization of the local wallet address
      • Loading local transactions
      • pool.journal.load
      • pool.AddLocals
      • Update of the local transaction file
    • Remote transactions
      • Initialization of the P2P communication module
      • Receiving P2P messages
      • Adding remote transactions to the transaction pool
  • “Easter egg”

This is my first article of 2021, and my sixth year of writing in the open.

Bitcoin and Ethereum prices have taken off recently, and one BTC can now buy a whole Tesla Model 3. But I digress.

When I published the article "From the Perspective of a Blockchain Technology Developer: My Experience in and Understanding of the Blockchain Industry", it was still last year. Looking back at its last paragraph now, it has turned out to be a prophecy.


Back to the point.

In day-to-day development we often build data pools: order pools, request pools, and so on. Traditional server-side thinking leads us straight to message-oriented middleware, using messaging components such as RocketMQ, Redis, or Kafka.

In blockchain public chain applications, however, every public chain I know of has a transaction pool module, and none of their implementations bring in messaging middleware.

When I read the source code of the Ethereum public chain a while ago, the way its transaction pool is implemented struck me as novel. Today I summarize and share how a blockchain public chain application implements its transaction pool without relying on message-oriented middleware, and what characterizes that approach.


Ethereum transaction pool summary (BTW: worth remembering for interviews):

  1. Classification of transactions:
    • By whether they are stored in a local file:
      1. Local transactions: if the sender address of a transaction is one of the addresses specified in the configuration variable, the transaction is considered local:
        • When starting a node, you can specify in the configuration file that local transactions are disabled.
      2. Remote transactions: transactions that do not meet condition 1.
    • By how they are stored in memory:
      1. Queue: transactions waiting to enter Pending; the structure is map[addr]TxList;
      2. Pending: transactions waiting to be packed into a block; the structure is the same as Queue.
  2. Transaction input (generation):
    • At program startup:
      1. Local transactions are loaded from the local file into memory; if there is no file, the input is naturally empty;
      2. Remote transactions are received through the P2P communication module and stored in memory.
    • While the program is running:
      1. Transactions received through the node's own RPC requests, SendTransaction or SendRawTransaction;
      2. Messages received from other nodes through the P2P communication module, which lead to the following actions:
        1. Removal of old transactions;
        2. Addition of new transactions.
  3. Persistence strategy for transactions:
    • Local transactions:
      1. Periodically, the local transactions are selected from Pending and Queue and stored to a local file;
      2. Storage works by file replacement: first create a new file, then rename it over the old one;
      3. Note that the file replacement in point 2 serves as the update operation and also as the delete operation;
      4. The encoding is RLP, not JSON.
    • Remote transactions:
      1. None. Remote transactions are never persisted; they always rely on P2P synchronization with other nodes.
  4. Interrupt recovery:
    1. Local transactions: the same operation as "At program startup" above;
    2. Remote transactions: no recovery. Transactions lost from memory are simply lost, with no impact. Even if the current node goes down, the other nodes keep working.

The fourth point above, interrupt recovery, deserves a comparison. Traditional back-end services rely on messaging middleware to guarantee that messages are not lost, whereas a blockchain public chain relies entirely on its distributed nature: data lost by a single node can be synchronized from other nodes. Transaction pool implementations are therefore comparatively more flexible, and the coding difficulty lies in the message synchronization part.
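The classification in the summary can be pictured with a small sketch. Everything below is a simplified assumption made for illustration (string addresses, a `tx` struct with only a sender and a nonce, the helper names `newPool`, `isLocal` and `localTxs`); it is not the geth data structure, only the shape the summary describes: a set of local addresses plus two address-keyed maps.

```go
package main

import "fmt"

// tx is a hypothetical, heavily simplified transaction: a sender and a nonce.
type tx struct {
	From  string
	Nonce uint64
}

// pool mirrors the summary: a set of local addresses plus two
// address-keyed maps, pending and queue.
type pool struct {
	locals  map[string]bool
	pending map[string][]tx // waiting to be packed into a block
	queue   map[string][]tx // waiting to enter pending
}

func newPool(locals ...string) *pool {
	p := &pool{
		locals:  make(map[string]bool),
		pending: make(map[string][]tx),
		queue:   make(map[string][]tx),
	}
	for _, addr := range locals {
		p.locals[addr] = true
	}
	return p
}

// isLocal reports whether the sender was configured as a local address.
func (p *pool) isLocal(t tx) bool { return p.locals[t.From] }

// localTxs collects the pending and queued transactions of local senders
// only; these are the ones worth persisting to a file.
func (p *pool) localTxs() []tx {
	var out []tx
	for addr := range p.locals {
		out = append(out, p.pending[addr]...)
		out = append(out, p.queue[addr]...)
	}
	return out
}

func main() {
	p := newPool("0xaaa")
	p.pending["0xaaa"] = append(p.pending["0xaaa"], tx{"0xaaa", 0})
	p.queue["0xaaa"] = append(p.queue["0xaaa"], tx{"0xaaa", 2})
	p.pending["0xbbb"] = append(p.pending["0xbbb"], tx{"0xbbb", 0})

	// prints: true false 2
	fmt.Println(p.isLocal(tx{"0xaaa", 0}), p.isLocal(tx{"0xbbb", 0}), len(p.localTxs()))
}
```

The remote transaction from 0xbbb lives only in memory, which is why losing it on a restart costs nothing: another node can send it again.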


Now comes the boring source code analysis phase; readers with time to spare can read on.

Pay attention to the comments in the code.

Local transactions

1. Initialize the local wallet address

config.Locals, specified by the configuration file, is an array of Ethereum wallet addresses.

func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
	...
	for _, addr := range config.Locals { // Add the local addresses from the configuration file
		log.Info("Setting new local account", "address", addr)
		// Add to the locals set, which is used later to check whether an address is local
		pool.locals.add(addr)
	}
	...
}


2. Load transaction data from the local file, that is, load the local transactions

func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
	...
	pool.locals = newAccountSet(pool.signer)
	for _, addr := range config.Locals {
		log.Info("Setting new local account", "address", addr)
		pool.locals.add(addr)
	}
	...
	// This is where the loading happens
	// If local transactions and journaling is enabled, load from disk
	if !config.NoLocals && config.Journal != "" { // If the configuration enables loading local transactions
		pool.journal = newTxJournal(config.Journal)
		// load is the loading function; pool.AddLocals is the function that actually adds the transactions
		if err := pool.journal.load(pool.AddLocals); err != nil {
			log.Warn("Failed to load transaction journal", "err", err)
		}
		if err := pool.journal.rotate(pool.local()); err != nil {
			log.Warn("Failed to rotate transaction journal", "err", err)
		}
	}
	...
	go pool.loop() // Start the event loop
}

3. pool.journal.load

Source file: tx_journal.go

func (journal *txJournal) load(add func([]*types.Transaction) []error) error {
	// Skip the parsing if the journal file doesn't exist at all
	if _, err := os.Stat(journal.path); os.IsNotExist(err) {
		return nil
	}
	// Open the journal for loading any past transactions
	input, err := os.Open(journal.path) // Open the file and read the stream data
	if err != nil {
		return err
	}
	...
	stream := rlp.NewStream(input, 0) // Decode the data with the RLP encoding algorithm
	...
	loadBatch := func(txs types.Transactions) {
		for _, err := range add(txs) { // Call the add function to add the transactions
			if err != nil {
				log.Debug("Failed to add journaled transaction", "err", err)
				dropped++
			}
		}
	}
	// loadBatch is called below
	...
}

4. pool.AddLocals

pool.AddLocals is the function that actually adds the transactions. After a series of internal calls, it ends up in the add function of tx_pool.go. The pool's queues are all maps, so entries with the same key are deduplicated.

func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
	...
	// If the transaction overlaps one already in pool.pending, it was added to the pool earlier
	if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
		...
		pool.journalTx(from, tx) // Internally calls journal.insert
		return old != nil, nil
	}
	replaced, err = pool.enqueueTx(hash, tx) // Otherwise it is added to pool.queue
	if err != nil {
		return false, err
	}
	pool.journalTx(from, tx) // Internally calls journal.insert
	...
}

func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
	// Skip if journaling is disabled or the sender is not a local wallet address
	if pool.journal == nil || !pool.locals.contains(from) {
		return
	}
	// insert may write duplicate entries, but they are deduplicated by address when load reads them back
	if err := pool.journal.insert(tx); err != nil {
		log.Warn("Failed to journal local transaction", "err", err)
	}
}
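Why duplicate journal entries are harmless can be seen in a few lines. The sketch below assumes, purely for illustration, that a pool slot is identified by the pair (sender, nonce); the excerpt above does not show the key geth actually uses, and `txKey` and `replay` are hypothetical names.

```go
package main

import "fmt"

// tx is a hypothetical simplified transaction.
type tx struct {
	From  string
	Nonce uint64
}

// txKey is the assumed map key: one slot per sender and nonce.
type txKey struct {
	From  string
	Nonce uint64
}

// replay feeds journal entries into a map-backed pool. Writing the same
// key twice leaves a single entry, so duplicates in the journal collapse.
func replay(entries []tx) map[txKey]tx {
	pool := make(map[txKey]tx)
	for _, t := range entries {
		pool[txKey{From: t.From, Nonce: t.Nonce}] = t
	}
	return pool
}

func main() {
	journal := []tx{
		{From: "0xaaa", Nonce: 0},
		{From: "0xaaa", Nonce: 0}, // written twice by repeated inserts
		{From: "0xaaa", Nonce: 1},
	}
	fmt.Println(len(journal), "journal entries ->", len(replay(journal)), "pool entries")
}
```

Because the in-memory structure absorbs repeats, the journal can be a plain append-only file between rotations.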

At this point, the local transactions have been added to the pool's queues.

When a node starts up, besides loading local transactions into the queues, it keeps listening for events on the chain, such as receiving transactions and adding them to the queues.

5. Update (insert/delete) of the local transaction file

loop is the entry point that triggers the update. Apart from it, the explicit calls to journal.insert seen earlier also insert local transactions into the file.

The update operation below covers insertion as well: by replacing the file, it deletes the old transactions from the file and stores the new ones.

func (pool *TxPool) loop() {
	...
	for {
		select {
		...
		// Handle local transaction journal rotation
		// On every tick of the journal timer, rotate updates the local transaction data file
		case <-journal.C:
			if pool.journal != nil {
				pool.mu.Lock()
				if err := pool.journal.rotate(pool.local()); err != nil {
					log.Warn("Failed to rotate local tx journal", "err", err)
				}
				pool.mu.Unlock()
			}
		}
	}
}


journal.rotate takes the transactions of the locals wallet addresses from the pool's pending and queue and writes them to the file by file replacement. Note that only transactions from local wallet addresses are saved; all others are not.

// Input
func (pool *TxPool) local() map[common.Address]types.Transactions {
	...
	for addr := range pool.locals.accounts {
		if pending := pool.pending[addr]; pending != nil {
			// Add the pending transactions
			txs[addr] = append(txs[addr], pending.Flatten()...)
		}
		if queued := pool.queue[addr]; queued != nil {
			// Add the queued transactions
			txs[addr] = append(txs[addr], queued.Flatten()...)
		}
	}
	return txs
}

// all comes from local()
func (journal *txJournal) rotate(all map[common.Address]types.Transactions) error {
	...
	// journal.path + ".new": the replacement file carries the .new suffix
	replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
	if err != nil {
		return err
	}
	journaled := 0
	for _, txs := range all {
		for _, tx := range txs {
			if err = rlp.Encode(replacement, tx); err != nil {
				replacement.Close()
				return err
			}
		}
		journaled += len(txs)
	}
	replacement.Close()
	// Rename the new file to the original path, which completes the update by replacement
	if err = os.Rename(journal.path+".new", journal.path); err != nil {
		return err
	}
	sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0755)
	if err != nil {
		return err
	}
	...
	return nil
}


Remote transactions

Initialization of the P2P communication module

Source file: eth/backend.go

func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
	...
	if config.TxPool.Journal != "" {
		config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
	}
	// Initialize the transaction pool
	eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain)
	...
	// Initialize the protocolManager, passing the transaction pool pointer as an argument
	if eth.protocolManager, err = NewProtocolManager(
		chainConfig, checkpoint, config.SyncMode, config.NetworkId,
		eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, cacheLimit, config.Whitelist); err != nil {
		return nil, err
	}
	...
	return eth, nil
}

func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) {
	...
	// Initialize the tx fetcher; txpool.AddRemotes becomes its addTxs callback
	manager.txFetcher = fetcher.NewTxFetcher(txpool.Has, txpool.AddRemotes, fetchTx)
	...
}

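The wiring above hands a method of the pool to the fetcher as a plain function value. A minimal sketch of that pattern follows; the `txPool` and `txFetcher` types here are hypothetical miniatures with a single callback, not the geth types.

```go
package main

import "fmt"

// tx is a hypothetical simplified transaction, identified by its hash.
type tx struct{ Hash string }

// txPool is a hypothetical pool that just records what it receives.
type txPool struct{ remote []tx }

// AddRemotes stores the transactions and reports one (nil) error per entry.
func (p *txPool) AddRemotes(txs []tx) []error {
	p.remote = append(p.remote, txs...)
	return make([]error, len(txs))
}

// txFetcher knows only a callback, not the pool type behind it.
type txFetcher struct {
	addTxs func([]tx) []error
}

func newTxFetcher(addTxs func([]tx) []error) *txFetcher {
	return &txFetcher{addTxs: addTxs}
}

// Enqueue forwards the received transactions to whatever addTxs points at.
func (f *txFetcher) Enqueue(txs []tx) []error {
	return f.addTxs(txs)
}

func main() {
	pool := &txPool{}
	// pool.AddRemotes is a method value: a function already bound to this pool.
	fetcher := newTxFetcher(pool.AddRemotes)

	fetcher.Enqueue([]tx{{Hash: "0x01"}, {Hash: "0x02"}})
	fmt.Println("transactions in pool:", len(pool.remote))
}
```

Since the fetcher depends only on a function signature, it can be tested with a fake callback and never needs to import the pool.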

Receiving P2P messages

Source file: eth/handler.go

func (pm *ProtocolManager) handleMsg(p *peer) error {
	...
	switch {
	...
	// Received transaction data from other nodes
	case msg.Code == TransactionMsg || (msg.Code == PooledTransactionsMsg && p.version >= eth65):
		...
		// Enqueue adds the transactions to the transaction pool
		pm.txFetcher.Enqueue(p.id, txs, msg.Code == PooledTransactionsMsg)
	}
	...
}

// tx_fetcher.go
func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
	...
	errs := f.addTxs(txs) // Add the transactions; this function is actually AddRemotes from tx_pool.go
	...
}
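The case condition in handleMsg accepts two kinds of messages, one of them only from peers on a new enough protocol version. The sketch below isolates that condition. The constant values, the `msg` type and the `enqueue` callback are illustrative assumptions, not definitions copied from go-ethereum.

```go
package main

import "fmt"

// Illustrative stand-ins for the message codes and protocol version
// referenced in handleMsg; the values are assumptions of this sketch.
const (
	transactionMsg        = 0x02
	pooledTransactionsMsg = 0x0a
	eth65                 = 65
)

// msg is a hypothetical decoded P2P message.
type msg struct {
	Code uint64
	Txs  []string // transaction hashes stand in for full transactions
}

// handleMsg forwards transaction messages to enqueue and reports whether
// the message was handled. direct is true for pooled-transaction replies.
func handleMsg(m msg, peerVersion int, enqueue func(txs []string, direct bool)) bool {
	switch {
	case m.Code == transactionMsg || (m.Code == pooledTransactionsMsg && peerVersion >= eth65):
		enqueue(m.Txs, m.Code == pooledTransactionsMsg)
		return true
	default:
		return false
	}
}

func main() {
	enqueue := func(txs []string, direct bool) {
		fmt.Println("enqueue", txs, "direct:", direct)
	}
	fmt.Println(handleMsg(msg{Code: transactionMsg, Txs: []string{"0x01"}}, 64, enqueue))
	fmt.Println(handleMsg(msg{Code: pooledTransactionsMsg, Txs: []string{"0x02"}}, 64, enqueue))
	fmt.Println(handleMsg(msg{Code: pooledTransactionsMsg, Txs: []string{"0x02"}}, 65, enqueue))
}
```

The second call is ignored because the peer's version is below eth65, while the third passes direct as true, matching the third argument of Enqueue in the excerpt.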

Adding remote transactions to the transaction pool

// tx_pool.go
// addTxs adds transactions to Pending and Queue
func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
	return pool.addTxs(txs, false, false)
}
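Putting both entry points side by side shows why remote transactions never reach the journal file. This is a simplified sketch under several assumptions: addTxs takes two arguments instead of three, a slice stands in for the journal file, and registering the sender as local when the flag is true is a guess made for the example, since the excerpts do not show how the flag is used.

```go
package main

import "fmt"

// tx is a hypothetical simplified transaction.
type tx struct {
	From  string
	Nonce uint64
}

// pool is a hypothetical miniature of the transaction pool.
type pool struct {
	locals    map[string]bool
	queue     map[string][]tx
	journalOn bool
	journal   []tx // stands in for the journal file
}

func newPool(journalOn bool) *pool {
	return &pool{
		locals:    make(map[string]bool),
		queue:     make(map[string][]tx),
		journalOn: journalOn,
	}
}

// addTxs stores every transaction in memory and offers it to the journal.
func (p *pool) addTxs(txs []tx, local bool) []error {
	errs := make([]error, len(txs))
	for _, t := range txs {
		if local {
			p.locals[t.From] = true // assumption of this sketch, see the lead-in
		}
		p.queue[t.From] = append(p.queue[t.From], t)
		p.journalTx(t.From, t)
	}
	return errs
}

// journalTx persists only transactions whose sender is a local address.
func (p *pool) journalTx(from string, t tx) {
	if !p.journalOn || !p.locals[from] {
		return
	}
	p.journal = append(p.journal, t)
}

func (p *pool) AddLocals(txs []tx) []error  { return p.addTxs(txs, true) }
func (p *pool) AddRemotes(txs []tx) []error { return p.addTxs(txs, false) }

func main() {
	p := newPool(true)
	p.AddRemotes([]tx{{From: "0xbbb", Nonce: 0}})
	p.AddLocals([]tx{{From: "0xaaa", Nonce: 0}})
	fmt.Println("in memory:", len(p.queue["0xaaa"])+len(p.queue["0xbbb"]), "journaled:", len(p.journal))
}
```

Both transactions end up in memory, but only the local one is written to the journal, which matches the persistence strategy in the summary.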

That's a wrap.

For more on Ethereum development, see my book:

2.0: “Blockchain DApp Development: Based on Ethereum and Bitcoin Public Chain”