• Defining structure

    type RestChan struct {
    	Status uint32   //true: closed
    	Rw sync.RWMutex  
    	Ch chan *azmode.ResultMode
    	Wg sync.WaitGroup
    }Copy the code

  • Construct new method

    func NewRestChan(len int16) *RestChan  {
    	var ch chan *azmode.ResultMode
    	ch = make(chan *azmode.ResultMode, len)
    	return &RestChan{Ch:ch}
    }Copy the code

  • Structure method get

    // Get data from chan
    func (r *RestChan)Get(a)(bool, *azmode.ResultMode)  {
    	var(
    		ok bool
    		rest *azmode.ResultMode
    	)
    	select {
    	case rest, ok = <- r.Ch:
    		if! ok{ fmt.Println("***********RestChan is closed")
    			return false.nil
    		}
    	case <-time.After(time.Second*1) :return false.nil
    	//default:
    	// // FMT.Println("*********** no data in RestChan ")
    	//	return false, nil
    	}
    
    	return true, rest
    }Copy the code

  • The constructor method put

    // Send data to chan
    func (r *RestChan)put(rest *azmode.ResultMode) bool  {
    
    	select {
    	case r.Ch <- rest:
    		return true
    	default:
    		/ / FMT. Println (" * * * * * * * * * * * RestChan is full ")
    		return false}}Copy the code

  • Struct method close

    / / close chan
    func (r *RestChan)Close(a)  {
    	var(
    		state uint32
    	)
    	r.Rw.Lock()
    	state = atomic.LoadUint32(&r.Status)
            // Check whether chan is disabled
    	ifstate! =1{
    		if atomic.CompareAndSwapUint32(&r.Status, state, 1){
    			fmt.Println("*********** start shutting down RestChan")
    			close(r.Ch)
    		}
    	}
    	defer r.Rw.Unlock()
    }Copy the code

  • The practice data is sent to Chan

    // Continue sending data to restChan until LT24SChan is closed
    func (r *RestChan)PushSRest(a)  {
    	var(
    		bl bool
    		rest *azmode.ResultMode
    	)
    	for{
    		ifbl, rest = LT24SChan.Get(); ! bl{// Check whether chan is closed
    			ifatomic.LoadUint32(&LT24SChan.Status) ! =1{
    				//fmt.Println("**************LT24SChan not available yet ")
    				continue
    			}
    			fmt.Println("***************** closed lt24schan.ch")
    			break
    
    		}else {
    			ifrest ! =nil{
    				// Send data to RestChan
    				ifbl = RestCh.put(rest); ! bl{// Send asynchronously, synchronization will cause blocking here
    					r.Wg.Add(1)
    					go func(rest *azmode.ResultMode) {
    						var bl bool
    						for{
    							ifbl = RestCh.put(rest); ! bl{continue
    							}
    							break
    						}
    						defer r.Wg.Done()
    					}(rest)
    				}
    			}
    			// FMT.Println("key already exists ")
    			continue}}}Copy the code

  • Practice closing Chan

    go func(a) {
    		// Make sure that all data sent to chan is complete
    		chanpip.RestCh.Wg.Wait()
    		chanpip.RestCh.Close()
    	}()Copy the code