
These two little things happened a few weeks ago; I had been meaning to write them down but kept putting it off. Today I finally have some time to write about them.

Small thing 1

(1) The cause

In the last two weeks there were no big new requirements on the company's project, so I had time for some technical planning. After discussing it with the team leader, I decided to work on APP startup optimization, that is, speeding up application startup and page loading.

When it comes to page loading speed, the question naturally follows: there are so many pages, how do you know which ones need optimizing?

The company's APP is built as a single Activity holding many Fragments. So how many Fragments are there, exactly?

import os


def search_all_fragment(path):
    os.chdir(path)
    items = os.listdir(os.curdir)
    for item in items:
        path = os.path.join(item)
        # Get the last part of the path, i.e. the file name
        path_split_last = path.split(os.path.sep)[-1]
        # Determine whether it is a directory
        if os.path.isdir(path):
            print("[-].", path)
            search_all_fragment(path)
            os.chdir(os.pardir)
        # ARouter is used in the project and generates its own Fragment classes,
        # which should not be counted, so filter them out
        elif 'Fragment' in path_split_last and '$$' not in path_split_last:
            print("[!] ", path)
            # Keep only .java and .kt files
            if path_split_last.endswith('.java') or path_split_last.endswith('.kt'):
                result_list.append(path)
        else:
            print('[+]', path)


if __name__ == '__main__':
    result_list = []
    search_all_fragment(r'Project file path')
    print(result_list)
    print('Total Fragments:', len(result_list))

Run the script and you'll know:

All right, so 412 Fragments in total. Back to the question of how to know which pages need optimizing.

As it happens, our in-house analytics already reports a render-time tracking event. Without worrying about whether its implementation is reliable, I opened Kibana and entered the filter conditions (viewing only this event). Part of the log looks like this:

② is the page path (we only need the last Fragment in it), and ③ is renderCost, the render time. These two fields are all we need.

So how do we analyze these logs to find the pages that need optimizing? After a quick back-of-the-envelope discussion with the team leader, we settled on a not-very-rigorous plan:

Count the render times for every Fragment, average them, sort from longest to shortest, and prioritize the pages with the longest average render time.

After the morning discussion I checked: there were 53,068,377 render-event log entries for the past year. On the way to a doctor's appointment in the afternoon I started thinking about how to get them. Three ideas:

  • 1. Kibana supports exporting saved query results to CSV; export directly to CSV, then read and parse it with the csv library or Pandas;
  • 2. With database permissions, query all the logs directly and export them as JSON or CSV;
  • 3. Capture packets, or simulate user access to grab the data and save it locally, then batch-process it;

As it turned out, having backup plans was the right call: the first two ideas, plus the first half of the third, all fell through. Let me walk you through it ~

(2) Solution 1 ×

The next morning at the company I got ready to export the CSV. The tutorial someone sent me looked very simple, just three steps:

Enter the search criteria and query → Save the query result → Generate a CSV file

However, I couldn't find the Save option, and the Generate CSV button stayed greyed out:

em… Is it a permissions issue? I switched to the team leader's account, which has database access - still no luck. Maybe some configuration needs to be enabled? I clicked into Kibana Settings and got "no access" everywhere, so I went to the backend lead and got this reply:

We can't enable this. Exporting a few hundred thousand rows is fine, but exporting millions would bring the server down. It's too risky, so the feature is disabled.

Idea one had failed…


(3) Solution 2 ×

On to the second idea: with an account that has database permissions, run the query and then export the results with a script. Writing a couple of simple conditional query statements - how hard can it be?

You can use Kibana Dev Tools to send JSON query strings directly to the database:

em… The contents of the bool block are the same as in the request the log-filter page sends, so copy-paste them over, then change size to 1000000.

em… Turns out a single query returns at most 10,000 records; to get more per query you would have to change the configuration.
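For reference, a minimal sketch of what such a query body might look like as a Python dict (roughly the search_dict used further below); the field names and filter entries here are assumptions - the real bool block is simply copied from the page's own request:

# A minimal sketch of the query body; field names and filters are assumed,
# the real bool block is copied verbatim from the filter page's request
search_dict = {
    'size': 10000,   # a single query returns at most 10,000 hits
    'query': {
        'bool': {
            'filter': [
                {'match_phrase': {'event': 'render'}},
                {'range': {'time': {'gte': '2021-07-01T00:00:00.000Z'}}},
                {'range': {'time': {'lte': '2021-07-01T12:00:00.000Z'}}},
                # ... the real filter list is much longer
            ]
        }
    }
}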

Fine, I thought, I'll just copy-paste the JSON and save it by hand. OK, 53.06 million records, copied and pasted manually - let's work out how long that would take:

  • Change the search criteria (start and end date/time) → 10s
  • Click query and wait for the result to display → within 20s
  • Create a new file, paste the result in, save and name it → 30s

Each save covers 10,000 records and takes me at least a minute, so getting all the data works out to 5306/60 ≈ 88.5 hours. Converted into standard 8-hour working days, that's a bit more than 11 days, not counting breaks - over two weeks of doing the same repetitive thing. No one could stand that!!

Solution two is on its knees too…


(4) Solution 3 × √

em… Packet capture? Analyze the parameters, then write a crawler to grab the data. After capturing a few requests I gave up: there's a sid in the Cookie that changes with every request:

And it doesn't match the Set-Cookie returned in the response headers, so there's clearly no way to construct it within a short time. Alas, time for the lowest-tech option: simulating user access through a browser.

Tips: I later found that Set-Cookie only returns part of the cookies. Grab the full cookies after logging in, then just replace that changing part after each request.
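A hypothetical sketch of that idea with requests - the endpoint, header and cookie names are assumptions, and I didn't end up going down this road:

import requests

session = requests.Session()
# Log in once in a real browser and copy the full cookie set over (made-up values)
session.cookies.update({'sid': 'xxx', 'other': 'xxx'})

# requests.Session automatically merges any Set-Cookie headers from the response,
# so the changing part of the cookie is replaced before the next request
resp = session.post('http://kibana.xxx.xxx/api/console/proxy?path=_search&method=GET',
                    headers={'kbn-xsrf': 'true'},   # assumed header, Kibana APIs usually expect it
                    json=search_dict)
print(resp.status_code)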

Break down the simulation steps:

  • 1. Open the login page → wait for it to load → fill in the account and password → click login
  • 2. Wait for the page to finish loading → click the Dev Tools icon on the left
  • 3. Wait for the page to load → clear the query JSON on the left → fill in the new query JSON
  • 4. Click send request → wait for the query result on the right → select the result → save it to a local file

Whatever is easiest: I just dump the query results into TXT files and use Selenium for the simulated access. Let's get started:

import time

from selenium import webdriver

base_url = 'http://kibana.xxx.xxx'
login_url = base_url + '/login'
login_data = {
    'password': 'xxx',
    'username': 'xxx',
}


# Initialize the browser
def init_browser():
    chrome_options = webdriver.ChromeOptions()
    chrome_options.add_argument(r'--start-maximized')    # Start maximized
    return webdriver.Chrome(options=chrome_options)


# Simulate login
def login():
    browser.get(login_url)
    time.sleep(10)
    inputs = browser.find_elements_by_class_name('euiFieldText')
    inputs[0].send_keys(login_data['username'])
    inputs[1].send_keys(login_data['password'])
    submit = browser.find_element_by_xpath('//button[1]')
    submit.click()


if __name__ == '__main__':
    browser = init_browser()
    login()

In theory, adding the following option points Chrome at a user data directory, so the next run opens the browser already logged in and you don't need to log in again:

chrome_options.add_argument(r'--user-data-dir=D:\ChromeUserData')   

In practice it didn't help - the script still landed on the login page. I never figured out why, so it just logs in every time the script runs…

After a successful login it takes a moment to jump to the home page; wait for it to finish loading, then click the Dev Tools icon on the left. I didn't use explicit or implicit waits here, just the dumb approach of sleeping for a fixed time ~

The login() function finally calls the following method:

# Go to the home page and click the Dev Tools tab
def click_tab():
    time.sleep(8)   # Hard sleep, waiting for the page to load
    browser.find_element_by_xpath('//ul[3]/li[2]').click()
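(For the record, the less-dumb alternative would be an explicit wait; a small sketch with Selenium's WebDriverWait, reusing the same XPath:)

from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

# Wait up to 15s for the tab to become clickable instead of sleeping blindly
tab = WebDriverWait(browser, 15).until(
    EC.element_to_be_clickable((By.XPATH, '//ul[3]/li[2]')))
tab.click()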

Then write text to the input field:

Locate the target element:

It isn't an ordinary text input, so grab the outer ace_content div and try send_keys:

def set_left_text():
    inputs = browser.find_element_by_xpath('//div[@class="ace_content"]')
    inputs.send_keys('test text')

Error:

If you can’t set the text directly, you have to find another way.

Click where the cursor is, keep pressing backspace to clear the existing content, then simulate the keyboard typing characters one by one.

The revised code:

from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys

# Set the text on the left
def set_left_text():
    time.sleep(5)
    cursor_div = browser.find_element_by_xpath('//div[@class="ace_cursor"]')
    cursor_div.click()
    action_chains = ActionChains(browser)
    # Press backspace 500 times to clear whatever is already there
    for i in range(0, 500):
        action_chains.context_click(cursor_div).send_keys(Keys.BACKSPACE).perform()
    action_chains.context_click(cursor_div).send_keys('GET _search' + str(search_dict)).perform()

Clearing and then re-typing like this looks a bit janky:

It also does a lot of wasted work: backspace is pressed 500 times even though there aren't nearly that many characters, and then you have to wait for it to type everything out. It needed improving.

Another idea: copy and paste. The flow:

Write the query string to the clipboard → click the cursor or content area to get focus → Ctrl+A to select everything → Backspace to delete it → Ctrl+V to paste the new content

A quick implementation:

def set_left_text():
    time.sleep(5)
    input_div = browser.find_element_by_xpath('//div[@class="ace_content"]')
    input_div.click()
    action = ActionChains(browser)
    action.key_down(Keys.CONTROL).key_down('a').key_up('a').key_up(Keys.CONTROL).perform()
    action.key_down(Keys.BACKSPACE).key_up(Keys.BACKSPACE).perform()
    action.key_down(Keys.CONTROL).key_down('v').key_up('v').key_up(Keys.CONTROL).perform()

Look at the effect:

OK. Now simulate a click on the little run button:

# Click the query button
def click_submit():
    submit_button = browser.find_element_by_xpath('//button[@data-test-subj="sendRequestButton"]')
    submit_button.click()

Then there's the query result on the right. Processing it directly is a bit of a pain: get the content node, recursively traverse all its child nodes, extract the text, strip spaces and line breaks, and splice the output together.
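A rough sketch of what that traversal would have looked like (I abandoned this approach, so treat it purely as an illustration; locating the result node is left out):

# Recursively collect the text of a node and all of its children,
# stripping spaces and line breaks before splicing everything together
def extract_text(node):
    children = node.find_elements_by_xpath('./*')
    if not children:
        return node.text.replace(' ', '').replace('\n', '')
    return ''.join(extract_text(child) for child in children)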

Then I thought of a plan:

Could I intercept the requests made by Selenium's browser and directly save the response of one specific request?

You can, with browsermob-proxy: download the library and copy it into your project:

import os

from browsermobproxy import Server

# Start the proxy
server = Server(os.path.join(os.getcwd(), r'browsermob-proxy-2.1.4\bin\browsermob-proxy'))
server.start()
proxy = server.create_proxy()

# Add the proxy to the Chrome config
chrome_options.add_argument('--proxy-server={0}'.format(proxy.proxy))

# Before capturing packets:
proxy.new_har(options={
    'captureContent': True, 'captureHeaders': True
})

# After capturing, filter the specific request and save its content to a local file:
def save_log(date_str, index):
    for entry in proxy.har['log']['entries']:
        if entry['request']['url'].endswith('path=_search&method=GET'):
            log_file_path = os.path.join(out_dir, date_str + '_' + str(index) + '.txt')
            with open(log_file_path, "w+", encoding='utf-8') as f:
                f.write(str(entry['response']['content'])
                        .replace("\n", ' ').replace("\\n", "").replace(' ', ''))
            print("Date log saved:", log_file_path)

Aww, perfect. Next, finish off the clipboard write and the construction of the query dates:

import datetime

import win32clipboard as w
import win32con

# Write content to the clipboard
def set_copy_text(content):
    w.OpenClipboard()
    w.EmptyClipboard()
    w.SetClipboardData(win32con.CF_UNICODETEXT, content)
    w.CloseClipboard()


# Construct the list of dates from 20200709 to today
def init_date_list(begin_date, end_date):
    date_list = []
    begin_date = datetime.datetime.strptime(begin_date, "%Y%m%d")
    end_date = datetime.datetime.strptime(end_date, "%Y%m%d")
    while begin_date <= end_date:
        date_str = begin_date.strftime("%Y-%m-%d")
        date_list.append(date_str)
        begin_date += datetime.timedelta(days=1)
    return date_list

Finally, on every request, update the request parameters, write them to the clipboard, and start a new proxy capture:

def input_query_content():
    try:
        for pos, date in enumerate(str_date_list):
            for index in range(1, 3):
                input_div = browser.find_element_by_xpath('//div[@class="ace_content"]')
                input_div.click()
                action = ActionChains(browser)
                print(str(pos + 1) + ", request date: " + date + " - " + ("first half" if (index == 1) else "second half"))
                update_dict_and_proxy(date, index)
                action.key_down(Keys.CONTROL).key_down('a').key_up('a').key_up(Keys.CONTROL).perform()
                set_copy_text('GET _search' + '\n' + str(search_dict).replace("'", '"'))
                time.sleep(1)
                action.key_down(Keys.BACKSPACE).key_up(Keys.BACKSPACE).perform()
                action.key_down(Keys.CONTROL).key_down('v').key_up('v').key_up(Keys.CONTROL).perform()
                submit_button = browser.find_element_by_xpath('//button[@data-test-subj="sendRequestButton"]')
                submit_button.click()
                time.sleep(20)
                save_log(date, index)
    except Exception as e:
        print(e)
        proxy.close()
        browser.close()


# Update the request dictionary and start a new capture
def update_dict_and_proxy(date_str, index):
    gte_str = date_str + 'T00:00:00.000Z' if (index == 1) else date_str + 'T12:00:00.000Z'
    lte_str = date_str + 'T12:00:00.000Z' if (index == 1) else date_str + 'T23:59:59.000Z'
    search_dict['query']['bool']['filter'][20]['range']['time']['gte'] = gte_str
    search_dict['query']['bool']['filter'][21]['range']['time']['lte'] = lte_str
    proxy.new_har(options={
        'captureContent': True, 'captureHeaders': True
    })

With the script running you can just let it churn. It's best to find an idle computer for it, because the script occupies the clipboard and would interfere with normal work! Also, each day's query is split into two halves so that as much of the required data as possible fits within the limit.

Scripting languages really are sweet! Handing the grunt work to an automation script more than doubles the efficiency: 20,000 records take only about a minute, so collecting all the data drops to at most 44 hours, and since a machine can run 24 hours a day, that's only about two days - without getting in the way of my work (read: slacking). It could of course be optimized further by deploying the script to several computers at once and cutting the time again and again: two weeks of manual work, finished in less than a day. Still not sweet?

Fortunately, there turned out to be only about 6 million valid records - the render-event tracking was only added in October of the previous year - so a single machine needed only around five hours to crawl the data.

Write the script in the morning, run it in the afternoon, and write the statistics script in between - this part is very easy.

import re

data_pattern = re.compile('pagePath":"(.*?)".*?"renderCost","value":(.*?)}', re.S)

Read the contents of each file, run the regex over the full text, iterate over the matches, process the two capture groups in turn, and write each render time into a per-page .txt file:

for log in log_list:
    print("Parsing file:", log)
    with open(log, 'r', encoding='utf8') as f:
        content = f.read()
    data_result = data_pattern.findall(content)
    if data_result is not None:
        for data in data_result:
            page_name = ''
            page_render_time = 0
            page_split = data[0].split('-')
            if page_split is not None and len(page_split) > 0:
                other_page_split = page_split[-1].split(",")
                if other_page_split is not None and len(other_page_split) > 0:
                    page_name = other_page_split[-1]
                else:
                    page_name = page_split[-1]
            else:
                other_page_split = data[0].split(",")
                if other_page_split is not None and len(other_page_split) > 0:
                    page_name = other_page_split[-1]
                else:
                    page_name = data[0]
            page_render_time = data[1].replace('"', '')
            if page_name == 'i':
                print(data)
                break
            cp_utils.write_str_data(page_render_time, os.path.join(page_dir, page_name + ".txt"))

The following is an example:

Then iterate over that folder, store the data as (page: average), and save the statistics:

def average_time(file_path):
    page_dir_split = file_path.split(os.path.sep)
    if page_dir_split is not None and len(page_dir_split) > 0:
        result_average = 0
        render_time_list = cp_utils.load_list_from_file(file_path)
        for render_time in render_time_list:
            if render_time != '0':
                result_average = int((result_average + int(render_time)) / 2)
        print(page_dir_split[-1] + " has been calculated...")
        cp_utils.write_str_data(page_dir_split[-1] + "-" + str(result_average), result_file)
    else:
        print("Abnormal exit")


Finally, sort in descending order and write the result to a file:

def order_list(file_path):
    time_info_list = cp_utils.load_list_from_file(file_path)
    order_list_result = sorted(time_info_list, key=lambda x: int(x.split('-')[-1]), reverse=True)
    cp_utils.write_list_data(order_list_result, result_order_file)

This gives the average render time per page. Of course, the average isn't very reliable, because there are too many variables:

Different devices have different hardware, so loading speed naturally differs; some users are active, some aren't; app versions differ; and so on…

Unreliable as it is, I still wanted to get something useful out of the full tracking data, so after some thought we settled on another plan:

Sort pages by usage frequency and optimize the most-used pages first. For example, optimize two commonly used pages in this release, pick a few typical devices to track, and after the release has been out for a while compare the new data for those two pages against the old data - that way the benefit of the optimization can be quantified ~
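A rough sketch of how the usage counts could be pulled from the same logs (reusing log_list and data_pattern from above; purely illustrative):

from collections import Counter

# Count how many times each page shows up in the render logs,
# then list the most frequently used pages first
page_counter = Counter()
for log in log_list:
    with open(log, 'r', encoding='utf8') as f:
        for page_path, _ in data_pattern.findall(f.read()):
            page_counter[page_path.split('-')[-1]] += 1

for page, count in page_counter.most_common(20):
    print(page, count)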

Of course, that's a story for later. I can't imagine how I would have handled all this if I weren't handy with Python…


Small thing 2

The second thing is smaller than the first. A friend asked me to help grab some industry reports and sent over a few URLs from time to time. The first few were easy: simulate the request, parse the page, grab an ID or two, splice together the real PDF download link, then download.

The next few sites, though, simply publish each page of the report as an image, for example:

To turn those into a PDF without a scripting language, you'd have to right-click and save every image locally, then use some compositing tool to stitch the images into a PDF.

I found img2pdf. The API is simple and works fine, but if an image has an alpha channel it throws an error outright, so you have to strip the alpha channel yourself first. Easy enough with the Pillow library:

import os

from PIL import Image, ImageFont, ImageDraw

# Batch-convert RGBA images to RGB, removing invalid files along the way
def remove_pic_alpha(pic):
    try:
        img = Image.open(pic)
        if img.mode == "RGBA":
            img = img.convert("RGB")
            img.save(pic)
            print("Converted picture:", pic)
    except Exception as e:
        print("File exception, removed: " + pic)
        os.remove(pic)

The simple conversion code is as follows:

import img2pdf

try:
    with open(pdf_path, "wb+") as f:
        f.write(img2pdf.convert(pic_path_list))
    print("Output PDF file:", pdf_path)
except Exception as e:
    print("Abnormal occurrence:", pdf_path)

Later I ran into a big one: 851 pages, 13,950 valid reports in total. Some report pages aren't pure images but something like text-text-image, and I didn't want to drop the text, so I converted it into images too - again with Pillow, drawing the text onto a white background according to a simple rule.

def font2pic(content, pic_path):
    # Convert to a list first and insert a line break every 30 characters
    content_list = list(content)
    i = 30
    while i < len(content_list):
        content_list.insert(i, "\n")
        i += 30
    content_after = ''.join(content_list)
    im = Image.new("RGB", (960, 720), (255, 255, 255))
    dr = ImageDraw.Draw(im)
    font = ImageFont.truetype(os.path.join("fonts", "msyh.ttf"), 24)
    dr.text((50, 50), content_after, font=font, fill="#000000")
    im.save(pic_path)

While crawling the pages you have to record the order and use it as the image name. An example of a crawled temporary file:

Then iterate through each line of the file: render text lines into images and download the image links (you could also batch-download after replacing the URLs). A sample of the resulting PDF:
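Roughly, that loop might look like this - the one-item-per-line format of the temporary file is an assumption, and font2pic / remove_pic_alpha are the helpers defined above:

import requests

def build_report_pics(tmp_file, out_dir):
    pic_path_list = []
    with open(tmp_file, 'r', encoding='utf-8') as f:
        for index, line in enumerate(f):
            line = line.strip()
            if not line:
                continue
            pic_path = os.path.join(out_dir, '%04d.jpg' % index)
            if line.startswith('http'):
                # Image line: download it as-is
                with open(pic_path, 'wb') as img:
                    img.write(requests.get(line, timeout=30).content)
                remove_pic_alpha(pic_path)   # strip alpha so img2pdf won't choke
            else:
                # Text line: render it onto a white background
                font2pic(line, pic_path)
            if os.path.exists(pic_path):
                pic_path_list.append(pic_path)
    # The ordered list of images then goes straight into img2pdf.convert()
    return pic_path_list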

The text rendering is pretty crude and not very pretty; proper rendering rules would take longer-term planning. But that's for later - once the data is in hand, you can process it however you like ~


Summary

When you have a lot of repetitive work to do, scripts are great: as long as the program is robust enough it can run 24 hours a day without getting tired, and deploying it to several machines cuts the time further. Of course, scripts are dumb and people are smart; there will always be cases you didn't account for and the script may die halfway through, so big tasks also need alarms and logging so you can follow up in time and quickly locate problems.
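A tiny sketch of what that could look like (the alert webhook URL is made up):

import logging
import traceback

import requests

logging.basicConfig(filename='task.log', level=logging.INFO,
                    format='%(asctime)s %(levelname)s %(message)s')

def run_with_alert(task):
    try:
        task()
    except Exception:
        detail = traceback.format_exc()
        logging.error(detail)
        # Push an alert so a dead script gets noticed in time (made-up webhook)
        requests.post('https://alert.example.com/notify', json={'text': detail})
        raise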

There are plenty of scripting choices: .bat on Windows, .sh on Linux, C Shell, JavaScript, Lua and so on. The main reason I prefer Python is its libraries - you can find a third-party library for almost anything you can think of.

Life is short. I use Python