Companies now expect more and more from software testers, and automation has become a basic professional skill for intermediate test engineers. So today let's build an interface (API) automation framework for practice.

Since this is a framework, it needs a structural design. Here I use a layered, package-based layout to aim for high cohesion and low coupling in an object-oriented style.

The framework is built on unittest + requests + HTMLTestRunner + ddt.

Start by creating folders with different functions:

1. The common package holds shared scripts such as the database connection, logging, and sending requests. Anything else that is shared, such as reading data, fetching the token, or reading the configuration file, also goes here.

2. The conf package mainly contains configuration files, such as the test environment, SSH information, and database information, which makes maintenance and parameterization easier.

3. The data package stores the test data. I use Excel here; I used YML before.

4. The testCase package stores the test cases.

5. The testLogs folder stores log files. A log file is generated each time the scripts run and is saved in this folder.

6. The testReport package holds the test reports generated by HTMLTestRunner.

7. A run method is left at the top level as the entry point of the framework.
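
The layout above can be scaffolded with a few lines of Python. This is only a sketch: the folder names come from the list, while the `create_layout` helper, the `run.py` file name, and the choice of which folders get an `__init__.py` are assumptions made for illustration.

```python
from pathlib import Path

# Folder names taken from the list above.
PACKAGES = ["common", "conf", "data", "testCase"]
OUTPUT_DIRS = ["testLogs", "testReport"]


def create_layout(root):
    """Create the framework skeleton under `root` and return the created folders."""
    root = Path(root)
    created = []
    for name in PACKAGES:
        pkg = root / name
        pkg.mkdir(parents=True, exist_ok=True)
        # An __init__.py makes the folder importable as a package.
        (pkg / "__init__.py").touch()
        created.append(pkg)
    for name in OUTPUT_DIRS:
        out = root / name
        out.mkdir(parents=True, exist_ok=True)
        created.append(out)
    # Entry point of the framework (hypothetical file name).
    (root / "run.py").touch()
    return created
```

Calling `create_layout("API")` would produce a root folder that matches the `API.common...` import paths used in the scripts that follow.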

After that, it’s time to start writing scripts.

The first script I wrote is the database connection. I had connected to MySQL over SSH before and wanted to try it again, but my own attempt did not work, so I found a script online and adapted it until it did. The IP addresses and port numbers are passed in as parameters read from the configuration file, so if you want to use this script, change them for your own environment first (and please don't blame me later if you forget):


import pymysql
from sshtunnel import SSHTunnelForwarder
from API.common.readConfig import ReadConfig

class DataBase:

    def __init__(self):
        read = ReadConfig()

        self.server = SSHTunnelForwarder(
            (read.get_config_str("ssh", "sshHost"), read.get_config_int("ssh", "sshPort")),  # SSH IP and port
            ssh_password=read.get_config_str("ssh", "sshPassword"),    # SSH password
            ssh_username=read.get_config_str("ssh", "sshUser"),        # SSH account
            # IP address and port of the database, as seen from the SSH host
            remote_bind_address=(read.get_config_str("mysql", "dbHost"), read.get_config_int("mysql", "dbPort")))

        # Start the tunnel
        self.server.start()
        # Check whether the local end of the tunnel is configured correctly
        print(self.server.local_bind_host)


    def Get_db(self,sql):
        read = ReadConfig()

        # Connect through the local end of the SSH tunnel, so the host is always 127.0.0.1
        self.goDB = pymysql.connect(host = "127.0.0.1",
                       port = self.server.local_bind_port,
                       user = read.get_config_str("mysql", "dbUser"),                  # database account
                       passwd = read.get_config_str("mysql", "dbPassword"),            # database password
                       db = read.get_config_str("mysql", "dbName"))                    # database name


        cur = self.goDB.cursor()

        try:
            # Run a simple statement to check that the connection works
            cur.execute("select version()")
            result = cur.fetchone()
            print("Database version: %s" % result)
            cur.execute(sql)
            data = cur.fetchall()

            return data

        except Exception as e:
            print("Error: %s" % e)

    # Close the database connection (if one was opened) and the SSH tunnel
    def __del__(self):
        if hasattr(self, "goDB"):
            self.goDB.close()
        self.server.close()


Then there is the file that encapsulates the logging class:

import logging
import time

class GetLog:

    def __init__(self):
        curTime = time.strftime('%Y-%m-%d')
        self.logname = 'testLogs/AutoTest' + '_' + curTime + '.txt'

    def get_log(self,level,msg):

        # Create a log collector
        logger = logging.getLogger()
        logger.setLevel('DEBUG')

        # Create one handler for the log file and one for the console
        fh = logging.FileHandler(self.logname,'a',encoding='gbk')
        fh.setLevel('INFO')
        ch = logging.StreamHandler()
        ch.setLevel('INFO')

        # Define the output format of the handlers
        formatter = logging.Formatter('%(asctime)s - %(filename)s - %(name)s - %(levelname)s - %(message)s')
        ch.setFormatter(formatter)
        fh.setFormatter(formatter)

        # Add the handlers to the logger
        logger.addHandler(fh)
        logger.addHandler(ch)


        if level == 'DEBUG':
            logger.debug(msg)
        elif level == 'INFO':
            logger.info(msg)
        elif level == 'WARNING':
            logger.warning(msg)
        elif level == 'ERROR':
            logger.error(msg)
        elif level == 'CRITICAL':
            logger.critical(msg)

        logger.removeHandler(fh)
        logger.removeHandler(ch)
        fh.close()

    def log_debug(self,msg):
        self.get_log('DEBUG',msg)

    def log_info(self,msg):
        self.get_log('INFO',msg)

    def log_warning(self,msg):
        self.get_log('WARNING',msg)

    def log_error(self,msg):
        self.get_log('ERROR',msg)

    def log_critical(self,msg):
        self.get_log('CRITICAL',msg)
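
The class above adds and removes its handlers on every call so that log lines are not duplicated. An alternative, sketched below, is to attach the handlers once and reuse the logger. The `build_logger` function, its parameters, and the UTF-8 encoding are assumptions for illustration, not part of the original framework.

```python
import logging
import os
import time


def build_logger(log_dir="testLogs", name="AutoTest"):
    """Return a logger that writes to the console and to a dated file.

    Handlers are attached only on the first call, so repeated calls
    do not duplicate output lines.
    """
    logger = logging.getLogger(name)
    if logger.handlers:          # already configured by an earlier call
        return logger

    os.makedirs(log_dir, exist_ok=True)
    logger.setLevel(logging.DEBUG)
    file_name = "{0}_{1}.txt".format(name, time.strftime("%Y-%m-%d"))
    log_path = os.path.join(log_dir, file_name)

    formatter = logging.Formatter(
        "%(asctime)s - %(filename)s - %(name)s - %(levelname)s - %(message)s")
    handlers = (logging.FileHandler(log_path, "a", encoding="utf-8"),
                logging.StreamHandler())
    for handler in handlers:
        handler.setLevel(logging.INFO)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger
```

With this variant, a script would call `build_logger().info("message")` directly instead of going through the `log_info` wrapper.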

Next, the class that wraps the HTTP requests:


import requests

class HttpRequest:

    def __init__(self,method,url,data=None,token=None,headers=None):
        # Compare in upper case so 'get', 'Get' and 'GET' are all accepted
        method = method.upper()
        if method == 'GET':
            self.response = requests.get(url=url,params=data,headers=headers)
        elif method == 'POST':
            self.response = requests.post(url=url,data=data,headers=headers)
        elif method == 'DELETE':
            self.response = requests.delete(url=url,data=data,headers=headers)
        else:
            # Fail early instead of leaving self.response undefined
            raise ValueError('Unsupported HTTP method: {0}'.format(method))

    def get_status_code(self):
        return self.response.status_code

    def get_text(self):
        return self.response.text

    def get_json(self):
        return self.response.json()




The classes that read and write Excel:

import openpyxl


class Case:

    def __init__(self):
        self.case_id = None
        self.url = None
        self.data = None
        self.title = None
        self.method = None
        self.expected = None
        self.header = None

class ReadExcel:

    def __init__(self,file_name):
        try:
            self.filename = file_name
            self.wb = openpyxl.load_workbook(self.filename)
        except FileNotFoundError as e:
            print('{0} not found , please check file path'.format(self.filename))
            raise e

    def get_cases(self,sheet_name):
        sheet = self.wb[sheet_name]
        max_row =  sheet.max_row
        test_cases = []
        for r in range(2,max_row+1):
            case = Case()  # Instantiate a data object to store the test data read from this row
            case.case_id = sheet.cell(row=r,column=1).value
            case.title = sheet.cell(row=r,column=2).value
            case.method = sheet.cell(row=r,column=3).value
            case.url = sheet.cell(row=r,column=4).value
            case.data = sheet.cell(row=r,column=5).value
            case.expected = sheet.cell(row=r,column=6).value
            case.header = sheet.cell(row=r,column=7).value
            test_cases.append(case)
        return test_cases

    # Get a list of all the sheets in the workbook
    def get_sheet_name(self):
        return self.wb.sheetnames

    # Locate the sheet by sheet_name, locate the row by case_id, and write the result
    def write_result(self,sheet_name,case_id,actual,result):
        sheet = self.wb[sheet_name]
        max_row = sheet.max_row
        for r in range(2,max_row+1):
            # Read the case_id from row r, column 1
            case_id_r = sheet.cell(row=r,column=1).value
            # Check whether the case_id of the current row matches the case_id passed in
            if case_id_r == case_id:
                # Column 7 holds the request header read by get_cases, so write after it
                sheet.cell(row=r,column=8).value = actual
                sheet.cell(row=r,column=9).value = result
                self.wb.save(filename=self.filename)
                break


The class that reads the configuration file:


import configparser



class ReadConfig:

    # Instantiate the conf object and read the configuration file
    def __init__(self):
        self.conf = configparser.ConfigParser()
        self.conf.read("conf/config.ini")

    def get_config_str(self,section,option):
        return self.conf.get(section,option)

    def get_config_boolean(self,section,option):
        return self.conf.getboolean(section,option)

    def get_config_int(self,section,option):
        return self.conf.getint(section,option)

    def get_config_float(self,section,option):
        return self.conf.getfloat(section,option)

    def get_items(self,section):
        return self.conf.items(section)


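For reference, here is a sketch of what `conf/config.ini` might look like and how `configparser` reads it. The section and option names are the ones the scripts in this post ask for; every value is a made-up placeholder, and the sketch uses `read_string` only so that it runs without a file on disk.

```python
import configparser

# Placeholder values only; replace them with your own environment details.
SAMPLE_CONFIG = """
[ssh]
sshHost = 192.0.2.10
sshPort = 22
sshUser = tester
sshPassword = change-me

[mysql]
dbHost = 127.0.0.1
dbPort = 3306
dbUser = tester
dbPassword = change-me
dbName = demo

[api]
hostName = http://192.0.2.10:8080
"""

conf = configparser.ConfigParser()
conf.read_string(SAMPLE_CONFIG)   # ReadConfig calls conf.read("conf/config.ini") instead

ssh_port = conf.getint("ssh", "sshPort")      # converted to int
db_host = conf.get("mysql", "dbHost")         # returned as str
api_sections = conf.items("api")              # list of (option, value) pairs
```

Note that `configparser` lower-cases option names by default, so `items("api")` returns the key as `hostname`, while lookups such as `get("api", "hostName")` still work.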

At this point the common package is almost complete. I also added a get_token class, but it is specific to the design of the system under test, so I won't show it here.

Then there is the run method:

import unittest
from HTMLTestRunner import HTMLTestRunner
import time
from API.testCase.test_1 import TestRecharge

from API.common.getlog import GetLog

get_log = GetLog()

def RunTest():
    suite  = unittest.TestSuite()
    loader = unittest.TestLoader()
    suite.addTest(loader.loadTestsFromTestCase(TestRecharge))
    cur_time = time.strftime('%Y-%m-%d_%H_%M_%S')
    report_name = 'testReport/test_results' + cur_time + '.html'

    with open(report_name,'wb') as file:
        # HTMLTestRunner is imported as a class above, so call it directly
        runner = HTMLTestRunner(stream=file,
                                verbosity=2,
                                title='Interface Test Report',
                                description='Automated testing of data-driven interfaces based on Python + UnitTest',
                                )
        runner.run(suite)

if __name__ == '__main__':
    get_log.log_info('---------- Api Request Start ----------')
    RunTest()
    get_log.log_info('---------- Api Request End ----------')


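If HTMLTestRunner is not installed yet, the same suite-building steps can be tried with the standard library's text runner. The sketch below assumes a stand-in `DemoCase` class and a hypothetical `run_suite` helper; in the framework, `TestRecharge` would take the place of `DemoCase`.

```python
import io
import unittest


class DemoCase(unittest.TestCase):
    """Stand-in for TestRecharge so the sketch runs on its own."""

    def test_addition(self):
        self.assertEqual(1 + 1, 2)


def run_suite(case_class):
    """Load every test method of `case_class` and run it with the stdlib runner."""
    suite = unittest.TestSuite()
    suite.addTest(unittest.TestLoader().loadTestsFromTestCase(case_class))
    stream = io.StringIO()                      # collects the text report
    runner = unittest.TextTestRunner(stream=stream, verbosity=2)
    result = runner.run(suite)
    return result, stream.getvalue()


result, report = run_suite(DemoCase)
```

`result.wasSuccessful()` tells you whether every test passed, and `report` holds the plain-text output that the HTML report replaces.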

Then there is the test case, which lives in the testCase package:


from API.common.readConfig import ReadConfig
from API.common.dataBase import DataBase
from API.common.http_request import HttpRequest
from API.common.oper_token import Get_token
from API.common.read_excel import ReadExcel
from API.common.getlog import GetLog
from ddt import ddt,data
import json
import unittest


read_config = ReadConfig()
url_pre = read_config.get_config_str('api', 'hostName')

# Read the Excel file and load the test data

read_excel = ReadExcel("data/123.xlsx")
recharge_cases = read_excel.get_cases("Sheet1")

get_log = GetLog()

@ddt
class TestRecharge(unittest.TestCase):

    def setUp(self):
        print('Test Start')

    def tearDown(self):
        print('Test End')

    @data(*recharge_cases)
    def test_recharge(self,case):
        url = url_pre + case.url
        token=Get_token().get_token()
        try:
            payload= json.loads(case.data)
            payload["token"]=token
        except (TypeError, ValueError):
            # The cell is empty or does not hold a JSON object: send it unchanged
            payload=case.data
        # Record the current test case information
        get_log.log_info("Test Case Info: Case_id: {0} title: {1} method: {2} URL: {3} Data: {4} Expected: {5} Header: {6}".format(case.case_id,case.title,case.method,url,payload,case.expected,case.header))


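        # eval() executes whatever is in the cell, so only use Excel files you trust
        headers = eval(case.header) if case.header else None
        response = HttpRequest(method=case.method,url=url,data=payload,headers=headers)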

        actual = response.get_text()

        # Record the interface response for the current test case
        get_log.log_info("Test Case Request Response Result: Response: {0} actual: {1}".format(response.get_json(),actual))

        # Assert, then write the actual response and the result back to the case's row in Excel
        # (the sheet name matches the one the cases were read from)
        try:
            self.assertEqual(case.expected,actual)
            read_excel.write_result('Sheet1',case.case_id,actual,'Pass')
            get_log.log_info('Test Result is Passed ! Case_id is {0}, title is {1} '.format(case.case_id,case.title))
        except Exception as e:
            read_excel.write_result('Sheet1',case.case_id,actual,'Fail')
            get_log.log_info('Test Result is Failed ! Case_id is {0}, title is {1} '.format(case.case_id,case.title))
            get_log.log_error('the Error MSG: {0}'.format(e))
            raise e

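The test case turns the header cell into a dict with `eval`, which runs any Python expression found in the spreadsheet. A more defensive option is `ast.literal_eval`, which only accepts literals. The `parse_header` helper below is a hypothetical sketch of that idea and assumes the cell holds a Python dict literal.

```python
import ast


def parse_header(cell_value):
    """Turn the text of a header cell into a dict without executing code."""
    if not cell_value:                 # empty cell means no extra headers
        return {}
    parsed = ast.literal_eval(cell_value)
    if not isinstance(parsed, dict):
        raise ValueError("header cell must contain a dict literal")
    return parsed
```

In the test method this would be used as `headers=parse_header(case.header)`.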

The libraries you need to install for this framework are:

configparser==5.0.0
openpyxl==3.0.3
requests==2.23.0
pymysql==0.9.3
sshtunnel==0.1.5
ddt==1.4.1
HTMLTestRunner==0.8.0

A bamboo hat and a boat

One foot silk, one inch hook

A song and a bottle of wine

One man catches a fish all by himself