Building a simple flash-sale (seckill) system with NestJS, Redis, and Kafka

Tech stack: our old friend NestJS, plus ioredis and kafka-node.

I have been looking into Kafka message queues recently, so I decided to write a flash-sale (seckill) system for practice. The blog posts I read did not include a concrete project example, so I drew on several implementations and wrote a working project with NestJS.

Step 1: Create the project

Use the Nest CLI to quickly generate the project template.

  1. Install the @nestjs/cli scaffolding tool used to generate projects:

npm i -g @nestjs/cli   # install the Nest CLI


  2. Generate the project:

nest new nest-seckill   # create the project with the Nest CLI
cd ./nest-seckill
yarn                    # install dependencies
yarn add redis ioredis nestjs-redis kafka-node mysql2 typeorm uuid-random    # add the dependencies

Step 2: Generate the seckill module

Use the Nest CLI to quickly generate the template code. See the official Nest CLI documentation for details.


  1. Generate the seckill.module.ts file:

    This module creates the Kafka consumer, receives Kafka messages, and writes the order information.

    nest generate module seckill
    # nest g mo seckill


  2. Generate seckill.controller.ts:

    This provides the RESTful endpoint for the seckill.

    nest g co seckill


  3. Generate seckill.service.ts:

    The service implements the seckill logic with Redis optimistic locking (WATCH) and transactions (MULTI), and uses a Kafka producer to publish a message for the consumer to process.

    nest g service seckill


  4. Generate redis.service.ts:

    This service is used to connect to Redis.

    nest g service redis

    Then modify the generated file as follows:

    import { Injectable } from '@nestjs/common'
    import { RedisService } from 'nestjs-redis'

    @Injectable()
    export class RedisClientService {
      constructor(private readonly redisService: RedisService) {}

      // Connection configuration is set in app.module
      async getSeckillRedisClient() {
        return await this.redisService.getClient('seckill')
      }
    }

Step 3: Write the seckill logic


  1. Define the seckill endpoint:

    Add a POST endpoint to seckill.controller.ts:

    import { Body, Controller, Post } from '@nestjs/common'
    import * as uuid from 'uuid-random'                   // Use a UUID to generate the order number
    import { CreateOrderDTO } from '../order/order.dto'   // Order field definitions
    import { SeckillService } from './seckill.service'    // Implements the seckill logic
    import { awaitWrap } from '@/utils/index'             // Wraps a promise into an [error, result] pair

    @Controller('seckill')
    export class SeckillController {
      constructor(private readonly seckillService: SeckillService) {}

      @Post('/add')
      async addOrder(@Body() order: CreateOrderDTO) {
        const params: CreateOrderDTO = {
          ...order,
          openid: `${uuid()}-${new Date().valueOf()}`,
        }
        // Call the secKill method of the service and wait for it to complete
        const [error, result] = await awaitWrap(this.seckillService.secKill(params))
        return error || result
      }
    }
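The controller relies on an awaitWrap helper from '@/utils' that the article does not show. Judging only from how it is called (it is destructured into an error and a result), it could look roughly like the sketch below. The generic parameters and the exact return type are assumptions, not the project's actual code.

```typescript
// Hypothetical stand-in for the `awaitWrap` helper imported from '@/utils'.
// It turns a promise into an [error, result] pair so callers can skip try/catch.
async function awaitWrap<T, E = Error>(
  promise: Promise<T>,
): Promise<[E, undefined] | [null, T]> {
  try {
    const result = await promise
    return [null, result] // success: no error, the resolved value
  } catch (error) {
    return [error as E, undefined] // failure: the rejection reason, no value
  }
}
```

With a helper of this shape, `const [error, result] = await awaitWrap(somePromise)` never throws, which is why the controller can simply return `error || result`.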


  2. Implement the seckill logic:

    Add a secKill method to seckill.service.ts.

    Redis optimistic locking (WATCH) and transactions (MULTI) are used to handle concurrent modification of the counter. See the Node Redis documentation for details.

    import { Injectable, Logger } from '@nestjs/common'
    import * as kafka from 'kafka-node'
    import * as Redis from 'ioredis'
    import { RedisClientService } from '../redis/redis.service'
    import { getConfig } from '@root/config/index' // Redis and Kafka connection configuration
    import { awaitWrap } from '@/utils'
    
    const { redisSeckill, kafkaConfig } = getConfig()
    
    // Create kafka Client
    const kafkaClient = new kafka.KafkaClient({ kafkaHost: kafkaConfig.kafkaHost })
    // Create a kafka producer
    const producer = new kafka.Producer(kafkaClient, {
      // Configuration for when to consider a message as acknowledged, default 1
      requireAcks: 1,
      // The amount of time in milliseconds to wait for all acks before considered, default 100ms
      ackTimeoutMs: 100,
      // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
      partitionerType: 2,
    })

    @Injectable()
    export class SeckillService {
      logger = new Logger('SeckillService') // Create a Nest logger instance
      seckillRedisClient!: Redis.Redis // Redis connection instance
      count = 0 // Number of current requests
    
      constructor(private readonly redisClientService: RedisClientService) {
        // Initializes the Redis connection asynchronously when a service is created
        this.redisClientService.getSeckillRedisClient().then(client => {
          this.seckillRedisClient = client
        })
      }

      /**
       * @desc Concrete implementation of the seckill
       */
      async secKill(params) {
        const { seckillCounterKey } = redisSeckill
        this.logger.log(`Current request count: ${this.count++}`)
    
        // Tips: Use optimistic locks to solve concurrency
        const [watchError] = await awaitWrap(this.seckillRedisClient.watch(seckillCounterKey)) // Listen for 'counter' field changes
        watchError && this.logger.error(watchError)
        if (watchError) return watchError
    
        // Get the remaining quantity of the current order
        const [getError, reply] = await awaitWrap(this.seckillRedisClient.get(seckillCounterKey))
        getError && this.logger.error(getError)
        if (getError) return getError
        if (parseInt(reply, 10) <= 0) {
          this.logger.warn("It's sold out.")
          return "It's sold out."
        }
    
        // Tips: Use redis transactions to change the number of redis counters by one
        const [execError, replies] = await awaitWrap(this.seckillRedisClient.multi().decr(seckillCounterKey).exec())
        execError && this.logger.error(execError)
        if (execError) return execError
    
        // EXEC returns null when the watched counter was changed by another request
        if (!replies) {
          this.logger.warn('Counter is used')
          return this.secKill(params) // Automatically retry and return the retry's result
        }
    
        // The message that the Kafka consumer will receive
        const payload = [
          {
            topic: kafkaConfig.topic,
            partition: 0,
            messages: [JSON.stringify(params)],
          },
        ]
    
        this.logger.log('Payload :')
        this.logger.verbose(payload)
    
        // Send the message to Kafka and wait for the producer's response
        return new Promise((resolve, reject) => {
          producer.send(payload, (err, kafkaProducerResponse) => {
            if (err) {
              this.logger.error(err)
              reject(err)
              return
            }
    
            this.logger.verbose(kafkaProducerResponse)
            resolve({ payload, kafkaProducerResponse })
          })
        })
      }
      
    }
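To see why the optimistic lock prevents overselling, the idea can be modelled without Redis at all. The sketch below is an in-memory simulation only: VersionedCounter, buy and simulate are hypothetical names, and the version number stands in for the "has the watched key changed?" check that Redis performs at EXEC time. It also uses a bounded retry loop rather than recursion, which is a design choice of this sketch, not the project's.

```typescript
// In-memory model of WATCH / MULTI / EXEC semantics (not a Redis client).
class VersionedCounter {
  private value: number
  private version = 0

  constructor(initial: number) {
    this.value = initial
  }

  // WATCH + GET: remember which version the value was read at.
  watch(): { value: number; version: number } {
    return { value: this.value, version: this.version }
  }

  // MULTI + DECR + EXEC: apply the decrement only if nobody wrote since `watch`.
  // Returns null when the transaction is aborted, as EXEC does.
  execDecr(watchedVersion: number): number | null {
    if (watchedVersion !== this.version) return null
    this.value -= 1
    this.version += 1
    return this.value
  }
}

type Outcome = 'ok' | 'sold out'

// One purchase attempt with a bounded number of retries.
async function buy(counter: VersionedCounter, maxRetries = 50): Promise<Outcome> {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const { value, version } = counter.watch()
    if (value <= 0) return 'sold out'
    // Yield so other buyers can interleave, as a network round trip would.
    await Promise.resolve()
    if (counter.execDecr(version) !== null) return 'ok'
  }
  throw new Error('too much contention, giving up')
}

// Run `buyers` concurrent purchase attempts against `stock` items.
async function simulate(stock: number, buyers: number): Promise<Record<Outcome, number>> {
  const counter = new VersionedCounter(stock)
  const outcomes = await Promise.all(
    Array.from({ length: buyers }, () => buy(counter)),
  )
  return {
    ok: outcomes.filter(o => o === 'ok').length,
    'sold out': outcomes.filter(o => o === 'sold out').length,
  }
}
```

Calling `simulate(5, 20)` resolves to 5 successful purchases and 15 "sold out" results: every buyer whose transaction is aborted reads the counter again, so the stock never goes below zero.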


  3. Listen for Kafka messages and consume the order queue:

    In seckill.module.ts, add a handleListenerKafkaMessage() method to process Kafka messages.

    This method also needs to be called when the seckill module is mounted (onApplicationBootstrap) so that it starts subscribing to Kafka messages.

    import { Logger, Module, OnApplicationBootstrap } from '@nestjs/common'
    import * as Redis from 'ioredis'
    import { awaitWrap } from '@/utils'
    import { CreateOrderDTO } from '../order/order.dto'
    import { OrderModule } from '../order/order.module'
    import { OrderService } from '../order/order.service'
    import { RedisClientService } from '../redis/redis.service'
    import { getKafkaConsumer } from './kafka-utils'
    import { SeckillController } from './seckill.controller'
    import { SeckillService } from './seckill.service'
    import { getConfig } from '@root/config'
    
    const { kafkaConfig } = getConfig()
    
    @Module({
      imports: [OrderModule],
      providers: [RedisClientService, SeckillService],
      controllers: [SeckillController],
    })
    export class SeckillModule implements OnApplicationBootstrap {
      logger = new Logger('SeckillModule')
      seckillRedisClient!: Redis.Redis

      constructor(
        private readonly orderService: OrderService, // The service that processes orders
        private readonly seckillService: SeckillService, // Seckill implementation
        private readonly redisClientService: RedisClientService // Redis connection
      ) {
        this.redisClientService.getSeckillRedisClient().then(client => {
          this.seckillRedisClient = client
        })
      }
    
      async handleListenerKafkaMessage() {
        const kafkaConsumer = getKafkaConsumer() // Extract create consumer implementation methods as functions
    
        kafkaConsumer.on('message', async message => {
          this.logger.log('The producer data obtained is:')
          this.logger.verbose(message)

          // The order data from the Kafka queue, i.e. the messages content passed to producer.send in the service
          let order!: CreateOrderDTO

          if (typeof message.value === 'string') {
            order = JSON.parse(message.value)
          } else {
            order = JSON.parse(message.value.toString())
          }

          // Write to the database to complete the order creation
          const [err, savedOrder] = await awaitWrap(this.orderService.saveOne(order))
          if (err) {
            this.logger.error(err)
            return
          }
          this.logger.log(`Order [${savedOrder.id}] has been stored in the database`)
        })
      }

      async onApplicationBootstrap() {
        this.logger.log('onApplicationBootstrap: ')
        await this.seckillService.initCount()         // Reset the remaining inventory in redis
        this.handleListenerKafkaMessage()
      }
    }
    
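onApplicationBootstrap calls seckillService.initCount() to reset the remaining stock in Redis, but that method is not shown in the article. A minimal sketch of what such a reset might do follows, written as a standalone function against a tiny interface so it can be tried without a Redis server. The CounterStore interface, the parameter names and the validation are assumptions; in the project the store would presumably be the seckill Redis client and the key would be seckillCounterKey.

```typescript
// Minimal slice of a key-value client that this sketch needs (hypothetical interface).
interface CounterStore {
  set(key: string, value: string | number): Promise<unknown>
}

// Hypothetical version of the `initCount` step: overwrite the stock counter
// before the application starts accepting seckill requests.
async function initCount(
  store: CounterStore,
  counterKey: string,
  stock: number,
): Promise<void> {
  if (!Number.isInteger(stock) || stock < 0) {
    throw new RangeError(`stock must be a non-negative integer, got ${stock}`)
  }
  await store.set(counterKey, stock)
}
```

Resetting the counter before the Kafka listener starts means each run of the demo begins with a known stock.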


  4. The getKafkaConsumer method for the Kafka consumer is implemented as follows:

    Add a kafka-utils.ts file to the seckill module folder:

    import * as kafka from 'kafka-node'
    import * as Redis from 'ioredis'
    import { getConfig } from '@root/config/index'
    import { awaitWrap } from '@/utils'
    
    const { kafkaConfig } = getConfig()
    let kafkaConsumer!: kafka.Consumer

    // Get the Kafka client
    function getKafkaClient() {
      let kafkaClient!: kafka.KafkaClient

      return () => {
        if (!kafkaClient) {
          kafkaClient = new kafka.KafkaClient({
            kafkaHost: kafkaConfig.kafkaHost,
          })
        }

        return kafkaClient
      }
    }

    /**
     * @desc Get the consumer instance
     */
    export function getKafkaConsumer() {
      // The topics the consumer subscribes to
      const topics = [
        {
          topic: kafkaConfig.topic,
          partition: 0,
          offset: 0,
        },
      ]

      const options = {
        // Auto-commit configuration (false does not commit offsets, reads from scratch each time)
        autoCommit: true,
        autoCommitIntervalMs: 5000,
        // If set to true, the consumer gets the message from the given offset in the payload
        fromOffset: false,
      }

      const kafkaClient = getKafkaClient()()

      if (!kafkaConsumer) {
        kafkaConsumer = new kafka.Consumer(kafkaClient, topics, options)
      }

      return kafkaConsumer
    }
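One detail in kafka-utils.ts is worth noting: getKafkaClient() returns a fresh closure on every call, so the client cached inside it is not shared between calls, and it is the module-level kafkaConsumer check that actually keeps things to a single instance. If the intent is "create once, reuse afterwards", a small generic helper makes that explicit. The lazy function below is a hypothetical utility, not part of the project or of kafka-node.

```typescript
// Wrap a factory so it runs at most once; later calls return the cached instance.
function lazy<T>(factory: () => T): () => T {
  let instance: T | undefined
  let created = false
  return () => {
    if (!created) {
      instance = factory()
      created = true
    }
    return instance as T
  }
}
```

Under that assumption, the client getter could be declared once at module level, for example `const getKafkaClient = lazy(() => new kafka.KafkaClient({ kafkaHost: kafkaConfig.kafkaHost }))`, and every call to `getKafkaClient()` would then return the same client.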


The resulting file structure looks something like this:

Run the project:

yarn dev

Some notes

  1. To test the seckill endpoint under concurrency, you can use Postman's Runner to send the request many times. To simply check the endpoint logic, open the swagger-ui page that the project configures by default: http://localhost:3000/api-docs

  2. That covers the main seckill logic. Since the focus here is the seckill itself, the order module code is not expanded. With a few CLI commands like those in step 2, we can create an Order module that handles order CRUD.

  3. Redis, MySQL, Kafka, and the other services can be started quickly with a docker-compose.yaml file. For details, refer to the code of this project.

    Kafka containers may fail to start because of the CentOS firewall. The fix is to turn off the host's firewall and restart Docker.

  4. After the Kafka container is created, open the kafka-manager management page in a browser (the kafka-manager container maps port 9000) and create the cluster and our topic. The initialization is fairly simple; search for kafka-manager for details.

    For example, see the article "Kafka cluster management tool Kafka-manager: installation and use".
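As an alternative to Postman's Runner for the concurrency test in note 1, a small script can fire many requests at once and count the outcomes. The sketch below is only the generic part: fireConcurrently and LoadResult are hypothetical names, and the task is injected so the helper can be exercised without a running server.

```typescript
interface LoadResult<T> {
  fulfilled: T[]
  rejected: unknown[]
}

// Start `total` tasks at once and collect successes and failures separately.
async function fireConcurrently<T>(
  total: number,
  task: (index: number) => Promise<T>,
): Promise<LoadResult<T>> {
  const result: LoadResult<T> = { fulfilled: [], rejected: [] }
  await Promise.all(
    Array.from({ length: total }, (_, index) =>
      task(index).then(
        value => {
          result.fulfilled.push(value) // task resolved
        },
        reason => {
          result.rejected.push(reason) // task rejected; keep going
        },
      ),
    ),
  )
  return result
}
```

Against the demo, the task would be a function that sends a POST request to the seckill endpoint. Given the controller path 'seckill' and the '/add' route, that is presumably http://localhost:3000/seckill/add, but the exact URL and request body depend on the project's configuration and on the CreateOrderDTO fields, which are not shown here.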

Github address: github.com/wenqieqiu/n…