Earlier in the QQ group chat with the test peer, we talked about a fixed QPS pressure measurement problem, recently suddenly there is a demand, want to implement, enrich their own performance testing framework, the latest code please go to my GitHub, address :*github.com/JunManYuanL…

Train of thought

  • There is a multithreaded base class from which other stress task classes inherit.
  • The concurrent execution class consists of a thread pool, a task generator, and a compensator.
  • The single-thread execution task generator throws the generated task object into the thread pool for execution.
  • Start another compensator thread to complete the missing compensation. (Due to a variety of reasons, the actual amount is less than the set value)

The general idea is the same as how to mock the QPS interface and moCO fixed QPS interface upgrade compensation mechanism, but the Semaphore mode is not adopted, because MOCO is multi-thread to single thread, and pressure is single-thread to multi-thread.

  • Language continues to be adoptedJavaLanguage.

The base class

It’s written in a bit of a hurry and hasn’t done a lot of practice, so there are fewer notes. Here, there are still two sub-modes: quantitative pressure and timed pressure. Because the two pressure modes are recorded by an attribute isTimesMode, which is used in the execution class FixedQpsConcurrent, the single pressure task object unified isTimesMode and Limit attributes.

package com.fun.base.constaint;

import com.fun.base.interfaces.MarkThread;
import com.fun.config.HttpClientConstant;
import com.fun.frame.execute.FixedQpsConcurrent;
import com.fun.frame.httpclient.GCThread;
import com.fun.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class FixedQpsThread<T> extends ThreadBase {

    private static Logger logger = LoggerFactory.getLogger(FixedQpsThread.class);

    public int qps;

    public int limit;

    public boolean isTimesMode;

    public FixedQpsThread(T t, int limit, int qps, MarkThread markThread) {
        this.limit = limit;
        this.qps = qps;
        this.mark = markThread;
        this.t = t;
        isTimesMode = limit > 1000 ? true : false;
    }


    protected FixedQpsThread(a) {
        super(a); }@Override
    public void run(a) {
        try {
            before();
            threadmark = mark == null ? EMPTY : this.mark.mark(this);
            long s = Time.getTimeStamp();
            doing();
            long e = Time.getTimeStamp();
            long diff = e - s;
            FixedQpsConcurrent.allTimes.add(diff);
            FixedQpsConcurrent.executeTimes.getAndIncrement();
            if (diff > HttpClientConstant.MAX_ACCEPT_TIME)
                FixedQpsConcurrent.marks.add(diff + CONNECTOR + threadmark);
        } catch (Exception e) {
            logger.warn("Mission failed!", e);
            logger.warn("Flags for failed object :{}", threadmark);
            FixedQpsConcurrent.errorTimes.getAndIncrement();
        } finally{ after(); }}@Override
    public void before(a) {
        GCThread.starts();
    }

    /** * Subclasses must implement the change method, otherwise calling deepClone methods will report an error **@return* /
    public abstract FixedQpsThread clone(a);


}

Copy the code

Perform class

The design of the compensation thread here is still to be optimized, and there are two dormancy places in the middle: one is to detect whether compensation is needed in a loop, and the other is the word compensation interval. Configuration variables have not yet been extracted and are to be optimized and adjusted after subsequent practice. The object of the test result still adopts the original value and calculation method, and it will be adjusted according to the practical results in the later stage. You can follow my GitHub for timely updates.

package com.fun.frame.execute;

import com.fun.base.bean.PerformanceResultBean;
import com.fun.base.constaint.FixedQpsThread;
import com.fun.config.Constant;
import com.fun.frame.Save;
import com.fun.frame.SourceCode;
import com.fun.frame.httpclient.GCThread;
import com.fun.utils.Time;
import com.fun.utils.WriteRead;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.stream.Collectors.toList;

/** * concurrency class, used to start stress scripts */
public class FixedQpsConcurrent extends SourceCode {

    private static Logger logger = LoggerFactory.getLogger(FixedQpsConcurrent.class);

    public static boolean key = false;

    public static AtomicInteger executeTimes = new AtomicInteger();

    public static AtomicInteger errorTimes = new AtomicInteger();

    public static Vector<String> marks = new Vector<>();

    /** * is used to record all request times */
    public static Vector<Long> allTimes = new Vector<>();

    /** * start time */
    public long startTime;

    /** * End time */
    public long endTime;

    public int queueLength;

    /** * Task description */
    public String desc = DEFAULT_STRING;

    /** * Task set */
    public List<FixedQpsThread> threads = new ArrayList<>();

    /** * thread pool */
    ExecutorService executorService;

    / * * *@paramThread Thread task */
    public FixedQpsConcurrent(FixedQpsThread thread) {
        this(thread, DEFAULT_STRING);
    }

    / * * *@paramThreads Thread group */
    public FixedQpsConcurrent(List<FixedQpsThread> threads) {
        this(threads, DEFAULT_STRING);
    }

    / * * *@paramThread Thread task *@paramDesc Task description */
    public FixedQpsConcurrent(FixedQpsThread thread, String desc) {
        this(a);this.queueLength = 1;
        threads.add(thread);
        this.desc = desc + Time.getNow();
    }

    / * * *@paramThreads Thread group *@paramDesc Task description */
    public FixedQpsConcurrent(List<FixedQpsThread> threads, String desc) {
        this(a);this.threads = threads;
        this.queueLength = threads.size();
        this.desc = desc + Time.getNow();
    }

    private FixedQpsConcurrent(a) {
        executorService = ThreadPoolUtil.createPool(20.200.3);
    }

    If there is no threadname, the name of the threadname is desc+ the number of threads used as the threadname, removing the date */
    public PerformanceResultBean start(a) {
        key = false;
        FixedQpsThread fixedQpsThread = threads.get(0);
        boolean isTimesMode = fixedQpsThread.isTimesMode;
        int limit = fixedQpsThread.limit;
        int qps = fixedQpsThread.qps;
        long interval = 1 _000_000_000 / qps;
        AidThread aidThread = new AidThread();
        new Thread(aidThread).start();
        startTime = Time.getTimeStamp();
        while (true) {
            executorService.execute(threads.get(limit-- % queueLength).clone());
            if (key ? true : isTimesMode ? limit < 1 : Time.getTimeStamp() - startTime > fixedQpsThread.limit) break;
            sleep(interval);
        }
        endTime = Time.getTimeStamp();
        aidThread.stop();
        GCThread.stop();
        try {
            executorService.shutdown();
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            logger.error("Thread pool failed to wait for task to end!", e);
        }
        logger.info("Total execution {}, common :{} s, total execution :{}, errors :{}!", fixedQpsThread.isTimesMode ? fixedQpsThread.limit + "This mission" : "Seconds", Time.getTimeDiffer(startTime, endTime), executeTimes, errorTimes);
        return over();
    }

    private PerformanceResultBean over(a) {
        key = true;
        Save.saveLongList(allTimes, "data/" + queueLength + desc);
        Save.saveStringListSync(marks, MARK_Path.replace(LONG_Path, EMPTY) + desc);
        allTimes = new Vector<>();
        marks = new Vector<>();
        executeTimes.set(0);
        errorTimes.set(0);
        return countQPS(queueLength, desc, Time.getTimeByTimestamp(startTime), Time.getTimeByTimestamp(endTime));
    }

    /** ** calculated result * <p> This result is for reference only </p> **@paramName Number of threads */
    public PerformanceResultBean countQPS(int name, String desc, String start, String end) {
        List<String> strings = WriteRead.readTxtFileByLine(Constant.DATA_Path + name + desc);
        int size = strings.size();
        List<Integer> data = strings.stream().map(x -> changeStringToInt(x)).collect(toList());
        int sum = data.stream().mapToInt(x -> x).sum();
        Collections.sort(data);
        String statistics = StatisticsUtil.statistics(data, desc, this.queueLength);
        double qps = 1000.0 * size * name / sum;
        return new PerformanceResultBean(desc, start, end, name, size, sum / size, qps, getPercent(executeTimes.get(), errorTimes.get()), 0, executeTimes.get(), statistics);
    }


    /** * for later calculations **@param name
     * @param desc
     * @return* /
    public PerformanceResultBean countQPS(int name, String desc) {
        return countQPS(name, desc, Time.getDate(), Time.getDate());
    }

    /** ** **@param name
     * @return* /
    public PerformanceResultBean countQPS(int name) {
        return countQPS(name, EMPTY, Time.getDate(), Time.getDate());
    }


    /** ** compensation thread */
    class AidThread implements Runnable {

        private boolean key = true;

        int i;

        public AidThread(a) {}@Override
        public void run(a) {
            logger.info("Compensation thread started!");
            while (key) {
                long expect = (Time.getTimeStamp() - startTime) / 1000 * threads.get(0).qps;
                if (expect > executeTimes.get() + 10) {
                    range((int) expect - executeTimes.get()).forEach(x -> {
                        sleep(100);
                        executorService.execute(threads.get(i++ % queueLength).clone());
                    });
                }
                sleep(3);
            }
            logger.info("Compensation thread terminated!");
        }

        public void stop(a) {
            key = false; }}}Copy the code

Other supporting tag classes, statistics classes are still waiting to be modified, relatively simple, here do not put the code.


The public number FunTester was first launched, original sharing enthusiasts, Tencent Cloud, Open Source China and Gold excavation community home page recommendation, Zhihu pseudo-eight original author, welcome to pay attention to, exchange, prohibit unauthorized reprinting by third parties.

FunTester hot text selection

  • Programming thinking for everyone
  • 2020 Tester self-improvement
  • Fiddler Everywhere is the future
  • Test development engineer work skills
  • Selenium4 IDE, it’s finally here
  • Automated test soul three questions: What, why, and what to do
  • Why is test coverage so important
  • Let me tell you something. I don’t want to be tested
  • Automated testing framework
  • End-to-end testing in Agile