spiderman

A general-purpose distributed crawler framework based on Scrapy-Redis.

Project address: github.com/TurboWay/sp…


Contents

  • Screenshots

    • Collection results
    • Crawler metadata
    • Distributed crawler running
    • Single crawler running
    • Attachment download
    • Kafka real-time collection monitoring example
  • Introduction

    • Features
    • Principle
  • Quick start

    • Download and install
    • How to develop a new crawler
    • How to re-crawl missed data
    • How to download attachments
    • How to extend to distributed crawlers
    • How to manage crawler metadata
    • How to use Kafka for real-time collection monitoring
    • How to use the crawler API
  • Other

    • Notes
    • Hive environment problems
    • Update log
    • TODO

Demo collection results

Crawler metadata

Cluster mode

Standalone mode

Download the attachment

Kafka real-time acquisition and monitoring

Features

  • Tables are built automatically

  • Crawler code is generated automatically; only a small amount of code has to be written to complete a distributed crawler

  • Metadata is stored automatically, which is convenient for statistical analysis and for crawling

  • Suitable for multi-site development: each crawler is customized independently and crawlers do not affect each other

  • Easy to call: the number of pages to collect and the number of crawlers to enable can be set through parameters

  • Easy to extend: choose standalone mode (default) or distributed cluster mode as required

  • Easy to store collected data: a variety of databases are supported, and you only need to enable the relevant pipeline in the spider

    relational

    • mysql
    • sqlserver
    • oracle
    • postgresql
    • sqlite3

    non-relational

    • hbase
    • mongodb
    • elasticsearch
    • hdfs
    • hive
    • Data files, such as CSV
  • Easy anti-crawl handling: a variety of anti-crawl middleware is already encapsulated

    • Random User-Agent
    • Custom request headers
    • Custom cookies pool
    • Custom proxy IP addresses
    • Using requests inside Scrapy
    • Payload requests
    • Rendering JS with Splash

Principle

  1. Redis serves as the message queue, and the collection strategy is breadth-first, first in, first out (FIFO).
  2. Each crawler has a job file, which uses the request class ScheduledRequest to generate the initial requests and push them to Redis.

After all the initial requests have been pushed to Redis, a spider is run to parse the responses, generate data, and push newly discovered requests to Redis, until all requests in Redis are consumed.

# scrapy_redis request class
class ScheduledRequest:

    def __init__(self, **kwargs):
        self.url = kwargs.get('url')                # request URL
        self.method = kwargs.get('method', 'GET')   # request method, defaults to GET
        self.callback = kwargs.get('callback')      # callback: specifies the spider's parse function
        self.body = kwargs.get('body')              # body: used as the POST form when method is POST
        self.meta = kwargs.get('meta')              # meta: carries metadata such as pagenum
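The flow described above can be sketched without Redis by letting an in-memory deque stand in for the Redis list. This is only an illustration under stated assumptions: `fake_parse`, `run_crawl`, and the example.com URLs are hypothetical and not part of spiderman, and the real framework pushes serialized requests to Redis rather than keeping Python objects in memory.

```python
from collections import deque


class ScheduledRequest:
    """Minimal stand-in for the request class shown above."""

    def __init__(self, **kwargs):
        self.url = kwargs.get('url')
        self.method = kwargs.get('method', 'GET')
        self.callback = kwargs.get('callback')
        self.body = kwargs.get('body')
        self.meta = kwargs.get('meta')


def fake_parse(request):
    """Hypothetical parser: every list page yields two detail requests."""
    if request.callback == 'list':
        page = request.meta['pagenum']
        return [
            ScheduledRequest(url=f'{request.url}/item{i}', callback='detail',
                             meta={'pagenum': page})
            for i in (1, 2)
        ]
    return []  # detail pages produce data only, no new requests


def run_crawl(initial_requests):
    """Consume requests first in, first out until the queue is empty."""
    queue = deque(initial_requests)        # the job pushes the initial requests
    visited = []
    while queue:
        request = queue.popleft()          # popping the oldest request gives breadth-first order
        visited.append(request.url)
        queue.extend(fake_parse(request))  # the spider pushes newly found requests
    return visited


if __name__ == '__main__':
    jobs = [ScheduledRequest(url=f'http://example.com/list/{p}', callback='list',
                             meta={'pagenum': p}) for p in (1, 2)]
    for url in run_crawl(jobs):
        print(url)
```

Because requests leave the queue in the order they entered it, both list pages are visited before any detail page, which is the breadth-first behaviour the principle describes.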
  3. The item class defines the table name, field names, order numbers (custom field order), comments (which make metadata easier to manage), and field types (only effective for relational database pipelines).
import scrapy


class zhifang_list_Item(scrapy.Item):
    # define table
    tablename = 'zhifang_list'
    tabledesc = 'list'
    # define the fields for your item here like:
    # for relational databases the field type and length can be customized; the default is VARCHAR(length=255)
    # colname = scrapy.Field({'idx': 1, 'comment': 'name', 'type': VARCHAR(255)})
    tit = scrapy.Field({'idx': 1, 'comment': 'House title'})
    txt = scrapy.Field({'idx': 2, 'comment': 'House description'})
    tit2 = scrapy.Field({'idx': 3, 'comment': 'House floor'})
    price = scrapy.Field({'idx': 4, 'comment': 'House price'})
    agent = scrapy.Field({'idx': 5, 'comment': 'Real estate agent'})
    # default columns
    detail_full_url = scrapy.Field({'idx': 100, 'comment': 'Detail link'})  # generic field
    pkey = scrapy.Field({'idx': 101, 'comment': 'md5(detail_full_url)'})  # generic field
    pagenum = scrapy.Field({'idx': 102, 'comment': 'Page number'})  # generic field
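To illustrate how field metadata like the above could drive automatic table creation, here is a minimal sketch. It does not use spiderman's own code: `build_create_table` and the plain-dict field layout are assumptions made for illustration, and the project's pipelines may generate different SQL.

```python
def build_create_table(tablename, fields, default_type='VARCHAR(255)'):
    """Build a CREATE TABLE statement with columns ordered by their 'idx'."""
    ordered = sorted(fields.items(), key=lambda kv: kv[1]['idx'])
    columns = [
        f"    {name} {meta.get('type', default_type)}"  # fall back to the default type
        for name, meta in ordered
    ]
    return f"CREATE TABLE {tablename} (\n" + ",\n".join(columns) + "\n)"


# Hypothetical field metadata, deliberately listed out of order.
FIELDS = {
    'pkey': {'idx': 101, 'comment': 'md5(detail_full_url)'},
    'tit': {'idx': 1, 'comment': 'House title'},
    'price': {'idx': 4, 'comment': 'House price', 'type': 'VARCHAR(50)'},
}

if __name__ == '__main__':
    print(build_create_table('zhifang_list', FIELDS))
```

Sorting by `idx` is what lets the item class control column order independently of the order in which the fields are declared.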
  4. Deduplication strategy. By default, no deduplication is performed and each collection run is independent. If needed, modify the following configuration.
  • Job file (single crawler)
class zhifang_job(SPJob):

    def __init__(self):
        super().__init__(spider_name=zhifang_Spider.name)
        # self.delete()  # comment out this line for deduplication and incremental collection
  • Spider file (single crawler)
    custom_settings = {
        # ... other settings
        'DUPEFILTER_CLASS': 'scrapy_redis.dupefilter.RFPDupeFilter',
        'SCHEDULER_PERSIST': True,  # enable persistence
    }

    def get_callback(self, callback):
        # second element is the URL deduplication flag: True = do not deduplicate, False = deduplicate
        callback_dt = {
            'list': (self.list_parse, False),
            'detail': (self.detail_parse, False),
        }
        return callback_dt.get(callback)
  • Bloom filter.

When a large amount of data is collected, a Bloom filter can be used. The algorithm occupies a small, controllable amount of space, which makes it suitable for deduplicating massive data sets. However, it has a miss rate: some new URLs are wrongly treated as already seen, which for a crawler means missed pages. You can adjust the number of filters, the memory size, and the number of hashes to reduce the miss rate. By default, 1 filter, 256 MB of memory, and 7 hash seeds are used. With this configuration, a miss rate of 8.56E-05 is enough to deduplicate 93 million strings, and a miss rate of 0.000112 is enough to deduplicate 98 million strings. For how the parameters relate to the miss rate, see the reference.

    custom_settings = {
        # ... other settings
        'DUPEFILTER_CLASS': 'SP.bloom_dupefilter.BloomRFDupeFilter',  # use the Bloom filter
        'SCHEDULER_PERSIST': True,  # enable persistence
        'BLOOM_NUM': 1,    # number of Bloom filters; can be increased when the memory limit is reached
        'BLOOM_MEM': 256,  # Bloom filter memory size in MB; maximum 512 (a Redis string is limited to 512 MB)
        'BLOOM_K': 7,      # number of hashes; fewer hashes deduplicate faster but raise the miss rate
    }

    def get_callback(self, callback):
        # second element is the URL deduplication flag: True = do not deduplicate, False = deduplicate
        callback_dt = {
            'list': (self.list_parse, False),
            'detail': (self.detail_parse, False),
        }
        return callback_dt.get(callback)
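As a rough sanity check on the default configuration, the textbook approximation for a Bloom filter's false-positive rate, p ≈ (1 - e^(-kn/m))^k, can be evaluated directly. This is a sketch under assumptions: `bloom_miss_rate` is a hypothetical helper, it assumes items are spread evenly across filters when BLOOM_NUM is greater than 1, and the project may compute its quoted figures with a slightly different formula.

```python
import math


def bloom_miss_rate(n_items, mem_mb=256, k=7, num_filters=1):
    """Approximate false-positive rate of a Bloom filter: (1 - e^(-k*n/m))^k."""
    bits_per_filter = mem_mb * 1024 * 1024 * 8   # m: filter size in bits
    items_per_filter = n_items / num_filters     # n: assumes an even spread across filters
    return (1 - math.exp(-k * items_per_filter / bits_per_filter)) ** k


if __name__ == '__main__':
    for n in (93_000_000, 98_000_000):
        print(f'{n:>12,d} strings -> miss rate {bloom_miss_rate(n):.3g}')
```

With the defaults, the approximation lands in the same order of magnitude as the figures quoted above. It also shows the trade-offs the settings expose: more memory or more filters lower the rate, while more stored strings raise it.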

Download and install

  1. git clone github.com/TurboWay/sp… ; cd spiderman
  2. virtualenv -p /usr/bin/python3 venv
  3. source venv/bin/activate
  4. pip install -i https://pypi.tuna.tsinghua.edu.cn/simple -r requirements.txt
  5. Modify the configuration: vi SP/settings.py
  6. Run the demo: python SP_JOBS/zhifang_job.py

How to develop a new crawler

Running easy_scrapy.py automatically generates the following code files from the template and automatically opens the spidername_job.py file in the editor.

Category  Path                            Description
job       SP_JOBS/spidername_job.py       Write the initial requests
spider    SP/spiders/spidername.py        Write parsing rules and generate new requests
items     SP/items/spidername_items.py    Define the table name and fields

After the code files above are written, run python SP_JOBS/spidername_job.py directly.

Parameters can also be passed: -p is the number of pages to collect and -n is the number of crawlers to enable. For example: python SP_JOBS/spidername_job.py -p 10 -n 1
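For readers unfamiliar with this calling style, the two flags could be parsed as in the sketch below. It is not taken from the project: `parse_job_args`, the long option names `--pages` and `--num`, and the default values are all assumptions for illustration.

```python
import argparse


def parse_job_args(argv=None):
    """Parse the -p / -n flags of a job script (hypothetical helper)."""
    parser = argparse.ArgumentParser(description='Run a crawler job')
    parser.add_argument('-p', '--pages', type=int, default=1,
                        help='number of pages to collect (default is an assumption)')
    parser.add_argument('-n', '--num', type=int, default=1,
                        help='number of crawlers to enable (default is an assumption)')
    return parser.parse_args(argv)


if __name__ == '__main__':
    args = parse_job_args()
    print(f'collect {args.pages} page(s) with {args.num} crawler(s)')
```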

How to re-crawl missed data

Running easy_scrapy.py automatically generates the following code file from the template and automatically opens the spidername_job_patch.py file in the editor.

Category  Path                               Description
job       SP_JOBS/spidername_job_patch.py    Write the re-crawl requests

After the code file above is written, run python SP_JOBS/spidername_job_patch.py directly.

How to download attachments

There are two ways to download attachments:

  • 1. Enable the attachment download pipeline directly in the spider
  • 2. Use the custom downloader execute_download.py and pass in parameters to download

Files such as jpg, pdf, word, and so on are collectively referred to as attachments. Downloading attachments consumes a lot of bandwidth. In large-scale collection it is therefore best to first store the structured table data and the attachment metadata in the database to ensure data integrity, and then download the attachments with the downloader as required.

How do I extend distributed crawlers

There are two collection modes (controlled in the settings): standalone (default) and distributed cluster.

To switch to distributed crawling, enable the following configuration in spiderman/SP/settings.py.

Note: this requires that all slave machines have the same crawler code and Python environment and can run the crawler demo.

# False: standalone (default); True: distributed, which requires the slave configuration below
CLUSTER_ENABLE = True

Name              Meaning                                              Example
SLAVES            List of crawler machines                             [{'host': '172.16.122.12', 'port': 22, 'user': 'spiders', 'pwd': 'spiders'},
                                                                        {'host': '172.16.122.13', 'port': 22, 'user': 'spiders', 'pwd': 'spiders'}]
SLAVES_BALANCE    Crawler machine configuration (SSH load balancing)   {'host': '172.16.122.11', 'port': 2202, 'user': 'spiders', 'pwd': 'spiders'}
SLAVES_ENV        [Optional] Path of the crawler virtual environment   /home/spider/workspace/spiderman/venv
SLAVES_WORKSPACE  Path of the crawler project code on the machine      /home/spider/workspace/spiderman
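Put together, the cluster settings might look like the fragment below. The values are copied from the examples in the table; `REQUIRED_KEYS` and `check_slaves` are hypothetical additions that only show a simple way to validate the entries before starting a run, and are not part of the project.

```python
# Sketch of the cluster section of SP/settings.py, using the example values above.
CLUSTER_ENABLE = True

SLAVES = [
    {'host': '172.16.122.12', 'port': 22, 'user': 'spiders', 'pwd': 'spiders'},
    {'host': '172.16.122.13', 'port': 22, 'user': 'spiders', 'pwd': 'spiders'},
]
SLAVES_BALANCE = {'host': '172.16.122.11', 'port': 2202, 'user': 'spiders', 'pwd': 'spiders'}
SLAVES_ENV = '/home/spider/workspace/spiderman/venv'   # optional
SLAVES_WORKSPACE = '/home/spider/workspace/spiderman'

# Hypothetical helper, not part of the project.
REQUIRED_KEYS = {'host', 'port', 'user', 'pwd'}


def check_slaves(slaves):
    """Return the hosts of entries that lack one of the required keys."""
    return [s.get('host') for s in slaves if not REQUIRED_KEYS.issubset(s)]


if __name__ == '__main__':
    print('incomplete entries:', check_slaves(SLAVES + [SLAVES_BALANCE]))
```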

How do I manage crawler metadata

Run easy_meta.py to automatically generate the metadata of all crawlers in the current project. By default it is recorded in the SQLite database meta.db; this can be changed in the settings.

# crawler metadata
META_ENGINE = 'sqlite:///meta.db'

The data dictionary of the meta table is as follows:

Field name      Type          Comment
spider          varchar(50)   Crawler name
spider_comment  varchar(100)  Crawler description
tb              varchar(50)   Table name
tb_comment      varchar(100)  Table description
col_px          int           Field order number
col             varchar(50)   Field name
col_comment     varchar(100)  Field description
author          varchar(20)   Developer
addtime         varchar(20)   Development time
insertime       varchar(20)   Metadata update time
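Once the metadata is stored, it can be queried like any other table. The sketch below builds an in-memory SQLite table with the columns listed above and reads back the fields of one table in their defined order. The sample rows, the spider name `zhifang`, and the helpers `make_demo_db` and `fields_of` are assumptions for illustration; the table is called `meta` here on the assumption that it matches the project's naming.

```python
import sqlite3

# Column layout taken from the data dictionary above.
COLUMNS = """
    spider VARCHAR(50), spider_comment VARCHAR(100),
    tb VARCHAR(50), tb_comment VARCHAR(100),
    col_px INT, col VARCHAR(50), col_comment VARCHAR(100),
    author VARCHAR(20), addtime VARCHAR(20), insertime VARCHAR(20)
"""


def make_demo_db():
    """Create an in-memory meta table with two hypothetical sample rows."""
    conn = sqlite3.connect(':memory:')
    conn.execute(f'CREATE TABLE meta ({COLUMNS})')
    rows = [
        ('zhifang', 'demo crawler', 'zhifang_list', 'list', 4, 'price',
         'House price', 'demo', '2021-01-01', '2021-01-02'),
        ('zhifang', 'demo crawler', 'zhifang_list', 'list', 1, 'tit',
         'House title', 'demo', '2021-01-01', '2021-01-02'),
    ]
    conn.executemany('INSERT INTO meta VALUES (?,?,?,?,?,?,?,?,?,?)', rows)
    return conn


def fields_of(conn, table):
    """Return (field name, comment) pairs of a table, ordered by col_px."""
    cursor = conn.execute(
        'SELECT col, col_comment FROM meta WHERE tb = ? ORDER BY col_px', (table,))
    return cursor.fetchall()


if __name__ == '__main__':
    for col, comment in fields_of(make_demo_db(), 'zhifang_list'):
        print(col, '-', comment)
```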

How to use Kafka for real-time collection monitoring

  1. Configure Kafka (modify the KAFKA_SERVERS setting)
  2. Customize the monitoring rules (modify kafka_mon.py, then run the script to start monitoring)
  3. Enable the Kafka pipeline in the spider (run the crawler job to start collecting)

How to use the crawler API

Run api.py directly; the API documentation can then be viewed at http://127.0.0.1:2021/docs

Notes

  1. Do not use tablename, isload, ctime, bizdate, spider, and similar names as field names. These are used as common fields, and reusing them causes conflicts.
  2. It is recommended to add a comment to every field in the items file. When metadata is generated, the comments are imported into the metadata table, which makes crawler management easier.

Hive Environment Problems

On Windows, connecting to Hive from python3 has many pitfalls. Therefore, to keep deployment simple, automatic Hive table creation is disabled by default when HDFS is used. To enable automatic Hive table creation, perform the following steps:

  1. pip install -i https://pypi.tuna.tsinghua.edu.cn/simple -r requirements.txt
  2. pip install --no-deps thrift-sasl==0.2.1
  3. To verify the environment, run SP.utils.ctrl_hive

If it runs successfully, the Hive environment is ready and automatic Hive table creation can be enabled directly. If you encounter problems, see the article "Big Data: connecting Python3 to Hive on Windows".

Update log

Date      Update
20200803  1. Use a more elegant way to generate metadata
          2. Adjust how parameters are passed to the pipeline functions
          3. Download status field renamed (isload => status)
20200831  1. Retry when writing data to the database fails
          2. All pipelines are optimized: when a write fails, they automatically switch to row-by-row writes and discard only the abnormal records
20201104  1. requests middleware supports DOWNLOAD_TIMEOUT and DOWNLOAD_DELAY
20201212  1. payload middleware supports DOWNLOAD_TIMEOUT and DOWNLOAD_DELAY
          2. Optimize the get_SP_cookies method, replacing Selenium with the lightweight Splash
          3. Add a description of the deduplication strategy to the principle section of the README
20210105  1. Add a Bloom filter
20210217  1. The Elasticsearch pipeline (compatible with Elasticsearch 7 and later) uses the table name as the index name
20210314  1. All anti-crawl middleware is merged into SPMiddleWare
20210315  1. Generate the initial job requests in a more elegant way
          2. Optimize the headers middleware to reduce Redis memory usage
          3. Delete the cookie middleware; a cookie is just one value in the headers, so the headers middleware can be used directly
          4. Delete the payload middleware; payload requests can be handled directly with the requests middleware
          5. Add the CookiesPool middleware for randomly switching between multiple accounts
20210317  1. Add a distributed attachment downloader that works independently of Scrapy
20210318  1. Add the API service