Preface

This article takes a look at tunny.

Worker

type Worker interface {
	// Process will synchronously perform a job and return the result.
	Process(interface{}) interface{}

	// BlockUntilReady is called before each job is processed and must block the
	// calling goroutine until the Worker is ready to process the next job.
	BlockUntilReady()

	// Interrupt is called when a job is cancelled. The worker is responsible
	// for unblocking the Process implementation.
	Interrupt()

	// Terminate is called when a Worker is removed from the processing pool
	// and is responsible for cleaning up any held resources.
	Terminate()
}

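The Worker interface defines four methods: Process runs a job synchronously and returns its result, BlockUntilReady blocks until the worker can take the next job, Interrupt is called when a job is cancelled, and Terminate releases any held resources when the worker is removed from the pool.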
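To make the four methods concrete, here is a minimal sketch of a custom implementation. The Worker interface is copied locally so the example builds without tunny; throttledWorker and its fields are hypothetical names invented for illustration and are not part of the library.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// Worker mirrors the interface quoted above.
type Worker interface {
	Process(interface{}) interface{}
	BlockUntilReady()
	Interrupt()
	Terminate()
}

// throttledWorker is a hypothetical Worker that upper-cases strings and
// leaves at least minGap between the start of two jobs.
type throttledWorker struct {
	minGap time.Duration
	last   time.Time
	closed bool
}

// Process handles one job synchronously.
func (w *throttledWorker) Process(payload interface{}) interface{} {
	s, ok := payload.(string)
	if !ok {
		return fmt.Errorf("expected string, got %T", payload)
	}
	w.last = time.Now()
	return strings.ToUpper(s)
}

// BlockUntilReady sleeps until minGap has elapsed since the last job.
func (w *throttledWorker) BlockUntilReady() {
	if wait := w.minGap - time.Since(w.last); wait > 0 {
		time.Sleep(wait)
	}
}

// Interrupt has nothing to unblock because Process never waits on anything.
func (w *throttledWorker) Interrupt() {}

// Terminate marks the worker as cleaned up.
func (w *throttledWorker) Terminate() { w.closed = true }

// Compile-time check that throttledWorker satisfies Worker.
var _ Worker = (*throttledWorker)(nil)

func main() {
	w := &throttledWorker{minGap: 10 * time.Millisecond}
	for _, job := range []string{"a", "b"} {
		w.BlockUntilReady()
		fmt.Println(w.Process(job)) // prints A, then B
	}
	w.Terminate()
}
```

In a real pool the calls to BlockUntilReady, Process, and Terminate would be made by the pool itself rather than by main.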

closureWorker

type closureWorker struct {
	processor func(interface{}) interface{}
}

func (w *closureWorker) Process(payload interface{}) interface{} {
	return w.processor(payload)
}

func (w *closureWorker) BlockUntilReady() {}
func (w *closureWorker) Interrupt()       {}
func (w *closureWorker) Terminate()       {}

closureWorker holds a single processor field, which is a function, and implements the Worker interface. Its Process method delegates to processor; BlockUntilReady, Interrupt, and Terminate are no-ops.

callbackWorker

type callbackWorker struct{}

func (w *callbackWorker) Process(payload interface{}) interface{} {
	f, ok := payload.(func())
	if !ok {
		return ErrJobNotFunc
	}
	f()
	return nil
}

func (w *callbackWorker) BlockUntilReady() {}
func (w *callbackWorker) Interrupt()       {}
func (w *callbackWorker) Terminate()       {}

callbackWorker is an empty struct that also implements the Worker interface. Its Process method asserts that the payload is a func(); if it is not, it returns ErrJobNotFunc, otherwise it calls the function and returns nil. BlockUntilReady, Interrupt, and Terminate are no-ops.
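The type assertion in Process is strict, which the following standalone sketch tries to show. runCallback copies the body of callbackWorker.Process into a free function, and errJobNotFunc is a local stand-in for tunny's ErrJobNotFunc; both names and the error message are assumptions made for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// errJobNotFunc stands in for tunny's ErrJobNotFunc; the message is an assumption.
var errJobNotFunc = errors.New("job is not a func()")

// runCallback mirrors the body of callbackWorker.Process.
func runCallback(payload interface{}) interface{} {
	f, ok := payload.(func())
	if !ok {
		return errJobNotFunc
	}
	f()
	return nil
}

func main() {
	ran := false
	res := runCallback(func() { ran = true })
	fmt.Println(res, ran) // <nil> true

	// Anything that is not exactly a func() is rejected.
	fmt.Println(runCallback("not a function"))

	// A function with a different signature fails the assertion as well.
	fmt.Println(runCallback(func() int { return 1 }))
}
```

Because the assertion targets the exact type func(), a callback that needs to return a value has to capture a variable from the enclosing scope, as ran does here.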

Pool

type Pool struct {
	queuedJobs int64

	ctor    func() Worker
	workers []*workerWrapper
	reqChan chan workRequest

	workerMut sync.Mutex
}

func New(n int, ctor func() Worker) *Pool {
	p := &Pool{
		ctor:    ctor,
		reqChan: make(chan workRequest),
	}
	p.SetSize(n)

	return p
}

func NewFunc(n int, f func(interface{}) interface{}) *Pool {
	return New(n, func() Worker {
		return &closureWorker{
			processor: f,
		}
	})
}

func NewCallback(n int) *Pool {
	return New(n, func() Worker {
		return &callbackWorker{}
	})
}

Pool defines the queuedJobs, ctor, workers, reqChan, and workerMut fields. New creates a Pool from a worker count n and a worker constructor ctor, then calls SetSize(n) to start the workers. NewFunc builds a pool whose workers are closureWorker instances wrapping f; NewCallback builds a pool of callbackWorker instances.

Process

func (p *Pool) Process(payload interface{}) interface{} {
	atomic.AddInt64(&p.queuedJobs, 1)

	request, open := <-p.reqChan
	if !open {
		panic(ErrPoolNotRunning)
	}

	request.jobChan <- payload

	payload, open = <-request.retChan
	if !open {
		panic(ErrWorkerClosed)
	}

	atomic.AddInt64(&p.queuedJobs, -1)
	return payload
}

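The Process method atomically increments queuedJobs, receives a workRequest from reqChan (panicking with ErrPoolNotRunning if the channel is closed), sends the payload on the request's jobChan, and waits for the result on retChan (panicking with ErrWorkerClosed if that channel is closed). It then decrements queuedJobs and returns the result.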
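The handshake is easier to follow with both sides in view. The sketch below pairs a simplified process function with a worker loop. The worker side is an assumption: the article does not show workerWrapper, so worker, its stop channel, and the panic messages here are hypothetical stand-ins rather than tunny's actual code.

```go
package main

import "fmt"

// workRequest carries the two channels that Process uses.
type workRequest struct {
	jobChan chan<- interface{}
	retChan <-chan interface{}
}

// worker is a hypothetical stand-in for the workerWrapper loop: it offers a
// request on reqChan, then waits for one job and sends back the result.
func worker(reqChan chan<- workRequest, stop <-chan struct{}, f func(interface{}) interface{}) {
	jobChan := make(chan interface{})
	retChan := make(chan interface{})
	defer close(retChan)
	for {
		select {
		case reqChan <- workRequest{jobChan: jobChan, retChan: retChan}:
			retChan <- f(<-jobChan)
		case <-stop:
			return
		}
	}
}

// process follows the same steps as Pool.Process, minus the job counter.
func process(reqChan <-chan workRequest, payload interface{}) interface{} {
	request, open := <-reqChan
	if !open {
		panic("pool not running")
	}
	request.jobChan <- payload
	result, open := <-request.retChan
	if !open {
		panic("worker closed")
	}
	return result
}

func main() {
	reqChan := make(chan workRequest)
	stop := make(chan struct{})
	double := func(in interface{}) interface{} { return in.(int) * 2 }
	go worker(reqChan, stop, double)

	fmt.Println(process(reqChan, 21)) // 42
	close(stop)
}
```

Since reqChan is unbuffered, a caller blocks until some worker is idle enough to offer a request, so the channel itself acts as the dispatch queue.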

SetSize

func (p *Pool) SetSize(n int) {
	p.workerMut.Lock()
	defer p.workerMut.Unlock()

	lWorkers := len(p.workers)
	if lWorkers == n {
		return
	}

	// Add extra workers if N > len(workers)
	for i := lWorkers; i < n; i++ {
		p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor()))
	}

	// Asynchronously stop all workers > N
	for i := n; i < lWorkers; i++ {
		p.workers[i].stop()
	}

	// Synchronously wait for all workers > N to stop
	for i := n; i < lWorkers; i++ {
		p.workers[i].join()
	}

	// Remove stopped workers from slice
	p.workers = p.workers[:n]
}

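The SetSize method locks workerMut and returns early if the pool already has n workers. If n is larger than the current size, it appends workers created with newWorkerWrapper until there are n. If n is smaller, it calls stop on every worker at index n and above, then calls join on each of them to wait for shutdown, and finally truncates the workers slice to its first n elements.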
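The two separate loops for stop and join matter: all surplus workers are signalled first and only then waited on, so they shut down concurrently. A minimal sketch of that pattern follows. The worker type, with its stopChan and doneChan fields, is a hypothetical stand-in for workerWrapper, and the mutex is left out for brevity.

```go
package main

import "fmt"

// worker is a hypothetical stand-in for workerWrapper: stop asks the
// goroutine to exit, join blocks until it has exited.
type worker struct {
	stopChan chan struct{}
	doneChan chan struct{}
}

func newWorker() *worker {
	w := &worker{stopChan: make(chan struct{}), doneChan: make(chan struct{})}
	go func() {
		defer close(w.doneChan)
		<-w.stopChan // a real worker would also serve jobs here
	}()
	return w
}

func (w *worker) stop() { close(w.stopChan) }
func (w *worker) join() { <-w.doneChan }

// resize applies the same grow, stop, join, truncate steps as SetSize.
func resize(workers []*worker, n int) []*worker {
	old := len(workers)
	for i := old; i < n; i++ {
		workers = append(workers, newWorker())
	}
	for i := n; i < old; i++ {
		workers[i].stop() // signal every surplus worker first
	}
	for i := n; i < old; i++ {
		workers[i].join() // then wait for each one to finish
	}
	return workers[:n]
}

func main() {
	var workers []*worker
	workers = resize(workers, 4)
	fmt.Println(len(workers)) // 4
	workers = resize(workers, 1)
	fmt.Println(len(workers)) // 1
	workers = resize(workers, 0)
	fmt.Println(len(workers)) // 0
}
```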

Close

func (p *Pool) Close() {
	p.SetSize(0)
	close(p.reqChan)
}

The Close method calls SetSize(0) to stop all workers and then closes reqChan.
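One consequence of closing reqChan is that a later Process call does not block: receiving from a closed channel returns immediately with open set to false, which is the condition that makes Process panic with ErrPoolNotRunning. The sketch below isolates that step. errPoolNotRunning, its message, and the safeProcess helper are assumptions made for illustration, not library code.

```go
package main

import (
	"errors"
	"fmt"
)

// errPoolNotRunning stands in for tunny's ErrPoolNotRunning; the message is an assumption.
var errPoolNotRunning = errors.New("the pool is not running")

// process keeps only the first step of Pool.Process: take a request or panic.
func process(reqChan <-chan struct{}) {
	if _, open := <-reqChan; !open {
		panic(errPoolNotRunning)
	}
}

// safeProcess is a hypothetical helper that turns the panic into an error.
func safeProcess(reqChan <-chan struct{}) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered: %v", r)
		}
	}()
	process(reqChan)
	return nil
}

func main() {
	reqChan := make(chan struct{})
	close(reqChan) // what Close does after SetSize(0)

	fmt.Println(safeProcess(reqChan)) // recovered: the pool is not running
}
```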

Example

func TestFuncJob(t *testing.T) {
	pool := NewFunc(10, func(in interface{}) interface{} {
		intVal := in.(int)
		return intVal * 2
	})
	defer pool.Close()

	for i := 0; i < 10; i++ {
		ret := pool.Process(10)
		if exp, act := 20, ret.(int); exp != act {
			t.Errorf("Wrong result: %v != %v", act, exp)
		}
	}
}

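TestFuncJob creates a pool of 10 workers with NewFunc, using a function that doubles its integer input. It then calls Process(10) ten times, checks that each result is 20, and closes the pool when the test returns.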

Summary

Tunny’s Worker interface defines the Process, BlockUntilReady, Interrupt, and Terminate methods. The NewFunc method creates the closureWorker and the NewCallback method creates the callbackWorker.

Reference

  • tunny