Hyperledger Fabric source code analysis | Kafka consensus

Article and code: github.com/blockchainG…

Branch: v1.1.0

Overview

The Orderer consensus component provides the HandleChain() method to create a consensus chain object (implementing the consensus.Chain interface) bound to a channel, such as Solo (the solo.chain type) and Kafka (the kafka.chainImpl type). This object is an important implementation module of the channel consensus component and is stored in the cs.chain field of the chain support object. The consensus chain object provides the Orderer's consensus ordering service: it is responsible for ordering the transactions on its associated channel, cutting them into blocks, committing them to the ledger, and handling channel management. Currently, either a Go channel (Solo) or a Kafka cluster is used as the ordering backend to receive and sort the transaction messages filtered and forwarded by the Broadcast service.
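For reference, the Consenter and consensus.Chain interfaces in Fabric v1.1 look roughly as follows (paraphrased from orderer/consensus/consensus.go, with doc comments abridged; cb is the protos/common package):

// Consenter defines the backing ordering mechanism (Solo, Kafka, ...).
type Consenter interface {
	// HandleChain creates and returns a consensus.Chain for the given channel
	HandleChain(support ConsenterSupport, metadata *cb.Metadata) (Chain, error)
}

// Chain defines a way to inject messages for ordering.
type Chain interface {
	// Order accepts a normal transaction message validated at the given config sequence number
	Order(env *cb.Envelope, configSeq uint64) error

	// Configure accepts a configuration transaction message to reconfigure the channel
	Configure(config *cb.Envelope, configSeq uint64) error

	// WaitReady blocks until the chain is ready to accept new messages
	WaitReady() error

	// Errored returns a channel that closes when an error state is entered
	Errored() <-chan struct{}

	// Start allocates the resources of the chain, e.g. the Kafka producer and consumer
	Start()

	// Halt releases the resources of the chain
	Halt()
}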

Kafka consensus ordering service

Orderer service cluster

The Orderer node uses the open-source Sarama Kafka client library to build the Kafka consensus component, which can accept and process transaction message requests from multiple clients at the same time, effectively improving the Orderer node's ability to process transaction messages concurrently. At the same time, within a single partition the Kafka cluster collects messages of the same topic sequentially (each message has a unique sequence number), which guarantees a deterministic order of transaction messages (sorted by message sequence number) and thus achieves global consensus on the transaction order.
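As a minimal standalone sketch of the underlying library (illustrative only, not Fabric source; the broker address, topic, and payload are placeholders), publishing a message with a Sarama synchronous producer looks like this:

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true // required by SyncProducer
	config.Producer.RequiredAcks = sarama.WaitForAll

	producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	// Publish to the channel's topic; Fabric uses the channel ID as the topic name
	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "mychannel",
		Value: sarama.ByteEncoder([]byte("serialized KafkaMessage")),
	})
	fmt.Println(partition, offset, err)
}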

Kafka producers produce and publish messages by topic, and the Kafka server cluster automatically classifies messages by topic. Messages on the same topic are collected into one or more partition files and appended to the end of the file in FIFO order. Each message has an offset within its partition that serves as the message's unique ID. Currently, Hyperledger Fabric binds one topic (the channel ID) to each channel created on a Kafka cluster, and configures only one partition (partition number 0). The Kafka consumer manages partition consumers that subscribe to the messages of a specified topic and partition, specifying the topic (channel ID), the partition number (currently only partition 0), and the start offset (the message position from which the subscription begins).
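Correspondingly, a minimal sketch of subscribing to a single partition with Sarama (again illustrative only; Fabric derives its start offset from the last block's Kafka metadata, lastOffsetPersisted+1, rather than from the oldest offset):

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, sarama.NewConfig())
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// Subscribe to partition 0 of the channel's topic, starting from the oldest offset
	pc, err := consumer.ConsumePartition("mychannel", 0, sarama.OffsetOldest)
	if err != nil {
		panic(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		// msg.Offset is the message's unique sequence number within the partition
		fmt.Printf("offset %d: %s\n", msg.Offset, msg.Value)
	}
}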

Hyperledger Fabric uses a Kafka cluster to sort the transaction messages submitted by one or more Orderer nodes. Each Orderer node acts as both a producer and a consumer of the Kafka cluster, publishing and subscribing to messages on the same topic partition: it forwards the transaction messages submitted by Peer nodes to the Kafka server, and at the same time retrieves the sorted transaction messages from the Kafka partition of the specified topic, automatically filtering out resubmitted transaction messages. Network delays may cause messages to arrive at different times on different nodes, but leaving packet loss aside, the order and number of messages obtained by all Orderer nodes are deterministic and consistent. Since all nodes also use the same Kafka consensus chain objects and the same block-cutting rules, every Orderer node creates and updates the same configuration channels and cuts the same batch transaction sets into the same blocks, thereby constructing identical block data "in sync". This achieves global consensus based on the Kafka cluster and guarantees the global consistency of the block data.

Start the consensus component chain object

Startup entry:

orderer/consensus/kafka/chain.go/Start()

func (chain *chainImpl) Start() {
	go startThread(chain)
}
func startThread(chain *chainImpl) {
	...
	// Create the Kafka producer
	chain.producer, err = setupProducerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)
	...
	// The Kafka producer sends a CONNECT message to establish the connection
	if err = sendConnectMessage(chain.consenter.retryOptions(), chain.haltChan, chain.producer, chain.channel); err != nil {
		logger.Panicf("[channel: %s] Cannot post CONNECT message = %s", chain.channel.topic(), err)
	}
	...
	// Create the parent Kafka consumer
	chain.parentConsumer, err = setupParentConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)
	...
	// Create the Kafka partition consumer
	chain.channelConsumer, err = setupChannelConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.parentConsumer, chain.channel, chain.lastOffsetPersisted+1)
	...
	close(chain.startChan)                // The chain has started; Broadcast is no longer blocked
	chain.errorChan = make(chan struct{}) // Create errorChan so the Deliver service handler is not blocked
	chain.processMessagesToBlocks()       // Enter the message-processing loop for messages received on the subscribed partition
}

The startThread function first creates the Kafka producer, which publishes messages to the channel partition (chain.channel) with the specified topic (channel ID) and partition number.

It then establishes the connection by sending a CONNECT message, whose Topic field is the channel ID, whose Key field is the partition number 0, and whose Value field is the payload of a CONNECT-type message. This message is received by the Kafka (partition) consumers that subscribe to the topic.
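The CONNECT message itself is a small protobuf built by two helpers, paraphrased here from orderer/consensus/kafka (abridged; types as in the Fabric source):

// Paraphrased sketch of the CONNECT helpers in orderer/consensus/kafka.
func newConnectMessage() *ab.KafkaMessage {
	return &ab.KafkaMessage{
		Type: &ab.KafkaMessage_Connect{
			Connect: &ab.KafkaMessageConnect{
				Payload: nil, // CONNECT carries no payload; it only probes the pipeline
			},
		},
	}
}

func newProducerMessage(channel channel, pld []byte) *sarama.ProducerMessage {
	return &sarama.ProducerMessage{
		Topic: channel.topic(), // the channel ID
		Key:   sarama.StringEncoder(strconv.Itoa(int(channel.partition()))), // partition 0
		Value: sarama.ByteEncoder(pld),
	}
}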

Next, it creates the Kafka consumer object with the configured Kafka partition and broker settings, and sets it up to fetch messages from the Kafka partition with the specified topic (channel ID) and partition number (0).

Finally, the processMessagesToBlocks() method is called to create a message-processing loop that processes the subscription messages received from the Kafka cluster.

Process the message

processMessagesToBlocks() receives messages from the Kafka partition consumer and processes them according to the Kafka message type, which is one of the following:

  • KafkaMessage_Regular
  • KafkaMessage_TimeToCut
  • KafkaMessage_Connect
func (chain *chainImpl) processMessagesToBlocks() ([]uint64, error) {
	...
	for { // Message-processing loop
		select {
		...
		case in, ok := <-chain.channelConsumer.Messages(): // Received a Kafka partition consumer message
			select {
			case <-chain.errorChan: // If this channel was closed, recreate it
				...
			default:
			}
			switch msg.Type.(type) { // Dispatch on the Kafka message type
			case *ab.KafkaMessage_Connect: // Kafka connection message, used to resume the partition subscription after an error
				_ = chain.processConnect(chain.ChainID()) // Process the CONNECT message (does nothing)
				counts[indexProcessConnectPass]++         // Increment the count of successfully processed messages
			case *ab.KafkaMessage_TimeToCut: // Kafka time-to-cut message for cutting blocks
				if err := chain.processTimeToCut(msg.GetTimeToCut(), in.Offset); err != nil {
					logger.Warningf("[channel: %s] %s", chain.ChainID(), err)
					logger.Criticalf("[channel: %s] Consenter for channel exiting", chain.ChainID())
					counts[indexProcessTimeToCutError]++
					return counts, err // TODO Revisit whether we should indeed stop processing the chain at this point
				}
				counts[indexProcessTimeToCutPass]++ // Increment the count of successfully processed messages
			case *ab.KafkaMessage_Regular: // Kafka regular message
				if err := chain.processRegular(msg.GetRegular(), in.Offset); err != nil { // Process the regular message
					...
					counts[indexProcessRegularError]++
				}
				...
			}
		case <-chain.timer: // The batch timeout timer fired
			// Send a TimeToCut message requesting that a block be cut
			if err := sendTimeToCut(chain.producer, chain.channel, chain.lastCutBlockNumber+1, &chain.timer); err != nil {
				counts[indexSendTimeToCutError]++
			}
			...
		}
	}
}
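In the timer branch above, sendTimeToCut publishes a TimeToCut message carrying the next expected block number back through the producer; paraphrased from orderer/consensus/kafka/chain.go:

// Paraphrased sketch; see orderer/consensus/kafka/chain.go for the exact code.
func sendTimeToCut(producer sarama.SyncProducer, channel channel, timeToCutBlockNumber uint64, timer *<-chan time.Time) error {
	*timer = nil // Disarm the timer; it is re-armed when new pending messages arrive
	payload := utils.MarshalOrPanic(newTimeToCutMessage(timeToCutBlockNumber))
	message := newProducerMessage(channel, payload)
	_, _, err := producer.SendMessage(message)
	return err
}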

① : KafkaMessage_Connect type message

The Kafka connection message is used to test the working status of the connected Kafka partition consumer, verifying that the Kafka consensus component is working properly and aiding troubleshooting. The chain.processConnect(chain.ChainID()) method is called to process the message.
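processConnect is deliberately a no-op, since receiving the message at all is the health signal; paraphrased:

// Paraphrased sketch: a CONNECT message only proves the pipeline works, so it is ignored.
func (chain *chainImpl) processConnect(channelName string) error {
	logger.Debugf("[channel: %s] It's a connect message - ignoring", channelName)
	return nil
}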

② : KafkaMessage_TimeToCut message

The processMessagesToBlocks() method calls chain.processTimeToCut() to handle messages of the TIMETOCUT type. If the block number ttcNumber in the message is not the block number of the next block to be cut on the current Orderer node's channel ledger (lastCutBlockNumber+1), the message is discarded without processing. Otherwise, the BlockCutter().Cut() method is called to cut the channel's list of cached pending transaction messages into a batch ([]*cb.Envelope), CreateNextBlock(batch) is called to construct the new block, and finally WriteBlock(block, metadata) is called to update the block metadata and commit the block to the ledger, incrementing the lastCutBlockNumber of the Kafka consensus chain object by 1.

In practice, the points in time at which the Orderer service cluster nodes independently cut blocks are usually not perfectly synchronized, and a node may repeatedly receive TIMETOCUT messages (with duplicate block numbers) submitted by other Orderer nodes. An Orderer node acts on the first TIMETOCUT message it receives: it cuts and commits the block to the ledger and updates the channel's latest block number lastCutBlockNumber. The processTimeToCut() method can then use the updated lastCutBlockNumber to filter out the remaining duplicate TIMETOCUT messages, ensuring that the ledger block files stay synchronized across all Orderer nodes. In effect, a time-synchronization problem is converted into a message-synchronization mechanism.
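Putting the two paragraphs above together, processTimeToCut looks roughly like this (paraphrased from orderer/consensus/kafka/chain.go, with log statements and error strings abridged):

func (chain *chainImpl) processTimeToCut(ttcMessage *ab.KafkaMessageTimeToCut, receivedOffset int64) error {
	ttcNumber := ttcMessage.GetBlockNumber()
	if ttcNumber == chain.lastCutBlockNumber+1 { // The expected next block number: cut a block
		chain.timer = nil
		batch := chain.BlockCutter().Cut()
		if len(batch) == 0 {
			return fmt.Errorf("got right time-to-cut message (for block %d), no pending requests though; this might indicate a bug", ttcNumber)
		}
		block := chain.CreateNextBlock(batch)
		metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
			LastOffsetPersisted:         receivedOffset,
			LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
		})
		chain.WriteBlock(block, metadata)
		chain.lastCutBlockNumber++
		return nil
	} else if ttcNumber > chain.lastCutBlockNumber+1 { // A gap in block numbers: something is wrong
		return fmt.Errorf("got larger time-to-cut message (%d) than allowed/expected (%d)", ttcNumber, chain.lastCutBlockNumber+1)
	}
	// ttcNumber <= lastCutBlockNumber: a stale duplicate from another Orderer; ignore it
	return nil
}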

③ : KafkaMessage_Regular message

This includes channel configuration transaction messages (type KafkaMessageRegular_CONFIG) and normal transaction messages (type KafkaMessageRegular_NORMAL). The detailed analysis follows in the processRegular() method.

Process configuration transaction messages

Let's first take a quick look at the part of processRegular() that handles configuration transaction messages. Since the method is quite long, we start with an overview:

func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, receivedOffset int64) error {
	...
	commitConfigMsg := func(message *cb.Envelope, newOffset int64) {
		...
	}
	seq := chain.Sequence() // Get the latest config sequence number of the current channel
	...
	switch regularMessage.Class {
	case ab.KafkaMessageRegular_UNKNOWN: // Unknown message class
		...
	case ab.KafkaMessageRegular_NORMAL: // Normal transaction message
		...
	case ab.KafkaMessageRegular_CONFIG: // Channel configuration transaction message
		...
	}
	...
}

Case ab.KafkaMessageRegular_CONFIG

① : regularMessage.OriginalOffset != 0

This indicates a channel configuration transaction message that has been revalidated and resubmitted for reordering.

1.1 Filter messages that were submitted repeatedly

if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {
	... // Already processed: discard to avoid committing a duplicate
}

1.2 Check whether this message is the most recently revalidated and reordered configuration transaction, and whether its channel config sequence number is up to date

if regularMessage.OriginalOffset == chain.lastResubmittedConfigOffset && regularMessage.ConfigSeq == seq {
	// This is the resubmitted message we were waiting for: close the channel to
	// unblock the Broadcast service handler so it can accept messages again
	close(chain.doneReprocessingMsgInFlight)
}

1.3 Update lastResubmittedConfigOffset, the offset of this channel's most recently resubmitted configuration transaction message

Another Orderer node may have resubmitted the configuration message while the local Orderer node did not. In that case, the channel's lastResubmittedConfigOffset still needs to be updated:

if chain.lastResubmittedConfigOffset < regularMessage.OriginalOffset {
	chain.lastResubmittedConfigOffset = regularMessage.OriginalOffset
}

② : regularMessage.OriginalOffset == 0

This indicates that the channel configuration transaction message is submitted for the first time, rather than being revalidated and reordered.

2.1 The config sequence number regularMessage.ConfigSeq in the message is smaller than the current channel's latest config sequence number seq

The channel configuration has been updated (its sequence number is higher) since this configuration transaction message was validated (its sequence number is lower), so ProcessConfigMsg() is called to revalidate and refilter the message.

The configuration message is then resubmitted for ordering via configure(), which resets the message's original offset. Finally, the offset of the most recently resubmitted message is updated, and a new channel is created to block the Broadcast service from processing messages until the resubmitted message comes back:

if regularMessage.ConfigSeq < seq {
	...
	configEnv, configSeq, err := chain.ProcessConfigMsg(env)
	if err := chain.configure(configEnv, configSeq, receivedOffset); err != nil {
		...
	}
	// Record the offset of the most recently resubmitted config message
	chain.lastResubmittedConfigOffset = receivedOffset
	// Create a channel to block the Broadcast service from processing messages
	chain.doneReprocessingMsgInFlight = make(chan struct{})
}

③ : Submit the configuration transaction message to perform channel management operations

After the unqualified cases are filtered out by ① and ② above, the configuration transaction message is committed to perform the channel management operation. The core function is commitConfigMsg(env, offset), which proceeds in the following steps.

3.1 Cut the currently cached transaction messages into a batch transaction set

batch := chain.BlockCutter().Cut()

3.2 Create a new block

block := chain.CreateNextBlock(batch)

3.3 Construct the Kafka metadata

metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{ // Construct the Kafka metadata
	LastOffsetPersisted:         receivedOffset - 1, // This block holds messages received before the config message, so subtract 1
	LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
	LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
})

3.4 Write the block

Submit the new block to the ledger via the block-writing component, and increment the channel's latest block number chain.lastCutBlockNumber by 1:

chain.WriteBlock(block, metadata)
chain.lastCutBlockNumber++

Then the chain's lastOriginalOffsetProcessed field is updated to newOffset, and the config block itself is committed in much the same way:

chain.lastOriginalOffsetProcessed = newOffset
block := chain.CreateNextBlock([]*cb.Envelope{message}) // Create a new block holding only the config message
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{     // Construct the Kafka metadata
	LastOffsetPersisted:         receivedOffset,
	LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
	LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
})
chain.WriteConfigBlock(block, metadata) // Write the configuration block
chain.lastCutBlockNumber++              // Increment the new block number by 1

Both WriteBlock and WriteConfigBlock ultimately call commitBlock, which looks as follows:

func (bw *BlockWriter) commitBlock(encodedMetadataValue []byte) {
	...
	// Add the block signature
	bw.addBlockSignature(bw.lastBlock)
	// Add the last-config signature
	bw.addLastConfigSignature(bw.lastBlock)
	// Append the new block to the ledger
	err := bw.support.Append(bw.lastBlock)
	...
}

Next, we’ll discuss how the Kafka Consensus component handles ordinary transaction messages.

Process ordinary transaction messages

Going back to the processRegular() method, the overall handling of normal messages is as follows:

func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, receivedOffset int64) error {
	...
	case ab.KafkaMessageRegular_NORMAL: // Normal transaction message type
		// A non-zero OriginalOffset means the message was revalidated and resubmitted
		if regularMessage.OriginalOffset != 0 {
			...
			// If the message offset is not greater than lastOriginalOffsetProcessed, the message
			// has already been processed; discard it to prevent reprocessing normal transaction
			// messages with the same offset that were resubmitted by other Orderers
			if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {
				...
			}
		}

		// Check whether the channel config sequence number has advanced
		if regularMessage.ConfigSeq < seq {
			...
			// The message's config sequence number is stale; revalidate and filter it again
			configSeq, err := chain.ProcessNormalMsg(env)
			...
			// Resubmit the normal transaction message
			if err := chain.order(env, configSeq, receivedOffset); err != nil {
				...
			}
			...
		}

		// advance lastOriginalOffsetProcessed iff message is re-validated and re-ordered
		// (i.e. lastOriginalOffsetProcessed only moves for revalidated and reordered messages)
		offset := regularMessage.OriginalOffset
		if offset == 0 {
			offset = chain.lastOriginalOffsetProcessed
		}
		// Commit the normal transaction message; offset is that of the most recently processed message
		commitNormalMsg(env, offset)
	...
}

The processing of normal transaction messages is similar to that of configuration transaction messages. The core function is commitNormalMsg(env, offset):

commitNormalMsg := func(message *cb.Envelope, newOffset int64) {
	// Append the received message to the cached transaction list and cut it into a list of batch transaction sets
	batches, pending := chain.BlockCutter().Ordered(message)
	...
	if len(batches) == 0 {
		// No batch was cut: record the offset and start the timer that will trigger a time-to-cut message
		chain.lastOriginalOffsetProcessed = newOffset
		if chain.timer == nil {
			chain.timer = time.After(chain.SharedConfig().BatchTimeout())
		}
		...
		return
	}
	chain.timer = nil
	offset := receivedOffset // The current message offset
	if pending || len(batches) == 2 {
		// The current message is not in the first batch, so the first batch's offset is receivedOffset minus 1
		offset--
	} else { // There is only one batch transaction set, forming exactly one block
		// Set the message offset for this single batch to newOffset
		chain.lastOriginalOffsetProcessed = newOffset
	}

	// Construct and commit the first block
	block := chain.CreateNextBlock(batches[0])
	metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
		LastOffsetPersisted:         offset,
		LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
		LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
	})
	chain.WriteBlock(block, metadata) // Update the block metadata and commit the block to the ledger
	chain.lastCutBlockNumber++        // Increment the channel's latest block number by 1
	...

	// Commit the second block if it exists
	if len(batches) == 2 {
		chain.lastOriginalOffsetProcessed = newOffset
		offset++ // The second block holds the current message, so its offset is receivedOffset again

		block := chain.CreateNextBlock(batches[1])
		metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
			LastOffsetPersisted:         offset,
			LastOriginalOffsetProcessed: newOffset,
			LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
		})
		chain.WriteBlock(block, metadata)
		chain.lastCutBlockNumber++
		...
	}
}

A new normal transaction message is first appended to the current cached transaction list and cut into a list of batch transaction sets (batches). The list contains at most two batches, and the second batch contains at most one transaction. Each batch ultimately goes through WriteBlock to be written to the ledger.
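To make the two-batch case concrete, here is an illustrative sketch (not Fabric source) of the possible outcomes of BlockCutter().Ordered(), under typical batch limits such as MaxMessageCount and PreferredMaxBytes:

batches, pending := cutter.Ordered(env)
switch {
case len(batches) == 0 && pending:
	// env was simply appended to the pending batch; the caller arms the batch timeout timer
case len(batches) == 1 && !pending:
	// env completed a batch (e.g. it reached MaxMessageCount); exactly one block will be cut
case len(batches) == 2:
	// env could not share a batch with the pending messages (e.g. it exceeded PreferredMaxBytes):
	// batches[0] holds the previously pending messages, batches[1] holds only env
}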

At this point the entire processRegular() method finishes processing the message.

Summary and Reference

The Kafka consensus ordering logic is actually fairly simple; the rough flow is shown in the diagram in the repository below:

github.com/blockchainG… (the article's diagrams and code are there)

WeChat official account: Blockchain technology stack