Preface

With the increasing popularity of social forums, sensitive word filtering has become an increasingly important feature. What are the ways to implement sensitive word filtering in Python under a Serverless architecture? Can we build an API for filtering sensitive words in the simplest possible way?

Introduction to Sensitive Word Filtering

The replace method

In Python, the first thing that comes to mind for word replacement is replace. We can prepare a list of sensitive words and replace each of them with replace:

def check_filter(keywords, text):
    for eve in keywords:
        text = text.replace(eve, "***")
    return text
keywords = ("Key Word 1"."Key Word 2"."Key Word 3")
content = "This is an example of keyword substitution, involving keyword 1 and keyword 2, and keyword 3 at the end."
print(check_filter(keywords, content))

However, if you think about it for a moment, you will see that this approach has serious performance problems when both the text and the sensitive word list are large. For example, let's modify the code to run a basic performance test:

import time

def check_filter(keywords, text):
    for eve in keywords:
        text = text.replace(eve, "***")
    return text
keywords =[ "Keywords" + str(i) for i in range(0.10000)]
startTime = time.time()
content = "This is an example of keyword substitution, involving keyword 1 and keyword 2, and keyword 3 at the end." * 10000
check_filter(keywords, content)
print(time.time()-startTime)

The output is 1.235044002532959, which is very poor: every keyword triggers a full pass over the text, so the cost grows roughly with len(keywords) × len(text).

Regular expression method

Instead of using replace, it is much faster to use re.sub from the re module:

import re

def check_filter(keywords, text):
    return re.sub("|".join(keywords), "***", text)

keywords = ("keyword 1", "keyword 2", "keyword 3")
content = "This is an example of keyword substitution, involving keyword 1 and keyword 2, and keyword 3 at the end."
print(check_filter(keywords, content))

We added the same performance test, adapted as above; the output is 0.47878289222717285. This approach is clearly several times faster in this example, and the gap widens further as the word list grows.
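One caveat: if a keyword contains regex metacharacters, joining with "|" produces a broken or unintended pattern. Here is a minimal hardening sketch (my addition, not from the original code), which also precompiles the pattern so it is parsed only once:

import re

# escape each keyword so metacharacters are matched literally,
# and compile the alternation once instead of on every call
keywords = ("keyword 1", "keyword 2", "keyword 3")
pattern = re.compile("|".join(re.escape(k) for k in keywords))

def check_filter(text):
    return pattern.sub("***", text)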

DFA sensitive word filtering

This method is considerably more efficient. For example, if we consider "bad egg", "bad person", and "bad child" to be sensitive words, they can be organized as a tree sharing the common prefix "bad".

Expressed as a DFA-style nested dictionary:

{
    'bad': {
        'egg': {'\x00': 0},
        'people': {'\x00': 0},
        'child': {
            'child': {'\x00': 0}
        }
    }
}
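To make the role of the '\x00' terminator concrete, here is a minimal lookup sketch (a hypothetical helper, not part of the implementation below): reaching a level that contains '\x00' means a complete sensitive word has been matched; anything else is only a prefix.

tree = {
    'bad': {
        'egg': {'\x00': 0},
        'people': {'\x00': 0},
        'child': {
            'child': {'\x00': 0}
        }
    }
}

def is_sensitive(tree, tokens):
    # walk the nested dict level by level
    level = tree
    for token in tokens:
        if token not in level:
            return False
        level = level[token]
    return '\x00' in level  # '\x00' marks the end of a complete word

print(is_sensitive(tree, ['bad', 'egg']))  # True
print(is_sensitive(tree, ['bad']))         # False: only a prefix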

The biggest advantage of this tree representation is that it reduces the number of comparisons needed during retrieval and so improves efficiency. The basic code is as follows:

import time


class DFAFilter(object):
    def __init__(self):
        self.keyword_chains = {}  # keyword linked list
        self.delimit = '\x00'  # define

    def add(self, keyword):
        keyword = keyword.lower()  # convert the keyword to lowercase
        chars = keyword.strip()  # strip leading/trailing whitespace and newlines
        if not chars:  # return if the keyword is empty
            return
        level = self.keyword_chains
        # iterate over each character of the keyword
        for i in range(len(chars)):
            # descend into the sub-dictionary if the character already exists at this level
            if chars[i] in level:
                level = level[chars[i]]
            else:
                if not isinstance(level, dict):
                    break
                for j in range(i, len(chars)):
                    level[chars[j]] = {}
                    last_level, last_char = level, chars[j]
                    level = level[chars[j]]
                last_level[last_char] = {self.delimit: 0}
                break
        if i == len(chars) - 1:
            level[self.delimit] = 0

    def parse(self, path):
        with open(path, encoding='utf-8') as f:
            for keyword in f:
                self.add(str(keyword).strip())

    def filter(self, message, repl="*"):
        message = message.lower()
        ret = []
        start = 0
        while start < len(message):
            level = self.keyword_chains
            step_ins = 0
            for char in message[start:]:
                if char in level:
                    step_ins += 1
                    if self.delimit not in level[char]:
                        level = level[char]
                    else:
                        ret.append(repl * step_ins)
                        start += step_ins - 1
                        break
                else:
                    ret.append(message[start])
                    break
            else:
                ret.append(message[start])
            start += 1

        return ''.join(ret)


startTime = time.time()
gfw = DFAFilter()
gfw.parse( "./sensitive_words.txt")
content = "This is an example of keyword substitution, involving keyword 1 and keyword 2, and keyword 3 at the end." * 10000
result = gfw.filter(content)
print(time.time()-startTime)

Here our word list is generated with:

with open("./sensitive_words".'w') as f:
    f.write("\n".join( [ "Keywords" + str(i) for i inRange (0100, 00)))Copy the code

Execution Result:

4.9114227294921875e-05

You can see further performance improvements.
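As a quick sanity check, here is a minimal usage sketch (my example, not from the original article, using the toy words from the tree above):

gfw = DFAFilter()
for word in ("bad egg", "bad person"):
    gfw.add(word)

# each matched keyword is masked character for character
print(gfw.filter("you are a bad egg"))  # you are a *******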

AC automaton sensitive word filtering

Next, let's take a look at the AC automaton algorithm for filtering sensitive words:

AC automaton: a classic problem it solves is, given n words and a passage of m characters, to find how many of the words appear in the passage.

To put it simply, an AC automaton is a trie (dictionary tree) combined with KMP-style failure (mismatch) pointers.

Code implementation:

# AC automata algorithm
class node(object):
    def __init__(self):
        self.next = {}
        self.fail = None
        self.isWord = False
        self.word = ""


class ac_automation(object):

    def __init__(self):
        self.root = node()

    # add sensitive word function
    def addword(self, word):
        temp_root = self.root
        for char in word:
            if char not in temp_root.next:
                temp_root.next[char] = node()
            temp_root = temp_root.next[char]
        temp_root.isWord = True
        temp_root.word = word

    # failed pointer function
    def make_fail(self):
        temp_que = []
        temp_que.append(self.root)
        while len(temp_que) != 0:
            temp = temp_que.pop(0)
            p = None
            for key, value in temp.next.items():
                if temp == self.root:
                    temp.next[key].fail = self.root
                else:
                    p = temp.fail
                    while p is not None:
                        if key in p.next:
                            temp.next[key].fail = p.next[key]
                            break
                        p = p.fail
                    if p is None:
                        temp.next[key].fail = self.root
                temp_que.append(temp.next[key])

    # find sensitive words in content
    def search(self, content):
        p = self.root
        result = []
        currentposition = 0

        while currentposition < len(content):
            word = content[currentposition]
            while word not in p.next and p != self.root:
                p = p.fail
            if word in p.next:
                p = p.next[word]
            else:
                p = self.root

            if p.isWord:
                result.append(p.word)
                p = self.root
            currentposition += 1
        return result

    # load the sensitive word list
    def parse(self, path):
        with open(path, encoding='utf-8') as f:
            for keyword in f:
                self.addword(str(keyword).strip())
        self.make_fail()  # build failure pointers once all keywords are loaded

    # Sensitive word substitution function
    def words_replace(self, text):
        """:param ah: AC automaton :param text: text: return: filter text after sensitive words"""
        result = list(set(self.search(text)))
        for x in result:
            text = text.replace(x, '*' * len(x))
        return text

ah = ac_automation()
path = './sensitive_words'
ah.parse(path)
content = "This is an example of keyword substitution, involving keyword 1 and keyword 2, and keyword 3 at the end."
print(ah.words_replace(content))

The word list is generated the same way:

with open("./sensitive_words".'w') as f:
    f.write("\n".join( [ "Keywords" + str(i) for i inRange (0100, 00)))Copy the code

Testing with content * 10000 as above, this method takes 0.1727597713470459 seconds.
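For reference, a minimal sketch of how that figure can be reproduced (assuming ah and content are the objects defined above):

import time

startTime = time.time()
ah.words_replace(content * 10000)
print(time.time() - startTime)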

Summary

From these basic tests, the DFA filter performs best. In practice, however, neither of the last two algorithms is strictly better than the other: in some scenarios the AC automaton filter achieves higher performance. In production, both are worth knowing, and you can choose according to your specific business needs.
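For reference, the rough timings measured in this article (text repeated 10,000 times, 10,000-word list):

  • replace: 1.235044002532959
  • re.sub: 0.47878289222717285
  • DFA: 4.9114227294921875e-05
  • AC automaton: 0.1727597713470459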

How to deploy under a Serverless architecture

It's very simple. Taking the AC automaton filter as the example, we only need to add a few lines of code. The complete code is as follows:

# -*- coding:utf-8 -*-

import json, uuid


# AC automata algorithm
class node(object):
    def __init__(self):
        self.next = {}
        self.fail = None
        self.isWord = False
        self.word = ""


class ac_automation(object):

    def __init__(self):
        self.root = node()

    # add sensitive word function
    def addword(self, word):
        temp_root = self.root
        for char in word:
            if char not in temp_root.next:
                temp_root.next[char] = node()
            temp_root = temp_root.next[char]
        temp_root.isWord = True
        temp_root.word = word

    # failed pointer function
    def make_fail(self):
        temp_que = []
        temp_que.append(self.root)
        while len(temp_que) != 0:
            temp = temp_que.pop(0)
            p = None
            for key, value in temp.next.items():
                if temp == self.root:
                    temp.next[key].fail = self.root
                else:
                    p = temp.fail
                    while p is not None:
                        if key in p.next:
                            temp.next[key].fail = p.next[key]
                            break
                        p = p.fail
                    if p is None:
                        temp.next[key].fail = self.root
                temp_que.append(temp.next[key])

    # find sensitive words in content
    def search(self, content):
        p = self.root
        result = []
        currentposition = 0

        while currentposition < len(content):
            word = content[currentposition]
            while word not in p.next and p != self.root:
                p = p.fail
            if word in p.next:
                p = p.next[word]
            else:
                p = self.root

            if p.isWord:
                result.append(p.word)
                p = self.root
            currentposition += 1
        return result

    # load the sensitive word list
    def parse(self, path):
        with open(path, encoding='utf-8') as f:
            for keyword in f:
                self.addword(str(keyword).strip())
        self.make_fail()  # build failure pointers once all keywords are loaded

    # Sensitive word substitution function
    def words_replace(self, text):
        """:param ah: AC automaton :param text: text: return: filter text after sensitive words"""
        result = list(set(self.search(text)))
        for x in result:
            text = text.replace(x, '*' * len(x))
        return text


def response(msg, error=False):
    return_data = {
        "uuid": str(uuid.uuid1()),
        "error": error,
        "message": msg
    }
    print(return_data)
    return return_data


ah = ac_automation()
path = './sensitive_words'
ah.parse(path)


def main_handler(event, context):
    try:
        sourceContent = json.loads(event["body"])["content"]
        return response({
            "sourceContent": sourceContent,
            "filtedContent": ah.words_replace(sourceContent)
        })
    except Exception as e:
        return response(str(e), True)

Finally, to facilitate local testing, we can add:

def test():
    event = {
        "requestContext": {
            "serviceId": "service-f94sy04v"."path": "/test/{path}"."httpMethod": "POST"."requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef"."identity": {
                "secretId": "abdcdxxxxxxxsdfs"
            },
            "sourceIp": "14.17.22.34"."stage": "release"
        },
        "headers": {
            "Accept-Language": "en-US,en,cn"."Accept": "text/html,application/xml,application/json"."Host": "service-3ei3tii4-251000691.ap-guangzhou.apigateway.myqloud.com"."User-Agent": "User Agent String"
        },
        "body": "{\"content\":\" This is a test text, I will also ha ha \"}"."pathParameters": {
            "path": "value"
        },
        "queryStringParameters": {
            "foo": "bar"
        },
        "headerParameters": {
            "Refer": "10.0.2.14"
        },
        "stageVariables": {
            "stage": "release"
        },
        "path": "/test/value"."queryString": {
            "foo": "bar"."bob": "alice"
        },
        "httpMethod": "POST"
    }
    print(main_handler(event, None))


if __name__ == "__main__":
    test()

Once that's done, we can do a test run. For example, my word list is:

ha ha
test

Results after execution:

{'uuid': '9961ae2a-5cfc-11ea-a7c2-acde48001122', 'error': False, 'message': {'sourceContent': 'This is a test text, I will also ha ha', 'filtedContent': 'This is a **** text, I will also *****'}}

Next, we deploy the code to the cloud and create a new serverless.yaml:

sensitive_word_filtering:
  component: "@serverless/tencent-scf"
  inputs:
    name: sensitive_word_filtering
    codeUri: ./
    exclude:
      - .gitignore
      - .git/**
      - .serverless
      - .env
    handler: index.main_handler
    runtime: Python3.6
    region: ap-beijing
    description: Sensitive word filtering
    memorySize: 64
    timeout: 2
    events:
      - apigw:
          name: serverless
          parameters:
            environment: release
            endpoints:
              - path: /sensitive_word_filtering
                description: Sensitive word filtering
                method: POST
                enableCORS: true
                param:
                  - name: content
                    position: BODY
                    required: 'FALSE'
                    type: string
                    desc: Sentences to be filtered

Then deploy with sls --debug; when deployment finishes, the output includes the address of the generated API gateway endpoint.

Finally, test the API with Postman.
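If you would rather test from code than Postman, here is a minimal sketch (the gateway URL below is a placeholder; substitute the endpoint printed by your own deployment):

import requests

# hypothetical endpoint: replace with your own API gateway URL
url = "https://service-xxxxxxxx-1250000000.ap-beijing.apigateway.myqcloud.com/release/sensitive_word_filtering"

resp = requests.post(url, json={"content": "This is a test text, I will also ha ha"})
print(resp.json())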

Additional notes

  • As for where to get a sensitive word list: GitHub has plenty, in both Chinese and English; search for yourself. The list used here is just an example;
  • This API can be plugged into a community comment system, message board, or blog publishing system to prevent sensitive words from appearing and causing unnecessary trouble.