1. Preface

NetDiscovery is a general-purpose crawler framework I developed on top of Vert.x and RxJava 2. It provides a rich set of features.

2. The use of multithreading

Although NetDiscovery uses RxJava 2 for thread switching, there are still plenty of scenarios that call for traditional multithreading. This article lists some common multithreading usage scenarios in the crawler framework.

2.1 Pausing and resuming the crawler

Pausing and resuming are the most common crawler operations; they are implemented with the CountDownLatch class.

CountDownLatch is a synchronization utility class that allows one or more threads to wait until other threads have completed their operations.
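
As a minimal standalone sketch of this mechanism (independent of NetDiscovery's code), one thread blocks on await() until another thread calls countDown():

    import java.util.concurrent.CountDownLatch;

    public class CountDownLatchDemo {

        public static void main(String[] args) throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1);

            Thread worker = new Thread(() -> {
                try {
                    latch.await(); // blocks until the count reaches 0
                    System.out.println("resumed");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();

            Thread.sleep(1000L);
            latch.countDown(); // count drops from 1 to 0, releasing the worker
            worker.join();
        }
    }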

The pause() method initializes a CountDownLatch field named pauseCountDown with a count of 1.

The resume() method calls pauseCountDown.countDown(); once the count reaches zero, the waiting crawler thread is released.

    /**
     * Pause the crawler: the Request currently being crawled continues,
     * and subsequent Requests wait until resume() is called.
     */
    public void pause() {
        this.pauseCountDown = new CountDownLatch(1);
        this.pause = true;
        stat.compareAndSet(SPIDER_STATUS_RUNNING, SPIDER_STATUS_PAUSE);
    }

    /**
     * Restart the crawler.
     */
    public void resume() {

        if (stat.get() == SPIDER_STATUS_PAUSE
                && this.pauseCountDown != null) {

            this.pauseCountDown.countDown();
            this.pause = false;
            stat.compareAndSet(SPIDER_STATUS_PAUSE, SPIDER_STATUS_RUNNING);
        }
    }

When the crawler takes a Request from the message queue, it first determines whether the crawler needs to be suspended. If so, it calls pauseCountDown.await(), which blocks the thread and suspends the crawler until the CountDownLatch count reaches 0, at which point the crawler can resume.

        while (getSpiderStatus() != SPIDER_STATUS_STOPPED) {

            // Pause fetching
            if (pause && pauseCountDown != null) {
                try {
                    this.pauseCountDown.await();
                } catch (InterruptedException e) {
                    log.error("can't pause : ", e);
                }

                initialDelay();
            }

            // Retrieve request from message queue
            final Request request = queue.poll(name);
            ...
        }

2.2 Multi-dimensional control of crawling speed

A single crawler's flow runs as follows: take a Request from the message queue, download the page, parse it, and pass the results through the pipelines.

If the crawler crawls too fast, it will be identified by the target system. NetDiscovery can get around basic anti-crawler measures by limiting its speed.

NetDiscovery supports rate limiting along multiple dimensions, and these dimensions roughly correspond to the stages of a single crawler's flow.

2.2.1 Request

First, the Request object encapsulated by the crawler supports pausing: after a Request is taken from the message queue, the crawler checks whether that Request needs to sleep before being processed.

        while (getSpiderStatus() != SPIDER_STATUS_STOPPED) {

            // Pause fetching
            ...

            // Retrieve request from message queue
            final Request request = queue.poll(name);

            if (request == null) {

                waitNewRequest();
            } else {

                if (request.getSleepTime() > 0) {

                    try {
                        Thread.sleep(request.getSleepTime());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                ...
            }
        }

2.2.2 Download

When the crawler downloads, the downloader creates an RxJava Maybe object. The download rate limit is implemented with RxJava's compose operator and a MaybeTransformer.

The following code shows the DownloaderDelayTransformer:

import cn.netdiscovery.core.domain.Request;
import io.reactivex.Maybe;
import io.reactivex.MaybeSource;
import io.reactivex.MaybeTransformer;

import java.util.concurrent.TimeUnit;

/**
 * Created by tony on 2019-04-26.
 */
public class DownloaderDelayTransformer implements MaybeTransformer {

    private Request request;

    public DownloaderDelayTransformer(Request request) {
        this.request = request;
    }

    @Override
    public MaybeSource apply(Maybe upstream) {

        return request.getDownloadDelay() > 0
                ? upstream.delay(request.getDownloadDelay(), TimeUnit.MILLISECONDS)
                : upstream;
    }
}

Any Downloader that applies this DownloaderDelayTransformer via compose gains download rate limiting.

Take UrlConnectionDownloader as an example:

        Maybe.create(new MaybeOnSubscribe<InputStream>() {

                @Override
                public void subscribe(MaybeEmitter<InputStream> emitter) throws Exception {

                    emitter.onSuccess(httpUrlConnection.getInputStream());
                }
            })
             .compose(new DownloaderDelayTransformer(request))
             .map(new Function<InputStream, Response>() {

                @Override
                public Response apply(InputStream inputStream) throws Exception {
                    ...
                    return response;
                }
            });

2.2.3 Domain

Domain-level rate limiting borrows Scrapy's approach: each domain name and its last access time are stored in a ConcurrentHashMap. You can set a Request's domainDelay attribute to limit how fast that Request's Domain is accessed.

import cn.netdiscovery.core.domain.Request;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by tony on 2019-05-06.
 */
public class Throttle {

    private Map<String,Long> domains = new ConcurrentHashMap<String,Long>();

    private static class Holder {
        private static final Throttle instance = new Throttle();
    }

    private Throttle() {
    }

    public static final Throttle getInsatance() {
        return Throttle.Holder.instance;
    }

    public void wait(Request request) {

        String domain = request.getUrlParser().getHost();
        Long lastAccessed = domains.get(domain);

        if (lastAccessed != null && lastAccessed > 0) {
            long sleepSecs = request.getDomainDelay() - (System.currentTimeMillis() - lastAccessed);
            if (sleepSecs > 0) {
                try {
                    Thread.sleep(sleepSecs);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        domains.put(domain, System.currentTimeMillis());
    }
}

When a Request is taken from the message queue, the crawler first determines whether the Request itself needs to pause, and then whether access to its Domain needs to be throttled.

        while (getSpiderStatus() != SPIDER_STATUS_STOPPED) {

            // Pause fetching
            ...

            // Retrieve request from message queue
            final Request request = queue.poll(name);

            if (request == null) {

                waitNewRequest();
            } else {

                if (request.getSleepTime() > 0) {

                    try {
                        Thread.sleep(request.getSleepTime());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                Throttle.getInsatance().wait(request);

                ...
            }
        }

2.2.4 Pipeline

Before looking at Pipeline rate limiting, review the full flow in which the crawler processes a Request:

                // Request is being processed
                downloader.download(request)
                        .retryWhen(new RetryWithDelay(maxRetries, retryDelayMillis, request)) // A retry mechanism for network requests
                        .map(new Function<Response, Page>() {

                            @Override
                            public Page apply(Response response) throws Exception {
                                // Save response to page
                                ...
                                return page;
                            }
                        })
                        .map(new Function<Page, Page>() {

                            @Override
                            public Page apply(Page page) throws Exception {

                                if (parser != null) {

                                    parser.process(page);
                                }

                                return page;
                            }
                        })
                        .map(new Function<Page, Page>() {

                            @Override
                            public Page apply(Page page) throws Exception {

                                if (!page.getResultItems().isSkip() && Preconditions.isNotBlank(pipelines)) {
                                    pipelines.stream()
                                            .forEach(pipeline -> {
                                                pipeline.process(page.getResultItems());
                                            });
                                }

                                return page;
                            }
                        })
                        .observeOn(Schedulers.io())
                        .subscribe(new Consumer<Page>() {

                            @Override
                            public void accept(Page page) throws Exception {

                                log.info(page.getUrl());

                                if (request.getAfterRequest() != null) {
                                    request.getAfterRequest().process(page);
                                }

                                signalNewRequest();
                            }
                        }, new Consumer<Throwable>() {

                            @Override
                            public void accept(Throwable throwable) throws Exception {
                                log.error(throwable.getMessage(), throwable);
                            }
                        });

Pipeline rate limiting is implemented with RxJava's delay operator combined with a blocking operator (blockingFirst).

map(new Function<Page, Page>() {

    @Override
    public Page apply(Page page) throws Exception {

        if (!page.getResultItems().isSkip() && Preconditions.isNotBlank(pipelines)) {
            pipelines.stream()
                    .forEach(pipeline -> {

                        if (pipeline.getPipelineDelay() > 0) {

                            // Pipeline Delay
                            Observable.just("pipeline delay").delay(pipeline.getPipelineDelay(), TimeUnit.MILLISECONDS).blockingFirst();
                        }

                        pipeline.process(page.getResultItems());
                    });
        }

        return page;
    }
})

In addition, NetDiscovery supports configuring the crawler through an application.yaml or application.properties file. The rate-limiting parameters can be configured there as well, and random values can be used for them.
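
As a minimal sketch of the randomization idea (the property names and bounds here are hypothetical, not NetDiscovery's actual configuration keys), a per-request sleep time could be drawn at random between configured limits so requests are not sent at a fixed, easily fingerprinted interval:

    import java.util.concurrent.ThreadLocalRandom;

    public class RandomDelayExample {

        public static void main(String[] args) {
            // Hypothetical bounds, e.g. read from application.properties:
            // spider.sleep-time.min=1000
            // spider.sleep-time.max=3000
            long minSleepTime = 1000L;
            long maxSleepTime = 3000L;

            // A random sleep time in [min, max] avoids a fixed crawling rhythm
            long sleepTime = ThreadLocalRandom.current().nextLong(minSleepTime, maxSleepTime + 1);

            System.out.println("sleepTime = " + sleepTime + " ms");
        }
    }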

2.3 Adding Requests without blocking the running crawler

In earlier versions, new requests could not be added once the crawler was running, because by default the crawler exited after it had consumed all the Requests in the queue.

The new version makes use of Condition, which allows a Request to be added to the message queue even while the crawler is running.

Condition provides more precise control over a lock. It replaces the traditional wait() and notify() of Object for coordinating threads; implementing inter-thread cooperation with Condition's await() and signal() is safer and more efficient.
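
A minimal standalone sketch of this await()/signal() pattern (not NetDiscovery's actual code) has a consumer thread wait on a Condition until a producer signals that work is available:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    public class ConditionDemo {

        private final ReentrantLock lock = new ReentrantLock();
        private final Condition notEmpty = lock.newCondition();
        private boolean hasWork = false;

        public void awaitWork() throws InterruptedException {
            lock.lock();
            try {
                while (!hasWork) {
                    // await() releases the lock while waiting and re-acquires it on wake-up
                    notEmpty.await(1000, TimeUnit.MILLISECONDS);
                }
                hasWork = false;
            } finally {
                lock.unlock();
            }
        }

        public void signalWork() {
            lock.lock();
            try {
                hasWork = true;
                notEmpty.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }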

First, a ReentrantLock and a Condition need to be defined in the Spider.

Then define the waitNewRequest() and signalNewRequest() methods: the former suspends the current crawler thread to wait for a new Request, and the latter wakes the crawler thread up to consume Requests from the message queue.

    private ReentrantLock newRequestLock = new ReentrantLock();
    private Condition newRequestCondition = newRequestLock.newCondition();

    ...

    private void waitNewRequest() {
        newRequestLock.lock();

        try {
            newRequestCondition.await(sleepTime, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("waitNewRequest - interrupted, error {}", e);
        } finally {
            newRequestLock.unlock();
        }
    }

    public void signalNewRequest() {
        newRequestLock.lock();

        try {
            newRequestCondition.signalAll();
        } finally {
            newRequestLock.unlock();
        }
    }

As you can see, when no Request can be fetched from the message queue, waitNewRequest() is called.

        while (getSpiderStatus() != SPIDER_STATUS_STOPPED) {

            // Pause fetching
            if (pause && pauseCountDown != null) {
                try {
                    this.pauseCountDown.await();
                } catch (InterruptedException e) {
                    log.error("can't pause : ", e);
                }

                initialDelay();
            }

            // Retrieve request from message queue
            final Request request = queue.poll(name);

            if (request == null) {

                waitNewRequest();
            } else {
                ...
            }
        }

The Queue interface then contains a default method, pushToRunninSpider(), which pushes the Request to the queue and additionally calls spider.signalNewRequest().

    /**
     * Add the Request to the queue of a running crawler without blocking the crawler.
     *
     * @param request request
     */
    default void pushToRunninSpider(Request request, Spider spider) {

        push(request);
        spider.signalNewRequest();
    }

As a result, a Request can be added to the crawler's Queue at any time, even while the crawler is already running.

        Spider spider = Spider.create(new DisruptorQueue())
                .name("tony")
                .url("http://www.163.com");

        CompletableFuture.runAsync(() -> {
            spider.run();
        });

        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        spider.getQueue().pushToRunninSpider(new Request("https://www.baidu.com", "tony"), spider);

        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        spider.getQueue().pushToRunninSpider(new Request("https://www.jianshu.com", "tony"), spider);

        System.out.println("end....");

3. Conclusion

Crawler framework Github address: github.com/fengzhizi71…

This article summarized how a common crawler framework uses multithreading in some specific scenarios. In the future, NetDiscovery will add even more general-purpose features.

