Avoid future headaches by learning to spot the pitfalls of concurrent processing.

When you work on a complex distributed system, you often need concurrent operations. At Mode.net, we deal with real-time, fast, and resilient software every day. Building a global private network that dynamically routes packets in milliseconds is impossible without a highly concurrent system. This dynamic routing is based on network state, and while there are many factors to consider in the process, our focus here is on link metrics. In our environment, a link metric can be anything related to the state and current properties of a network link, such as link latency.

Concurrent probe link monitoring

Our dynamic routing algorithm H.A.L.O. (Hop-by-Hop Adaptive Link-State Optimal Routing) partially relies on link metrics to calculate routing tables. These metrics are collected by a separate component that sits on each PoP (Point of Presence). A PoP is a machine representing a single routing entity in our network; the PoPs are connected by links and distributed across our network topology. This component probes neighboring machines with network packets, and the neighbors send probe packets back. Link latencies can be derived from the received probe packets. Because every PoP has more than one neighbor, this probing task is inherently concurrent: we need to measure the latency of each neighboring link in real time. We can't do it serially; each probe must be processed as soon as possible to compute this metric.

latency computation graph

Sequence numbers and resets: a reordering scenario

Our probe components exchange packets with each other and rely on sequence numbers for packet processing. This is intended to avoid processing duplicate or out-of-order packets. Our first implementation relied on the special sequence number 0 to reset sequence numbers. That number was used only while initializing a component. The main problem was that we assumed an ever-increasing sequence number always starting at 0. After a component restarts, packet ordering can get mixed up, and a packet's sequence number can easily jump to a value that was in use before the reset. This means subsequent packets are ignored until the sequence number in use before the reset is reached.

UDP handshake and finite state machine

The problem here is telling apart sequence numbers from before and after a component restart. There are several ways to solve this, and after discussion we chose to implement a three-way handshake protocol with clear state definitions. The handshake establishes a session over a link at initialization time. This guarantees that both nodes are communicating over the same session and using the matching sequence numbers.

To implement this process correctly, we must define a finite state machine with clear states and transitions. This way we can properly manage all the extremes of the handshake.

finite state machine diagram

The session ID is generated by the handshake initiator. A full exchange goes like this:

  1. The sender sends a SYN(ID) packet.
  2. The receiver stores the received ID and sends a SYN-ACK(ID).
  3. The sender receives the SYN-ACK(ID) and sends an ACK(ID). It also starts sending packets beginning with sequence number 0.
  4. The receiver checks the last received ID; if it matches, it accepts the ACK(ID) and starts accepting packets with sequence number 0.
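The receiver side of these steps can be sketched as a small state machine. This is a minimal illustration, not our actual implementation; the packet type and field names here are hypothetical stand-ins for the real wire format:

```go
package main

import "fmt"

// SessionState is one of the handshake states on the receiver side.
type SessionState int

const (
	Closed SessionState = iota
	SynReceived
	Established
)

// Packet is a hypothetical control packet; Kind and ID are
// illustrative, not the actual wire format.
type Packet struct {
	Kind string // "SYN" or "ACK"
	ID   uint64
}

// Receiver tracks the session ID proposed by the handshake initiator.
type Receiver struct {
	State     SessionState
	SessionID uint64
}

// Handle advances the receiver's state machine for one control packet
// and reports whether a SYN-ACK reply should be sent.
func (r *Receiver) Handle(p Packet) (sendSynAck bool) {
	switch r.State {
	case Closed:
		if p.Kind == "SYN" {
			r.SessionID = p.ID // store the initiator's ID
			r.State = SynReceived
			return true // reply with SYN-ACK(ID)
		}
	case SynReceived:
		// Accept the ACK only if it carries the same session ID.
		if p.Kind == "ACK" && p.ID == r.SessionID {
			r.State = Established // sequence numbers restart at 0
		}
	}
	return false
}

func main() {
	r := &Receiver{}
	fmt.Println(r.Handle(Packet{Kind: "SYN", ID: 42})) // prints true
	r.Handle(Packet{Kind: "ACK", ID: 42})
	fmt.Println(r.State == Established) // prints true
}
```

Note how an ACK with a stale ID (from before a restart) is simply ignored, which is exactly the property the handshake is meant to provide.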

Handling state timeouts

Basically, you need to handle up to three types of events in each state: link events, packet events, and timeout events. These events occur concurrently, so you must handle concurrency correctly.

  • Link events are link-up or link-down updates, which trigger the initialization of a link session or the teardown of an established session.
  • Packet events are control packets (SYN/SYN-ACK/ACK) or probe responses.
  • Timeout events fire when the timeout scheduled for the current session state expires.

The main challenge here is handling concurrent timeouts and other events. It's easy to fall into the traps of deadlocks and race conditions.
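One common way to tame this kind of concurrency is to funnel all three event kinds into a single goroutine through channels and a select loop, so handlers never race with each other. This is a generic sketch of that pattern, not our actual component (the `Event` type and `runEventLoop` are illustrative names):

```go
package main

import "fmt"

// Event is a simplified stand-in for link, packet, and timeout events.
type Event struct {
	Kind string // "link", "packet", or "timeout"
}

// runEventLoop processes events one at a time until done is closed.
// Because a single goroutine owns the state, no locking is needed for
// the state transitions performed inside the loop.
func runEventLoop(events <-chan Event, done <-chan struct{}, handled *[]string) {
	for {
		select {
		case ev := <-events:
			// State-machine transitions would go here.
			*handled = append(*handled, ev.Kind)
		case <-done:
			return
		}
	}
}

func main() {
	events := make(chan Event)
	done := make(chan struct{})
	var handled []string

	go func() {
		for _, k := range []string{"link", "packet", "timeout"} {
			events <- Event{Kind: k}
		}
		close(done)
	}()

	runEventLoop(events, done, &handled)
	fmt.Println(len(handled)) // prints 3
}
```

The approach described in the rest of the article uses locks and timers instead; the channel-based loop above is an alternative worth knowing when the state fits in one goroutine.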

The first approach

The language used in this project is Go. It provides native synchronization mechanisms, such as channels and locks, and lightweight threads (goroutines) for concurrent processing.

gophers hacking together

The Gopher party

First, you can design two structures that represent our session and our timeout handler.

type Session struct {  
  State SessionState  
  Id SessionId  
  RemoteIp string  
}

type TimeoutHandler struct {  
  callback func(Session)  
  session Session  
  duration int  
  timer *time.Timer  
}

Session identifies a link session and contains the session ID, the IP address of the neighboring connection point, and the current session state.

TimeoutHandler contains the callback function, its corresponding session, the duration, and a pointer to the scheduled timer.

A global map holds a scheduled TimeoutHandler per neighbor session.

sessionTimeout map[Session]*TimeoutHandler

Register and cancel timeouts as follows:

// schedules the timeout callback function.
func (timeout *TimeoutHandler) Register() {
  timeout.timer = time.AfterFunc(time.Duration(timeout.duration)*time.Second, func() {
    timeout.callback(timeout.session)
  })
}

func (timeout *TimeoutHandler) Cancel() {
  if timeout.timer == nil {
    return
  }
  timeout.timer.Stop()
}

You can create and store timeouts using methods like the following:

func CreateTimeoutHandler(callback func(Session), session Session, duration int) *TimeoutHandler {
  if sessionTimeout[session] == nil {
    sessionTimeout[session] = new(TimeoutHandler)
  }

  timeout := sessionTimeout[session]
  timeout.session = session
  timeout.callback = callback
  timeout.duration = duration
  return timeout
}

After a timeout handler is created, the callback executes once the given duration (in seconds) has elapsed. However, some events require you to reschedule the timeout handler (as in the SYN state, every 3 seconds).

To do this, you can ask the callback to reschedule a timeout:

func synCallback(session Session) {
  sendSynPacket(session)

  // reschedules the same callback.
  newTimeout := CreateTimeoutHandler(synCallback, session, SYN_TIMEOUT_DURATION)
  newTimeout.Register()

  sessionTimeout[session] = newTimeout
}

This callback reschedules itself in a new timeout handler and updates the global sessionTimeout map.

Data races and references

With this first solution in place, a simple test is to check whether the timeout callback executes after the timer expires: register a timeout, sleep for the duration, then check whether the callback ran. After the test executes, it's good practice to cancel the scheduled timeout (since it reschedules itself) so it has no side effects on the next test.
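Such a test might look like the following sketch. It uses simplified stand-ins for the types above (no session, callback takes no argument) so that it is self-contained; the shape of the check is the same:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// TimeoutHandler is a simplified stand-in for the article's type,
// just enough to demonstrate the test idea.
type TimeoutHandler struct {
	callback func()
	duration time.Duration
	timer    *time.Timer
}

func (t *TimeoutHandler) Register() {
	t.timer = time.AfterFunc(t.duration, t.callback)
}

func (t *TimeoutHandler) Cancel() {
	if t.timer != nil {
		t.timer.Stop()
	}
}

// testTimeoutFires registers a timeout, sleeps past its duration, and
// reports whether the callback ran. It cancels afterward so a
// rescheduling callback leaves no side effects for the next test.
func testTimeoutFires() bool {
	var fired int32
	t := &TimeoutHandler{
		// atomic avoids a data race between timer goroutine and test.
		callback: func() { atomic.StoreInt32(&fired, 1) },
		duration: 10 * time.Millisecond,
	}
	t.Register()
	time.Sleep(50 * time.Millisecond) // well past the timer's duration
	t.Cancel()
	return atomic.LoadInt32(&fired) == 1
}

func main() {
	fmt.Println(testTimeoutFires()) // prints true
}
```

Note the atomic flag: even this tiny test has shared state between the timer goroutine and the test goroutine, which is exactly the kind of detail the race detector (go test -race) will flag if you use a plain bool.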

Surprisingly, this simple test uncovered a problem in the solution. Canceling timeouts via the cancel method is not handled correctly. The following sequence of events can produce a data race:

  1. You have one scheduled timeout handler.
  2. Thread 1:
    1. You receive a control packet and want to cancel the registered timeout and move to the next session state (for example, receiving a SYN-ACK after sending a SYN).
    2. You call timeout.Cancel(), which calls timer.Stop(). (Note that stopping a Go timer does not prevent an already expired timer from running its callback.)
  3. Thread 2:
    1. Before the cancel call, the timer had already expired and the callback was about to execute.
    2. The callback executes, schedules a new timeout, and updates the global map.
  4. Thread 1:
    1. You transition to the new session state, register a new timeout, and update the global map.

Both threads update the timeout map concurrently. The net result is that you fail to cancel the registered timeout and also lose the reference to the timeout rescheduled by thread 2. That handler keeps executing and rescheduling itself indefinitely, with unexpected behavior.

Locks don’t solve the problem

Using locks doesn't completely solve the problem either. If you take the lock before processing events and before executing callbacks, an expired callback can still run:

func (timeout *TimeoutHandler) Register() {
  timeout.timer = time.AfterFunc(time.Duration(timeout.duration)*time.Second, func() {
    stateLock.Lock()
    defer stateLock.Unlock()

    timeout.callback(timeout.session)
  })
}

The difference now is that updates to the global map are synchronized, but this still doesn't prevent the callback from executing after you call timeout.Cancel(): this happens when the scheduled timer has already expired but the callback hasn't yet acquired the lock. You still lose a reference to a registered timeout.

Using cancellation channels

Instead of relying on timer.Stop(), which does not prevent an expired timer from executing, you can use cancellation channels.

This is a slightly different approach. Now you don't reschedule recursively through callbacks; instead, you register an infinite loop that terminates when a cancellation signal or a timeout event arrives.

The new Register() spawns a goroutine that executes your callback when the timeout fires and schedules the next timeout afterward. A cancellation channel is returned to the caller to control the loop's termination.

func (timeout *TimeoutHandler) Register() chan struct{} {
  cancelChan := make(chan struct{})
  timeout.cancelChan = cancelChan

  go func() {
    select {
    case <-cancelChan:
      return
    case <-time.After(time.Duration(timeout.duration) * time.Second):
      func() {
        stateLock.Lock()
        defer stateLock.Unlock()

        timeout.callback(timeout.session)
      }()
    }
  }()

  return cancelChan
}

func (timeout *TimeoutHandler) Cancel() {
  if timeout.cancelChan == nil {
    return
  }
  timeout.cancelChan <- struct{}{}
}

With this approach, every timeout you register has a cancellation channel. A cancel call sends an empty struct to the channel and triggers the cancellation. However, this still doesn't solve the earlier problem: the timer can expire right before you cancel over the channel, and before the timeout goroutine takes the lock.

The trick here is to check the cancellation channel inside the timeout case, after the lock is taken.

  case <-time.After(time.Duration(timeout.duration) * time.Second):
    func() {
      stateLock.Lock()
      defer stateLock.Unlock()

      select {
      case <-timeout.cancelChan:
        return
      default:
        timeout.callback(timeout.session)
      }
    }()
  }

Finally, this guarantees that the callback runs only if no cancellation was requested by the time the lock is acquired.

Beware of deadlocks

The solution seems to work, but there's still a catch: deadlocks.

Read the code above and try to find it yourself. Consider concurrent calls to all of the functions described.

The problem is the cancellation channel itself. We created an unbuffered channel, so sends on it block. When you call the cancel function on a timeout handler, the call can't proceed until the cancellation has been received. The trouble is that each cancellation channel processes only one cancellation request: its goroutine receives the first signal and then exits, never reading from the channel again. When multiple events cancel the same timeout handler concurrently, such as a link-down event and a control-packet event, every cancel call after the first blocks forever. This causes a deadlock and can bring the application down.

gophers on a wire, talking

Is anyone listening?

(Licensed by Trevor Forrey.)

The solution is to create the channel with a buffer size of at least 1, so that sending on it doesn't block, and to make the send explicitly non-blocking in case of concurrent calls. This guarantees the cancellation is sent once and doesn't block subsequent cancel calls:

func (timeout *TimeoutHandler) Cancel() {
  if timeout.cancelChan == nil {
    return
  }

  select {
  case timeout.cancelChan <- struct{}{}:
  default:
    // Can't send on the channel; someone has already requested the cancellation.
  }
}

Conclusion

Along the way, you learned about common mistakes that occur with concurrent operations. Because of their non-deterministic nature, these problems are not easy to find, even with extensive testing. Here are the three main problems we encountered in our initial implementation:

Updating shared data without synchronization

This may seem like an obvious problem, but it's hard to spot when the concurrent updates happen in different locations. The result is a data race: among multiple updates to the same data, some are lost because one update overwrites another. In our case, we were concurrently updating the scheduled-timeout references in the same shared map. (Interestingly, if Go detects concurrent reads and writes on the same map object, it throws a fatal error; you can also try running Go's data race detector.) This eventually led to losing timeout references and made it impossible to cancel a given timeout. Never forget to use locks when they're needed.

gopher assembly line

Don’t forget to synchronize gopher’s work

Missing condition checks

Condition checks are needed whenever you can't rely on lock exclusivity alone. The scenario we ran into is slightly different, but the core idea is the same as with condition variables. Take a classic scenario with one producer and multiple consumers sharing a queue: the producer adds an item to the queue and wakes up all consumers. The wake-up means there is data in the queue, and since the queue is shared, consumers must access it synchronously through a lock. Each consumer can acquire the lock; however, it still needs to check whether there are items in the queue. A condition check is required because you don't know the queue's state at the moment you take the lock.
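The producer/consumer scenario above maps directly onto Go's sync.Cond. A minimal sketch, with the crucial condition check written as a loop:

```go
package main

import (
	"fmt"
	"sync"
)

// Queue is a shared queue where consumers must re-check the condition
// after being woken, because another consumer may have drained the
// queue between the wake-up and acquiring the lock.
type Queue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []int
}

func NewQueue() *Queue {
	q := &Queue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

func (q *Queue) Push(v int) {
	q.mu.Lock()
	q.items = append(q.items, v)
	q.mu.Unlock()
	q.cond.Broadcast() // wake all consumers
}

func (q *Queue) Pop() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	// The condition check must be a loop: holding the lock does not
	// guarantee the queue is still non-empty after a wake-up.
	for len(q.items) == 0 {
		q.cond.Wait() // releases the lock while waiting
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v
}

func main() {
	q := NewQueue()
	done := make(chan int)
	go func() { done <- q.Pop() }()
	q.Push(7)
	fmt.Println(<-done) // prints 7
}
```

If the `for` were an `if`, a consumer woken after another consumer already emptied the queue would pop from an empty slice and panic; the loop is the condition check.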

In our example, the timeout handler receives its "wake-up" call when the timer expires, but it still needs to check whether a cancel signal was sent before it can proceed with the callback.

gopher boot camp

If you’re waking up more than one Gopher, you might want to do a condition check

Deadlocks

These happen when a thread gets stuck, waiting indefinitely for a wake-up signal that never arrives. Deadlocks can bring your entire application down.

In our case, this happened because of multiple send calls to an unbuffered, blocking channel. A send on such a channel returns only once the value has been received. Our timeout goroutine was promptly receiving signals on the cancellation channel; but after the first signal, it broke out of the loop and never read from the channel again, leaving all other callers stuck forever. To avoid this situation, go through your code carefully, handle blocking calls with care, and make sure threads can't starve. The fix in our example was to make the cancel call non-blocking; we didn't need it to block.


Via: opensource.com/article/19/…

By Ferreira; selected by lujun9972; translated by LXBwolf

This article was originally translated by LCTT and published by Linux China