By JackTian editor: JackTian source: public number “jack elder brother’s IT journey” ID: Jake_Internet original text link: super core! 11 very useful Python and Shell scripts to use examples! Please contact authorization for reprinting (wechat ID: Hc220088)

Concern public number: “Jie Ge’s IT journey” background reply: “script collection” can obtain all the script instance files of this article concern public number: “Jie ge’s IT journey” background reply: “WX” can invite you to join the reader exchange group

Hello, I’m JackTian.

In my last post, I shared 7 Very useful Shell Scripts! From the data of reading, liking, viewing and leaving comments on this article, it is very popular among readers. I have to say, scripts in our daily work can improve the efficiency of a lot of work, really delicious!

This time I will share with you a wave of scripts I use in my work, mainly divided into two parts: Python and Shell.

Python script examples include enterprise wechat alarm, FTP client, SSH client, Saltstack client, vCenter client, obtaining the SSL certificate expiration time of the domain name, and sending today’s weather forecast and future weather trend charts.

Examples of Shell scripts: SVN complete backup, Zabbix monitoring user password expiration, building local YUM and reader requirements in the previous article (when the load is high, find out the process scripts that occupy a lot and store or push notifications);

The space is a little long, but also please be patient to turn to the end of the article, after all, there are eggs.

Python Script section

Enterprise wechat alarm

This script generates wechat alarms through the enterprise wechat application and can be used for Zabbix monitoring.

# -*- coding: utf-8 -*-


import requests
import json


class DLF:
    def __init__(self, corpid, corpsecret) :
        self.url = "https://qyapi.weixin.qq.com/cgi-bin"
        self.corpid = corpid
        self.corpsecret = corpsecret
        self._token = self._get_token()

    def _get_token(self) :
        Access_token for enterprise wechat API :return: access_token
        token_url = self.url + "/gettoken? corpid=%s&corpsecret=%s" %(self.corpid, self.corpsecret)
        try:
            res = requests.get(token_url).json()
            token = res['access_token']
            return token
        except Exception as e:
            return str(e)

    def _get_media_id(self, file_obj) :
        get_media_url = self.url + "/media/upload? access_token={}&type=file".format(self._token)
        data = {"media": file_obj}

        try:
            res = requests.post(url=get_media_url, files=data)
            media_id = res.json()['media_id']
            return media_id
        except Exception as e:
            return str(e)

    def send_text(self, agentid, content, touser=None, toparty=None) :
        send_msg_url = self.url + "/message/send? access_token=%s" % (self._token)
        send_data = {
            "touser": touser,
            "toparty": toparty,
            "msgtype": "text"."agentid": agentid,
            "text": {
                "content": content
            }
        }

        try:
            res = requests.post(send_msg_url, data=json.dumps(send_data))
        except Exception as e:
            return str(e)

    def send_image(self, agentid, file_obj, touser=None, toparty=None) :
        media_id = self._get_media_id(file_obj)
        send_msg_url = self.url + "/message/send? access_token=%s" % (self._token)
        send_data = {
            "touser": touser,
            "toparty": toparty,
            "msgtype": "image"."agentid": agentid,
            "image": {
                "media_id": media_id
           }
        }

        try:
            res = requests.post(send_msg_url, data=json.dumps(send_data))
        except Exception as e:
            return str(e)
Copy the code

FTP client

Use the FTPLIB module to perform operations such as uploading and downloading on the FTP server.

# -*- coding: utf-8 -*-

from ftplib import FTP
from os import path
import copy


class FTPClient:
    def __init__(self, host, user, passwd, port=21) :
        self.host = host
        self.user = user
        self.passwd = passwd
        self.port = port
        self.res = {'status': True.'msg': None}
        self._ftp = None
        self._login()

    def _login(self) :
        Log in to FTP server :return: Error message is returned when connection or login is abnormal.
        try:
            self._ftp = FTP()
            self._ftp.connect(self.host, self.port, timeout=30)
            self._ftp.login(self.user, self.passwd)
        except Exception as e:
            return e

    def upload(self, localpath, remotepath=None) :
        "Upload FTP file :param localPath: local file path: param remotePath: remote file path: return:"
        if not localpath: return 'Please select a local file. '
        # Read local file
        # fp = open(localpath, 'rb')

        If the remote file path is not passed, upload the file to the current directory with the same name as the local file
        if not remotepath:
            remotepath = path.basename(localpath)

        # upload file
        self._ftp.storbinary('STOR ' + remotepath, localpath)
        # fp.close()

    def download(self, remotepath, localpath=None) :
        ''' localpath :param localpath: local file path :param remotepath: remote file path :return: '''

        if not remotepath: return 'Please select a remote file. '
        If the local file path is not passed, the file will be downloaded to the current directory with the same name as the remote file
        if not localpath:
            localpath = path.basename(remotepath)
        Select basename from remotePath if localPath is a directory
        if path.isdir(localpath):
            localpath = path.join(localpath, path.basename(remotepath))

        Write to a local file
        fp = open(localpath, 'wb')

        # Download file
        self._ftp.retrbinary('RETR ' + remotepath, fp.write)
        fp.close()

    def nlst(self, dir='/') :
        Return: Returns all contents of a directory as a list.
        files_list = self._ftp.nlst(dir)
        return files_list

    def rmd(self, dir=None) :
        "Delete directory :param dir: directory name :return: execution result"
        if not dir: return 'Please input dirname'
        res = copy.deepcopy(self.res)
        try:
            del_d = self._ftp.rmd(dir)
            res['msg'] = del_d
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)

        return res

    def mkd(self, dir=None) :
        Create directory :param dir: directory name :return: result of execution
        if not dir: return 'Please input dirname'
        res = copy.deepcopy(self.res)
        try:
            mkd_d = self._ftp.mkd(dir)
            res['msg'] = mkd_d
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)

        return res

    def del_file(self, filename=None) :
        "Delete file :param filename: filename: return: execution result"
        if not filename: return 'Please input filename'
        res = copy.deepcopy(self.res)
        try:
            del_f = self._ftp.delete(filename)
            res['msg'] = del_f
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)

        return res

    def get_file_size(self, filenames=[]) :
        Param filename: filename: return: execution result ""
        if not filenames: return {'msg': 'This is an empty directory'}
        res_l = []
        for file in filenames:
            res_d = {}
            # Error if directory or file does not exist
            try:
                size = self._ftp.size(file)
                type = 'f'
            except:
                / (/dir/)
                size = The '-'
                type = 'd'
                file = file + '/'

            res_d['filename'] = file
            res_d['size'] = size
            res_d['type'] = type
            res_l.append(res_d)

        return res_l

    def rename(self, old_name=None, new_name=None) :
        Rename :param old_name: old file or directory name: param new_name: new file or directory name: return: execution result ""
        if not old_name or not new_name: return 'Please input old_name and new_name'
        res = copy.deepcopy(self.res)
        try:
            rename_f = self._ftp.rename(old_name, new_name)
            res['msg'] = rename_f
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)

        return res

    def close(self) :
        Exit FTP connection :return: ""
        try:
            Send the quit command to the server
            self._ftp.quit()
        except Exception:
            return 'No response from server'
        finally:
            The client closes the connection unilaterally
            self._ftp.close()
Copy the code

SSH client

This script is only used for key connection. If password connection is required, simply change it.

# -*- coding: utf-8 -*-

import paramiko

class SSHClient:
    def __init__(self, host, port, user, pkey) :
        self.ssh_host = host
        self.ssh_port = port
        self.ssh_user = user
        self.private_key = paramiko.RSAKey.from_private_key_file(pkey)
        self.ssh = None
        self._connect()

    def _connect(self) :
        self.ssh = paramiko.SSHClient()
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self.ssh.connect(hostname=self.ssh_host, port=self.ssh_port, username=self.ssh_user, pkey=self.private_key, timeout=10)
        except:
            return 'ssh connect fail'

    def execute_command(self, command) :
        stdin, stdout, stderr = self.ssh.exec_command(command)
        out = stdout.read()
        err = stderr.read()
        return out, err

    def close(self) :
        self.ssh.close()
Copy the code

Saltstack client

Perform operations on the Saltstack server through the API and execute commands.

#! /usr/bin/env python
# -*- coding:utf-8 -*-


import requests
import json
import copy


class SaltApi:
    """ Class that defines the SALT API interface initializes to obtain token """
    def __init__(self) :
        self.url = "http://172.85.10.21:8000/"
        self.username = "saltapi"
        self.password = "saltapi"
        self.headers = {"Content-type": "application/json"}
        self.params = {'client': 'local'.'fun': None.'tgt': None.'arg': None}
        self.login_url = self.url + "login"
        self.login_params = {'username': self.username, 'password': self.password, 'eauth': 'pam'}
        self.token = self.get_data(self.login_url, self.login_params)['token']
        self.headers['X-Auth-Token'] = self.token

    def get_data(self, url, params) :
        "" Request URL to get data :param URL: request URL address :param params: parameter passed to url: return: result of request" "
        send_data = json.dumps(params)
        request = requests.post(url, data=send_data, headers=self.headers)
        response = request.json()
        result = dict(response)
        return result['return'] [0]

    def get_auth_keys(self) :
        Get all authenticated keys :return: ""
        data = copy.deepcopy(self.params)
        data['client'] = 'wheel'
        data['fun'] = 'key.list_all'
        result = self.get_data(self.url, data)
        try:
            return result['data'] ['return'] ['minions']
        except Exception as e:
            return str(e)

    def get_grains(self, tgt, arg='id') :
        Get basic system information: TGT: target host :return: """
        data = copy.deepcopy(self.params)
        if tgt:
            data['tgt'] = tgt
        else:
            data['tgt'] = The '*'
        data['fun'] = 'grains.item'
        data['arg'] = arg
        result = self.get_data(self.url, data)

        return result


    def execute_command(self, tgt, fun='cmd.run', arg=None, tgt_type='list', salt_async=False) :
        Run 'command' :param TGT: target host: Param fun: Module methods can be null :param arg: Pass parameters can be null :return: Execution Result
        data = copy.deepcopy(self.params)

        if not tgt: return {'status': False.'msg': 'target host not exist'}
        if not arg:
            data.pop('arg')
        else:
            data['arg'] = arg
        iftgt ! =The '*':
            data['tgt_type'] = tgt_type
        if salt_async: data['client'] = 'local_async'
        data['fun'] = fun
        data['tgt'] = tgt
        result = self.get_data(self.url, data)

        return result


    def jobs(self, fun='detail', jid=None) :
        "" Task: Param fun: active, detail :param jod: Job ID :return: task execution result ""

        data = {'client': 'runner'}
        data['fun'] = fun
        if fun == 'detail':
            if not jid: return {'success': False.'msg': 'job id is none'}
            data['fun'] = 'jobs.lookup_jid'
            data['jid'] = jid
        else:
            return {'success': False.'msg': 'fun is active or detail'}
        result = self.get_data(self.url, data)

        return result
Copy the code

More original articles on Saltstack can be found at:

VCenter client

I perform daily operations on vCenter through the official SDK. This script is used for the CMDB platform to automatically obtain host information and save it to the database.

from pyVim.connect import SmartConnect, Disconnect, SmartConnectNoSSL
from pyVmomi import vim
from asset import models
import atexit


class Vmware:
    def __init__(self, ip, user, password, port, idc, vcenter_id) :
        self.ip = ip
        self.user = user
        self.password = password
        self.port = port
        self.idc_id = idc
        self.vcenter_id = vcenter_id

    def get_obj(self, content, vimtype, name=None) :
        The "" list is returned, and name can specify the matching object" ".
        container = content.viewManager.CreateContainerView(content.rootFolder, vimtype, True)
        obj = [ view for view in container.view ]
        return obj

    def get_esxi_info(self) :
        Host information
        esxi_host = {}
        res = {"connect_status": True."msg": None}

        try:
            # connect this thing
            si = SmartConnectNoSSL(host=self.ip, user=self.user, pwd=self.password, port=self.port, connectionPoolTimeout=60)
        except Exception as e:
            res['connect_status'] = False
            try:
                res['msg'] = ("%s Caught vmodl fault : " + e.msg) % (self.ip)
            except Exception as e:
                res['msg'] = '%s: connection error' % (self.ip)
            return res
        # disconnect this thing
        atexit.register(Disconnect, si)
        content = si.RetrieveContent()
        esxi_obj = self.get_obj(content, [vim.HostSystem])

        for esxi in esxi_obj:
            esxi_host[esxi.name] = {}
            esxi_host[esxi.name]['idc_id'] = self.idc_id
            esxi_host[esxi.name]['vcenter_id'] = self.vcenter_id
            esxi_host[esxi.name]['server_ip'] = esxi.name
            esxi_host[esxi.name]['manufacturer'] = esxi.summary.hardware.vendor
            esxi_host[esxi.name]['server_model'] = esxi.summary.hardware.model

            for i in esxi.summary.hardware.otherIdentifyingInfo:
                if isinstance(i, vim.host.SystemIdentificationInfo):
                    esxi_host[esxi.name]['server_sn'] = i.identifierValue

            # system name
            esxi_host[esxi.name]['system_name'] = esxi.summary.config.product.fullName
            # Total number of CPU cores
            esxi_cpu_total = esxi.summary.hardware.numCpuThreads
            # Total memory GB
            esxi_memory_total = esxi.summary.hardware.memorySize / 1024 / 1024 / 1024

            Obtain the total disk size of GB
            esxi_disk_total = 0
            for ds in esxi.datastore:
                esxi_disk_total += ds.summary.capacity / 1024 / 1024 / 1024

            The default configuration is 4-core 8GB 100GB. Calculate the remaining assignable VMS based on this configuration
            default_configure = {
                'cpu': 4.'memory': 8.'disk': 100
            }

            esxi_host[esxi.name]['vm_host'] = []
            vm_usage_total_cpu = 0
            vm_usage_total_memory = 0
            vm_usage_total_disk = 0

            # Virtual machine information
            for vm in esxi.vm:
                host_info = {}
                host_info['vm_name'] = vm.name
                host_info['power_status'] = vm.runtime.powerState
                host_info['cpu_total_kernel'] = str(vm.config.hardware.numCPU) + 'nuclear'
                host_info['memory_total'] = str(vm.config.hardware.memoryMB) + 'MB'
                host_info['system_info'] = vm.config.guestFullName

                disk_info = ' '
                disk_total = 0
                for d in vm.config.hardware.device:
                    if isinstance(d, vim.vm.device.VirtualDisk):
                        disk_total += d.capacityInKB / 1024 / 1024
                        disk_info += d.deviceInfo.label + ":" +  str((d.capacityInKB) / 1024 / 1024) + ' GB' + ', '

                host_info['disk_info'] = disk_info
                esxi_host[esxi.name]['vm_host'].append(host_info)

                Calculate the current host available capacity: total - allocated
                if host_info['power_status'] = ='poweredOn':
                    vm_usage_total_cpu += vm.config.hardware.numCPU
                    vm_usage_total_disk += disk_total
                    vm_usage_total_memory += (vm.config.hardware.memoryMB / 1024)

            esxi_cpu_free = esxi_cpu_total - vm_usage_total_cpu
            esxi_memory_free = esxi_memory_total - vm_usage_total_memory
            esxi_disk_free = esxi_disk_total - vm_usage_total_disk

            esxi_host[esxi.name]['cpu_info'] = 'Total: %d core, Free: %d core ' % (esxi_cpu_total, esxi_cpu_free)
            esxi_host[esxi.name]['memory_info'] = 'Total: %dGB, Free: %dGB' % (esxi_memory_total, esxi_memory_free)
            esxi_host[esxi.name]['disk_info'] = 'Total: %dGB, Free: %dGB' % (esxi_disk_total, esxi_disk_free)

            Calculate the minimum CPU memory disk allocation based on the default, that is, the currently available resources
            if esxi_cpu_free < 4 or esxi_memory_free < 8 or esxi_disk_free < 100:
                free_allocation_vm_host = 0
            else:
                free_allocation_vm_host = int(min(
                    [
                        esxi_cpu_free / default_configure['cpu'],
                        esxi_memory_free / default_configure['memory'],
                        esxi_disk_free / default_configure['disk']
                    ]
                ))
            esxi_host[esxi.name]['free_allocation_vm_host'] = free_allocation_vm_host
        esxi_host['connect_status'] = True
        return esxi_host

    def write_to_db(self) :
        esxi_host = self.get_esxi_info()
        Failed to connect
        if not esxi_host['connect_status'] :return esxi_host

        del esxi_host['connect_status']

        for machine_ip in esxi_host:
            Physical machine information
            esxi_host_dict = esxi_host[machine_ip]
            # Virtual machine information
            virtual_host = esxi_host[machine_ip]['vm_host']
            del esxi_host[machine_ip]['vm_host']

            obj = models.EsxiHost.objects.create(**esxi_host_dict)
            obj.save()

            for host_info in virtual_host:
                host_info['management_host_id'] = obj.id
                obj2 = models.virtualHost.objects.create(**host_info)
                obj2.save()
Copy the code

Obtain the domain name SSL certificate expiration time

Used for Zabbix alarms

import re
import sys
import time
import subprocess
from datetime import datetime
from io import StringIO

def main(domain) :
    f = StringIO()
    comm = f"curl -Ivs https://{domain} --connect-timeout 10"

    result = subprocess.getstatusoutput(comm)
    f.write(result[1])

    try:
        m = re.search('start date: (.*?) \n.*? expire date: (.*?) \n.*? common name: (.*?) \n.*? issuer: CN=(.*?) \n', f.getvalue(), re.S)
        start_date = m.group(1)
        expire_date = m.group(2)
        common_name = m.group(3)
        issuer = m.group(4)
    except Exception as e:
        return 999999999

    # time string to time array
    start_date = time.strptime(start_date, "%b %d %H:%M:%S %Y GMT")
    start_date_st = time.strftime("%Y-%m-%d %H:%M:%S", start_date)
    # dateTime string to time array
    expire_date = datetime.strptime(expire_date, "%b %d %H:%M:%S %Y GMT")
    expire_date_st = datetime.strftime(expire_date,"%Y-%m-%d %H:%M:%S")

    # Remaining days
    remaining = (expire_date-datetime.now()).days

    return remaining 

if __name__ == "__main__":
    domain = sys.argv[1] 
    remaining_days = main(domain)
    print(remaining_days)
Copy the code

Send today’s forecast and charts showing future weather trends



This script is used to send today’s weather forecast and the future weather trend chart to your wife. Now wechat has banned the webpage, so it can’t be sent to wechat. I am notifyingyour wife through the enterprise wechat, and you need to drag your wife to the enterprise wechat.

# -*- coding: utf-8 -*-


    import requests
    import json
    import datetime

    def weather(city) :
        url = "http://wthrcdn.etouch.cn/weather_mini?city=%s" % city

        try:
            data = requests.get(url).json()['data']
            city = data['city']
            ganmao = data['ganmao']

            today_weather = data['forecast'] [0]
            res = "The wife today is {} \ n today weather profiles \ n city: {: < 10} \ n time: {: < 10} \ n high temperature: {: < 10} \ n low temperature: {: < 10} \ n wind: {: < 10} \ n the wind: {: < 10} \ n weather: {:<10}\n\n will send the recent temperature trend chart later, please pay attention to check. \".format(
                ganmao,
                city,
                datetime.datetime.now().strftime('%Y-%m-%d'),
                today_weather['high'].split()[1],
                today_weather['low'].split()[1],
                today_weather['fengli'].split('[') [2].split('] ') [0],
                today_weather['fengxiang'],today_weather['type'],)return {"source_data": data, "res": res}
        except Exception as e:
            return str(e) + Get weather forecast trend chart python# -*- coding: utf-8 -*-


    import matplotlib.pyplot as plt
    import re
    import datetime


    def Future_weather_states(forecast, save_path, day_num=5) :
        Param Day_num: Next few days: Return: Trend chart ""
        future_forecast = forecast
        dict= {}for i in range(day_num):
            data = []
            date = future_forecast[i]["date"]
            date = int(re.findall("\d+",date)[0])
            data.append(int(re.findall("\d+", future_forecast[i]["high"[])0]))
            data.append(int(re.findall("\d+", future_forecast[i]["low"[])0]))
            data.append(future_forecast[i]["type"])
            dict[date] = data

        data_list = sorted(dict.items())
        date=[]
        high_temperature = []
        low_temperature = []
        for each in data_list:
            date.append(each[0])
            high_temperature.append(each[1] [0])
            low_temperature.append(each[1] [1])
            fig = plt.plot(date,high_temperature,"r",date,low_temperature,"b")

        current_date = datetime.datetime.now().strftime('%Y-%m')
        plt.rcParams['font.sans-serif'] = ['SimHei']
        plt.rcParams['axes.unicode_minus'] = False
        plt.xlabel(current_date)
        plt.ylabel("℃")
        plt.legend(["Hot"."Cold"])
        plt.xticks(date)
        plt.title("Temperature trends in recent days.") plt.savefig(save_path) + send to enterprise wechat python# -*- coding: utf-8 -*-


    import requests
    import json


    class DLF:
        def __init__(self, corpid, corpsecret) :
            self.url = "https://qyapi.weixin.qq.com/cgi-bin"
            self.corpid = corpid
            self.corpsecret = corpsecret
            self._token = self._get_token()

        def _get_token(self) :
            Access_token for enterprise wechat API :return: access_token
            token_url = self.url + "/gettoken? corpid=%s&corpsecret=%s" %(self.corpid, self.corpsecret)
            try:
                res = requests.get(token_url).json()
                token = res['access_token']
                return token
            except Exception as e:
                return str(e)

        def _get_media_id(self, file_obj) :
            get_media_url = self.url + "/media/upload? access_token={}&type=file".format(self._token)
            data = {"media": file_obj}

            try:
                res = requests.post(url=get_media_url, files=data)
                media_id = res.json()['media_id']
                return media_id
            except Exception as e:
                return str(e)

        def send_text(self, agentid, content, touser=None, toparty=None) :
            send_msg_url = self.url + "/message/send? access_token=%s" % (self._token)
            send_data = {
                "touser": touser,
                "toparty": toparty,
                "msgtype": "text"."agentid": agentid,
                "text": {
                    "content": content
                }
            }

            try:
                res = requests.post(send_msg_url, data=json.dumps(send_data))
            except Exception as e:
                return str(e)

        def send_image(self, agentid, file_obj, touser=None, toparty=None) :
            media_id = self._get_media_id(file_obj)
            send_msg_url = self.url + "/message/send? access_token=%s" % (self._token)
            send_data = {
                "touser": touser,
                "toparty": toparty,
                "msgtype": "image"."agentid": agentid,
                "image": {
                    "media_id": media_id
               }
            }

            try:
                res = requests.post(send_msg_url, data=json.dumps(send_data))
            except Exception as e:
                return str(e) + main script# -*- coding: utf-8 -*-


from plugins.weather_forecast import weather
from plugins.trend_chart import Future_weather_states
from plugins.send_wechat import DLF
import os


# Enterprise wechat related information
corpid = "xxx"
corpsecret = "xxx"
agentid = "xxx"
Save path of weather forecast trend chart
_path = os.path.dirname(os.path.abspath(__file__))
save_path = os.path.join(_path ,'./tmp/weather_forecast.jpg')

# Get weather forecast information
content = weather("Building")

# Send text messages
dlf = DLF(corpid, corpsecret)
dlf.send_text(agentid=agentid, content=content['res'], toparty='1')

# Generate a weather forecast trend chart
Future_weather_states(content['source_data'] ['forecast'], save_path)
# Send picture messages
file_obj = open(save_path, 'rb')
dlf.send_image(agentid=agentid, toparty='1', file_obj=file_obj)
Copy the code

Shell script section

Full SVN Backup

Hotcopy is used to back up the SVN for 7 days.

#! /bin/bash
# Filename : svn_backup_repos.sh
# Date : 2020/12/14
# Author : JakeTian
# Email : JakeTian@***.com
# Crontab : 59 23 * * * /bin/bash $BASE_PATH/svn_backup_repos.sh >/dev/null 2>&1
# Notes: Add the script to crontab and execute it regularly every day
# Description: SVN is fully backed up


set -e

SRC_PATH="/opt/svndata"
DST_PATH="/data/svnbackup"
LOG_FILE="$DST_PATH/logs/svn_backup.log"
SVN_BACKUP_C="/bin/svnadmin hotcopy"
SVN_LOOK_C="/bin/svnlook youngest"
TODAY=$(date +'%F')
cd $SRC_PATH
ALL_REPOS=$(find ./ -maxdepth 1 -type d ! -name 'httpd' -a ! -name 'bak' | tr -d '/')

Create a backup directory for the script log directorytest -d $DST_PATH || mkdir -p $DST_PATH test -d $DST_PATH/logs || mkdir $DST_PATH/logs test -d $DST_PATH/$TODAY || mkdir  $DST_PATH/$TODAY# Backup repos files
for repo in $ALL_REPOS
do
    $SVN_BACKUP_C $SRC_PATH/$repo $DST_PATH/$TODAY/$repo

    Check whether the backup is complete
    if$SVN_LOOK_C $DST_PATH/$TODAY/$repo; then echo"$TODAY: $repo Backup Success" >> $LOG_FILE 
    else
        echo "$TODAY: $repo Backup Fail" >> $LOG_FILE
    fi
done

Backup user password file and permission file
cp -p authz access.conf $DST_PATH/$TODAY

Dump log files
mv $LOG_FILE $LOG_FILE-$TODAY

Delete the backup created seven days ago
seven_days_ago=$(date -d "7 days ago" +'%F')
rm -rf $DST_PATH/$seven_days_ago
Copy the code

Zabbix monitors user password expiration

Zabbix is used to monitor the expiration of the passwords of Linux users (whose shells are /bin/bash and /bin/sh) and trigger automatic user discovery within 7 days.

#! /bin/bash


diskarray=(`awk -F':' '$NF ~ /\/bin\/bash/||/\/bin\/sh/{print $1}' /etc/passwd`)
length=${#diskarray[@]}

printf "{\n"
printf  '\t'"\"data\":["
for ((i=0; i<$length; i++)) do printf'\n\t\t{'
    printf "\"{#USER_NAME}\":\"${diskarray[$i]}\"}"
    if [ $i -lt $[$length-1] ];then
            printf ', '
    fi
done
printf  "\n\t]\n"
printf "}\n"Check that the user password is expired#! /bin/bash

export LANG=en_US.UTF-8

SEVEN_DAYS_AGO=$(date -d '-7 day' +'%s')
user="$1"

# convert Sep 09, 2018 time to Unix time
expires_date=$(sudo chage -l $user | awk -F':' '/Password expires/{print $NF}' | sed -n 's/^ //p')
if [[ "$expires_date"! ="never"]]. then expires_date=$(date -d"$expires_date" +'%s')

    if [ "$expires_date" -le "$SEVEN_DAYS_AGO"]; then echo"1"
    else
        echo "0"
    fi
else
    echo "0"
fi
Copy the code

Build local YUM

Nginx allows you to synchronize only HTTP yum sites with rsync.

However, centos6 mirrors are not used recently, it seems that all domestic are disabled, if you find a suitable self-change address.

#! /bin/bash
# update yum image


RsyncCommand="rsync -rvutH -P --delete --delete-after --delay-updates --bwlimit=1000"
DIR="/app/yumData"
LogDir="$DIR/logs"
Centos6Base="$DIR/Centos6/x86_64/Base"
Centos7Base="$DIR/Centos7/x86_64/Base"
Centos6Epel="$DIR/Centos6/x86_64/Epel"
Centos7Epel="$DIR/Centos7/x86_64/Epel"
Centos6Salt="$DIR/Centos6/x86_64/Salt"
Centos7Salt="$DIR/Centos7/x86_64/Salt"
Centos6Update="$DIR/Centos6/x86_64/Update"
Centos7Update="$DIR/Centos7/x86_64/Update"
Centos6Docker="$DIR/Centos6/x86_64/Docker"
Centos7Docker="$DIR/Centos7/x86_64/Docker"
Centos6Mysql5_7="$DIR/Centos6 / x86_64 / Mysql/Mysql5.7"
Centos7Mysql5_7="$DIR/Centos7 / x86_64 / Mysql/Mysql5.7"
Centos6Mysql8_0="$DIR/Centos6 / x86_64 / Mysql/Mysql8.0"
Centos7Mysql8_0="$DIR/Centos7 / x86_64 / Mysql/Mysql8.0"
MirrorDomain="rsync://rsync.mirrors.ustc.edu.cn"

Create a directory that does not exist
check_dir(){
    for dir in $*
    do
        test -d $dir || mkdir -p $dir
    done
}

Check the rsync result
check_rsync_status(){
    if [ $? -eq 0]; then echo"rsync success"> > $1
    else
        echo "rsync fail"> > $1fi } check_dir $DIR $LogDir $Centos6Base $Centos7Base $Centos6Epel $Centos7Epel $Centos6Salt $Centos7Salt $Centos6Update  $Centos7Update $Centos6Docker $Centos7Docker $Centos6Mysql5_7 $Centos7Mysql5_7 $Centos6Mysql8_0 $Centos7Mysql8_0# Base yumrepo
#$RsyncCommand "$MirrorDomain"/repo/centos/6/os/x86_64/ $Centos6Base >> "$LogDir/centos6Base.log" 2>&1
# check_rsync_status "$LogDir/centos6Base.log"
$RsyncCommand "$MirrorDomain"/repo/centos/7/os/x86_64/ $Centos7Base >> "$LogDir/centos7Base.log" 2> &1
check_rsync_status "$LogDir/centos7Base.log"

# Epel yumrepo
# $RsyncCommand "$MirrorDomain"/repo/epel/6/x86_64/ $Centos6Epel >> "$LogDir/centos6Epel.log" 2>&1
# check_rsync_status "$LogDir/centos6Epel.log"
$RsyncCommand "$MirrorDomain"/repo/epel/7/x86_64/ $Centos7Epel >> "$LogDir/centos7Epel.log" 2> &1
check_rsync_status "$LogDir/centos7Epel.log"

# SaltStack yumrepo
# $RsyncCommand "$MirrorDomain"/repo/salt/yum/redhat/6/x86_64/ $Centos6Salt >> "$LogDir/centos6Salt.log" 2>&1
# ln -s $Centos6Salt/archive/$(ls $Centos6Salt/archive | tail -1) $Centos6Salt/latest
# check_rsync_status "$LogDir/centos6Salt.log"
$RsyncComman "$MirrorDomain"/repo/salt/yum/redhat/7/x86_64/ $Centos7Salt >> "$LogDir/centos7Salt.log" 2> &1
check_rsync_status "$LogDir/centos7Salt.log"
# ln -s $Centos7Salt/archive/$(ls $Centos7Salt/archive | tail -1) $Centos7Salt/latest

# Docker yumrepo
$RsyncCommand "$MirrorDomain"/repo/docker-ce/linux/centos/7/x86_64/stable/ $Centos7Docker >> "$LogDir/centos7Docker.log" 2> &1
check_rsync_status "$LogDir/centos7Docker.log"

# centos update yumrepo
# $RsyncCommand "$MirrorDomain"/repo/centos/6/updates/x86_64/ $Centos6Update >> "$LogDir/centos6Update.log" 2>&1
# check_rsync_status "$LogDir/centos6Update.log"
$RsyncCommand "$MirrorDomain"/repo/centos/7/updates/x86_64/ $Centos7Update >> "$LogDir/centos7Update.log" 2> &1
check_rsync_status "$LogDir/centos7Update.log"

# mysql 5.7 yumrepo
# $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-5.7-community/el/6/x86_64/ "$Centos6Mysql5_7" >> "$LogDir/centos6Mysql5. 7. The log" 2 > &1
# check_rsync_status "$LogDir/centos6Mysql5. 7. The log"
$RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-5.7-community/el/7/x86_64/ "$Centos7Mysql5_7" >> "$LogDir/centos7Mysql5. 7. The log" 2> &1
check_rsync_status "$LogDir/centos7Mysql5. 7. The log"

# mysql 8.0 yumrepo
# $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-8.0-community/el/6/x86_64/ "$Centos6Mysql8_0" >> "$LogDir/centos6Mysql8. 0. The log" 2 > &1
# check_rsync_status "$LogDir/centos6Mysql8. 0. The log"
$RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-8.0-community/el/7/x86_64/ "$Centos7Mysql8_0" >> "$LogDir/centos7Mysql8. 0. The log" 2> &1
check_rsync_status "$LogDir/centos7Mysql8. 0. The log"
Copy the code

Answers to readers’ needs

When the load is high, check the process scripts that occupy a large proportion and store or push notifications

This is part of the previous chapter7 very useful Shell take to use script examples!The needs of readers’ messages at the bottom of the middle are as follows:

#! /bin/bash

Number of physical cpus
physical_cpu_count=$(egrep 'physical id' /proc/cpuinfo | sort | uniq | wc -l)
Number of cores per physical CPU
physical_cpu_cores=$(egrep 'cpu cores' /proc/cpuinfo | uniq | awk '{print $NF}')
The total number of nuclear
total_cpu_cores=$((physical_cpu_count*physical_cpu_cores))

# is the threshold of one minute, five minutes, and fifteen minutes of load, and one of them is triggered when the threshold is exceeded
one_min_load_threshold="$total_cpu_cores"
five_min_load_threshold=$(awk 'BEGIN {print '"$total_cpu_cores""0.8"} '*')
fifteen_min_load_threshold=$(awk 'BEGIN {print '"$total_cpu_cores""0.7"} '*')

The values are the average load of minutes, five minutes, and fifteen minutes respectively
one_min_load=$(uptime | awk '{print $(NF-2)}' | tr -d ', ')
five_min_load=$(uptime | awk '{print $(NF-1)}' | tr -d ', ')
fifteen_min_load=$(uptime | awk '{print $NF}' | tr -d ', ')

Get the current CPU disk I/O information and write it to the log file
If you need to send a message or call something else, please write your own function
get_info(){
    log_dir="cpu_high_script_log"
    test -d "$log_dir" || mkdir "$log_dir"
    ps -eo user,pid,%cpu,stat,time,command --sort -%cpu | head -10 > "$log_dir"/cpu_top10.log
    ps -eo user,pid,%mem,rss,vsz,stat,time,command --sort -%mem | head -10 > "$log_dir"/mem_top10.log
    iostat -dx 1 10 > "$log_dir"/disk_io_10.log
}


export -f get_info

echo "$one_min_load $one_min_load_threshold $five_min_load $five_min_load_threshold $fifteen_min_load $fifteen_min_load_threshold" | \
awk '{ if ($1>=$2 || $3>=$4 || $5>=$6) system("get_info") }'
Copy the code

And that’s it for today.

I hope you can apply what you have learned through these cases and apply it in combination with your own actual scenes, so as to improve your work efficiency.

If you have more script examples, you are welcome to share them or leave a comment in the comments section of this article about your specific script instance needs. If there are too many examples, next time Jie Ge will share with you in a whole collection of script article examples.

By the way, if you think any of these script examples are useful in your work, you’d love to give a little support (like, comment, retweet).

Original is not easy, code word is not easy. If you think this article is useful to you, please give it a thumbs up, leave a comment or forward it, because this will be my motivation to output more high-quality articles, thank you!