Using multiple threads (with a connection pool) to insert data into MySQL

What I learned from this blog:

  • First, how to build a pool of database connections, so that multiple connections can be open at once and different tables can be queried and inserted into simultaneously. This is the foundation for operating a database from multiple threads.
  • Second, how multithreading builds on multiple connections: with one connection per thread, different batches of data can be written to the database in parallel.
  • Third, cur.executemany, which batch-executes one statement over many parameter rows, reducing the number of round trips and commits.

1. Main modules

DBUtils: a suite of modules that provides thread-safe, pooled database connections for multi-threaded applications; the PooledDB class used below comes from it.
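If you want to run the examples, both packages are on PyPI. Note that DBUtils changed its import layout in version 2.0 (DBUtils.PooledDB became dbutils.pooled_db); this post assumes the 1.x layout, so pin the version accordingly:

pip install pymysql "DBUtils<2.0"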

2. Create a connection pool

PooledDB

  • mincached: the minimum number of idle connections. If the number of idle connections drops below this, the pool automatically creates new ones.
  • maxcached: the maximum number of idle connections. If the number of idle connections exceeds this, the pool closes the surplus.
  • maxconnections: the maximum total number of connections.
  • blocking: if True, a request for a connection waits until the current number of connections drops below the maximum; if False, an error is raised instead.

def mysql_connection():
    maxconnections = 15  # maximum number of connections
    pool = PooledDB(
        pymysql,
        maxconnections=maxconnections,  # pass as a keyword: PooledDB's second positional parameter is mincached, not maxconnections
        host='localhost',
        user='root',
        port=3306,
        passwd='123456',
        db='test_DB',
        use_unicode=True)
    return pool

# Usage
pool = mysql_connection()
con = pool.connection()
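A variant that also sets the idle-connection parameters described above (a sketch; the mincached/maxcached values are illustrative, not from the original post):

pool = PooledDB(
    pymysql,
    mincached=2,        # keep at least 2 idle connections ready
    maxcached=5,        # close idle connections beyond 5
    maxconnections=15,  # hard cap on total connections
    blocking=True,      # wait for a free connection instead of raising an error
    host='localhost',
    user='root',
    port=3306,
    passwd='123456',
    db='test_DB',
    use_unicode=True)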

3. Data preprocessing

Four dummy data files were prepared for testing, with 100,000, 500,000, 1,000,000 and 5,000,000 rows respectively.

The target table is test, with four columns: sku, fnsku, asin and shopid.
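The exact DDL is not shown in this post; a minimal schema consistent with the INSERT statement used later might look like this (the column types and lengths are an assumption):

CREATE TABLE test (
    sku    VARCHAR(64),   -- hypothetical type/length
    fnsku  VARCHAR(64),
    asin   VARCHAR(64),
    shopid VARCHAR(64)
);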

Data processing ideas:

  • Each line contains one record; fields are separated by the TAB character "\t" and each field is wrapped in double quotation marks (a hypothetical sample line is shown below).
  • The file is opened in binary mode, so each line is read as bytes.
  • The result is a nested list, so that a multi-threaded loop can process 100,000 rows per task. Format: [[(a, b, c, d), (a, b, c, d), (a, b, c, d), ...], [(a, b, c, d), (a, b, c, d), (a, b, c, d), ...], ...]
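For illustration only, a single raw line of the file might look like this (the field values are made up):

"SKU-001"\t"X001ABC"\t"B08XYZ123"\t"1001"

After re.sub(r"\s", "", ...) strips all whitespace, including the tabs, neighbouring fields are separated only by an adjacent pair of double quotes, which is why the code below drops the outermost quotes and then splits on "\"\"".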
import re
import time

st = time.time()
with open("10w.txt"."rb") as f:
    data = []
    for line in f:
        line = re.sub("\s"."".str(line, encoding="utf-8"))
        line = tuple(line[1: -1].split("\" \ ""))
        data.append(line)
    n = 100000  Split into nested lists in the smallest unit per 100,000 rows of data
    result = [data[i:i + n] for i in range(0.len(data), n)]
print("100,000 lines of data, time :{}".format(round(time.time() - st, 3)))

# 100,000 rows of data, time: 0.374
# 500,000 rows of data, time: 1.848
# 1 million rows of data, time: 3.725
# 5 million rows of data, time: 18.493

4. Threaded tasks

Each call to the insert function takes a connection from the pool and closes it (returns it to the pool) when finished. executemany performs the inserts as a batch, reducing the number of commits and improving efficiency.

def mysql_insert(*args):
    con = pool.connection()
    cur = con.cursor()
    sql = "INSERT INTO test(sku,fnsku,asin,shopid) VALUES(%s, %s, %s, %s)"
    try:
        cur.executemany(sql, *args)
        con.commit()
    except Exception as e:
        con.rollback()  # transaction rollback
        print("SQL execution error, cause:", e)
    finally:
        cur.close()
        con.close()
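As a quick sanity check, a single chunk can be inserted directly (assuming the pool from section 2 and the result list from section 3 are in scope):

pool = mysql_connection()
mysql_insert(result[0])  # one batch of up to 100,000 rows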

5. Start multithreading

How the code works:

Set the maximum queue size. This value must be less than the pool's maximum number of connections, otherwise thread tasks cannot get the connections they need and pymysql.err.OperationalError: (1040, 'Too many connections') is raised. The loop pops chunks from the preprocessed list and adds a task to the queue for each one. When the queue is full, or the current chunk is the last one, the queued tasks are started as threads and joined until the queue is empty.

def task():
    q = Queue(maxsize=10)  # maximum queue size, i.e. number of threads per batch
    # data: the preprocessed data (a nested list)
    while data:
        content = data.pop()
        t = threading.Thread(target=mysql_insert, args=(content,))
        q.put(t)
        if q.full() or len(data) == 0:
            thread_list = []
            while not q.empty():
                t = q.get()
                thread_list.append(t)
                t.start()
            for t in thread_list:
                t.join()

6. Complete example

import pymysql
import threading
import re
import time
from queue import Queue
from DBUtils.PooledDB import PooledDB  # DBUtils < 2.0; in 2.x this is: from dbutils.pooled_db import PooledDB

class ThreadInsert(object):
    """Insert data into MySQL with multiple threads."""
    def __init__(self):
        start_time = time.time()
        self.pool = self.mysql_connection()
        self.data = self.getData()
        self.mysql_delete()
        self.task()
        print("========= Data insertion complete, total time: {}s =========".format(round(time.time() - start_time, 3)))
        
    def mysql_connection(self):
        maxconnections = 15  # maximum number of connections
        pool = PooledDB(
            pymysql,
            maxconnections=maxconnections,  # keyword argument: PooledDB's second positional parameter is mincached
            host='localhost',
            user='root',
            port=3306,
            passwd='123456',
            db='test_DB',
            use_unicode=True)
        return pool

    def getData(self):
        st = time.time()
        with open("10w.txt", "rb") as f:
            data = []
            for line in f:
                line = re.sub(r"\s", "", str(line, encoding="utf-8"))
                line = tuple(line[1:-1].split("\"\""))
                data.append(line)
        n = 100000  # split into nested lists, at most 100,000 rows per chunk
        result = [data[i:i + n] for i in range(0, len(data), n)]
        print("Total {} groups of data, each group {} elements. ==>> time: {}s".format(len(result), n, round(time.time() - st, 3)))
        return result
        return result

    def mysql_delete(self):
        st = time.time()
        con = self.pool.connection()
        cur = con.cursor()
        sql = "TRUNCATE TABLE test"
        cur.execute(sql)
        con.commit()
        cur.close()
        con.close()
        print("Clear the original data. ==>> time: {}s".format(round(time.time() - st, 3)))

    def mysql_insert(self, *args):
        con = self.pool.connection()
        cur = con.cursor()
        sql = "INSERT INTO test(sku, fnsku, asin, shopid) VALUES(%s, %s, %s, %s)"
        try:
            cur.executemany(sql, *args)
            con.commit()
        except Exception as e:
            con.rollback()  # transaction rollback
            print("SQL execution error, cause:", e)
        finally:
            cur.close()
            con.close()

    def task(self):
        q = Queue(maxsize=10)  # maximum queue size, i.e. number of threads per batch
        st = time.time()
        while self.data:
            content = self.data.pop()
            t = threading.Thread(target=self.mysql_insert, args=(content,))
            q.put(t)
            if q.full() or len(self.data) == 0:
                thread_list = []
                while not q.empty():
                    t = q.get()
                    thread_list.append(t)
                    t.start()
                for t in thread_list:
                    t.join()
        print("Data insertion complete. ==>> time: {}s".format(round(time.time() - st, 3)))

if __name__ == '__main__':
    ThreadInsert()

Insertion time comparison

Rows        Groups (x100,000)   Preprocess   Truncate   Insert      Total
100,000     1                   0.374s       0.031s     2.499s      3.092s
500,000     5                   1.745s       0.0s       16.129s     17.969s
1,000,000   10                  3.858s       0.028s     41.269s     45.257s
5,000,000   50                  19.478s      0.016s     317.346s    337.053s

7. Thoughts and summary

Thinking it over: multithreading plus a queue basically covers day-to-day needs, but there is still a weakness. In this example 10 thread tasks are executed per batch, and new tasks can only be queued after all 10 have finished, which leaves the queue partially idle: if 1 task is still running, the other 9 slots sit unused, wasting time and resources. Could we instead keep the queue full, refilling it every time a task completes? One way to do that is sketched below.
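One possible answer (not from the original post) is the standard library's concurrent.futures.ThreadPoolExecutor, which keeps a fixed set of workers busy and hands the next chunk to whichever worker frees up first, so no slot idles while a slow task finishes. A minimal sketch, assuming the mysql_insert function and the preprocessed nested list from the sections above:

from concurrent.futures import ThreadPoolExecutor

def task_pool(data, max_workers=10):
    # max_workers plays the role of the queue size above and must still
    # stay below the connection pool's maxconnections
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # submit() gives each chunk to the next free worker immediately,
        # instead of waiting for a whole batch of 10 to finish
        futures = [executor.submit(mysql_insert, chunk) for chunk in data]
        for f in futures:
            f.result()  # re-raise any exception from a worker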