In the previous article Golang simple crawler framework (2) — single task version crawler we implemented a simple single task version crawler, for the single task version crawler, each time to request the page, and then parse the data, and then request the next page. In the whole process, the speed of obtaining web data is relatively slow, so we will make the data acquisition module concurrent execution. Based on the project, multi-task concurrent version crawler is realized.

Github address: github.com/NovemberCho… Roll back to the corresponding record for better effect.

1. Project architecture

First, we combine the Fetcher module and Parser module in the crawler architecture of task version into a Worker module, and then execute the Worker module concurrently

Then we get the architecture diagram of the concurrent version:

  • In the crawler of concurrent version, multiple workers will be executed at the same time. Each Worker task accepts a Request Request, and then the Request page parses the data and outputs the parsed Requests and Item

  • As there are many requests and workers, the Scheduler module is also needed to schedule and process the Request tasks

  • The Engine module accepts Requests and Items sent by the Worker. For now, we’ll print out Items and send the parsed Request to the scheduler

  • Engine and Scheduler are one Goroutine, while Worker contains multiple Goroutines. Each module is connected by channel

    Let’s start with the refactored project file structure:

2. Worker implementation

We extracted the following functionality from engine.go as the Worker module and renamed engine.go to simple.go. Please adjust the simple. Go file after modification, or go to github project source code rollback to view.

engine/worker.go

package engine

import (
	"crawler/fetcher"
	"log"
)

// Enter Request to return ParseResult
func worker(request Request) (ParseResult, error) {
	log.Printf("Fetching %s\n", request.Url)
	content, err := fetcher.Fetch(request.Url)
	iferr ! =nil {
		log.Printf("Fetch error, Url: %s %v\n", request.Url, err)
		return ParseResult{}, err
	}
	return request.ParseFunc(content), nil
}
Copy the code

Accept a request for each Worker and return the parsed content

3. Concurrent engine implementation

Please see according to the architecture diagram, the effect will be better.

package engine

import "log"// Concurrency enginetypeConcurrendEngine struct {Scheduler WorkerCount int number of concurrent tasks} // Task SchedulertypeScheduler Interface {Submit(Request request) // Submit the task ConfigMasterWorkerChan(chan Request) // Configure the initial request task} func (e) *ConcurrendEngine) Run(seeds ... Request) {in: = make (chan Request) / / scheduler input out: = make (chan ParseResult) / / worker output e.S cheduler. ConfigMasterWorkerChan (in// Submits the initial request to scheduler // creates Goruntinefor i := 0; i < e.WorkerCount; i++ {
      createWorker(in, out)} // Engine submits requests to the Schedulerfor _, request := range seeds {
      e.Scheduler.Submit(request)
   }

   itemCount := 0
   for{// Accept Worker's parse result := <-outfor _, item := range result.Items {
         log.Printf("Got item: #%d: %v\n", itemCount, item) itemCount++} // Then sends the Request resolved by the Worker to the SchedulerforRequest := range result.Requests {e.cheduler. Submit(request)}}}in chan Request, out chan ParseResult) {
   go func() {
      for {
         request := <-in
         result, err := worker(request)
         iferr ! = nil {continue
         }
         out <- result
      }
   }()
}
Copy the code

4. Task Scheduler Scheduler implementation

package scheduler

import "crawler/engine"

type SimpleScheduler struct {
	workerChan chan engine.Request
}

func (s *SimpleScheduler) Submit(request engine.Request) {
	// Create a Goroutine for each Request
	go func(a) {
		s.workerChan <- request
	}()
}

// Sends the initial request to the Scheduler
func (s *SimpleScheduler) ConfigMasterWorkerChan(in chan engine.Request) {
	s.workerChan = in
}

Copy the code

5. Main function

package main

import (
	"crawler/engine"
	"crawler/scheduler"
	"crawler/zhenai/parser"
)

func main(a) {
	e := engine.ConcurrendEngine{	// Configure the crawler engine
		Scheduler:   &scheduler.SimpleScheduler{},
		WorkerCount: 50,
	}
	e.Run(engine.Request{		// Configure crawler target information
		Url:       "http://www.zhenai.com/zhenghun",
		ParseFunc: parser.ParseCityList,
	})
}
Copy the code

6, summary

In this blog, we implement a simple concurrent version of crawler. The scheduler receives tasks continuously and assigns tasks to a worker once there is idle. One disadvantage of this is that we do not know the working situation when we distribute so many workers, so we have weak control over workers, so we will use queue to realize task scheduling in the next blog.

If you want to get an in-depth go video from Google engineers, you can leave your email in the comments section.

The source code of the project has been hosted to Github, for each version of the record, welcome to check, remember to give a star, in this first thank you