This article appeared in: Walker AI

In reinforcement learning, we often treat a real game with complex logic as a black box and exchange data with it over the network in order to train. To improve training efficiency, Python programs often use multiple processes as multiple actors, each interacting with the game black box to collect data. In such a setup, several processes can easily compete for the same communication port (a race condition), which can cause processes to fail.

Against that background, this article first presents a conventional solution and analyzes its limitations, then introduces a lock-based solution, and finally gives a working Python implementation. It is divided into the following three parts:

(1) Conventional solutions

(2) Idle port query based on the locking mechanism

(3) Python code implementation

1. Conventional solutions

The most obvious solution is to check, each time a process requests a port, whether that port is already in use. If it is, skip it and try the next port, until an unused port is found and returned to the caller. Let's implement this in Python:

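import socket


def get_free_port(ip, init_port):
    """Return the first port at or above init_port that can be bound on ip."""
    while True:
        sock = socket.socket()
        try:
            sock.bind((ip, init_port))
            # getsockname() returns the (ip, port) pair the socket is bound to.
            _, port = sock.getsockname()
            return port
        except OSError:
            # The port is already in use; try the next one.
            init_port += 1
        finally:
            # Release the port whether or not the bind succeeded.
            sock.close()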


This approach can be broken into three steps:

(1) Use sock.bind() to try to bind the port and so determine whether it is available.

(2) If the bind succeeds, the port is available, so use sock.close() to release it. If the bind fails, the port is already in use: add 1 to the port number and try again until a bind succeeds, then release the port with sock.close().

(3) Return the released port number to the caller.

This seems to work, but it is not enough. The check only guarantees that the port was unbound for a brief moment, which is of little use under a race condition. As the number of processes grows, there is a high probability that the following will happen:



As shown in the figure above, process A and process B look up the same port number X, with process B slightly slower than process A. Just after process A releases port X and is about to return it to the caller for binding, process B binds port X to check it. Process A's caller can then no longer bind port X, which causes an error.

Returning a safe port number under a race condition is therefore not easy. Even starting each search from a randomly seeded initial port number cannot completely avoid this situation. We need a mechanism that ensures a port number that has been unbound cannot be arbitrarily bound by another process.
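The failure described above can be reproduced deterministically inside a single process by letting two sockets stand in for processes A and B. This is only a minimal sketch: the helper names probe_free_port and demonstrate_race and the starting port 20000 are assumptions made for illustration, not part of the original code.

```python
import socket


def probe_free_port(ip, start_port):
    """Check-then-release lookup, in the style of the conventional approach."""
    port = start_port
    while port <= 65535:
        sock = socket.socket()
        try:
            sock.bind((ip, port))
            return port
        except OSError:
            port += 1
        finally:
            # The port is released before the caller ever uses it.
            sock.close()
    raise RuntimeError("no free port found")


def demonstrate_race(ip="127.0.0.1", start_port=20000):
    # "Process A" finds port X; the probe has already released it.
    port_x = probe_free_port(ip, start_port)

    # "Process B" binds port X in the gap before A gets to use it.
    sock_b = socket.socket()
    sock_b.bind((ip, port_x))

    # "Process A" now tries to bind port X for real.
    sock_a = socket.socket()
    try:
        sock_a.bind((ip, port_x))
        a_failed = False
    except OSError:
        a_failed = True
    finally:
        sock_a.close()
        sock_b.close()
    return port_x, a_failed


if __name__ == "__main__":
    port, failed = demonstrate_race()
    print(f"port {port}: second bind failed = {failed}")
```

The port returned by the probe was genuinely free at the time of the check, yet the later bind still fails, which is exactly the gap that the lock in the next section is meant to close.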

2. Query idle ports based on the locking mechanism

Using a lock file ensures that even while the port is unbound, no other process can claim it until the lock is released. Here, fasteners.process_lock.InterProcessLock is used to lock a port. The steps are:

(1) Use sock.bind() to try to bind the port and check whether it is available.

(2) If the port is available, lock it with InterProcessLock. Otherwise, keep checking until an available port is found, then lock it.

(3) Unbind the port.

(4) Safely return the port number.

(5) Unlock the port.

The process is as follows:

Reviewing the process above, once port X has been bound it stays protected at every step: at any moment it is either bound or locked.
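The behavior that this flow relies on, namely that a second claimant is refused while the lock is held, can be illustrated with the standard library alone. The sketch below swaps in fcntl.flock (Unix only) as a stand-in for InterProcessLock; it is not how fasteners is implemented, and the helper names try_lock, unlock, and demo_lock_file and the temporary lock path are assumptions for illustration.

```python
import fcntl
import os
import tempfile


def try_lock(path):
    """Try to take an exclusive, non-blocking lock on the file at path.

    Returns the open file object on success, or None if the file is
    already locked through another open handle.
    """
    handle = open(path, "a")
    try:
        fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
        return handle
    except OSError:
        handle.close()
        return None


def unlock(handle):
    """Release the lock and close the lock file."""
    fcntl.flock(handle, fcntl.LOCK_UN)
    handle.close()


def demo_lock_file():
    # Hypothetical lock file for port 4000, kept in a temporary directory.
    lock_path = os.path.join(tempfile.mkdtemp(), "port_4000_lock")

    first = try_lock(lock_path)    # the first claimant gets the lock
    second = try_lock(lock_path)   # a second claimant is refused
    first_ok = first is not None
    second_refused = second is None

    unlock(first)
    third = try_lock(lock_path)    # after release, the port can be claimed again
    third_ok = third is not None
    if third is not None:
        unlock(third)
    return first_ok, second_refused, third_ok
```

Because the lock is tied to the port number rather than to the socket, it keeps protecting the port during the window in which the port is unbound.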

3. Python code implementation

First, define two classes: BindFreePort, which searches for and detects an available port, and FreePort, which takes the result from BindFreePort and uses the mechanism described in Section 2 to hand the port safely to the program that needs it.

import os
import random
import socket
import time

from fasteners.process_lock import InterProcessLock


class BindFreePort(object):
    """Bind a randomly chosen free port in [start, stop] on localhost."""

    def __init__(self, start, stop):
        self.port = None
        self.sock = socket.socket()

        while True:
            port = random.randint(start, stop)
            try:
                self.sock.bind(('127.0.0.1', port))
                self.port = port
                time.sleep(0.01)
                break
            except OSError:
                # The port is in use; pick another one.
                continue

    def release(self):
        assert self.port is not None
        self.sock.close()


class FreePort(object):
    # Ports already handed out by this process.
    used_ports = set()

    def __init__(self, start=4000, stop=6000):
        self.lock = None
        self.bind = None
        self.port = None

        pid = os.getpid()

        while True:
            bind = BindFreePort(start, stop)

            print(f'{pid} got port : {bind.port}')

            if bind.port in self.used_ports:
                print(f'{pid} will release port : {bind.port}')
                bind.release()
                continue

            # We cannot be certain that the caller will bind the port
            # immediately (with this flow it is not possible), so we must
            # ensure the port is not reacquired even while it is unbound.
            lock = InterProcessLock(path='/tmp/socialdna/port_{}_lock'.format(bind.port))
            success = lock.acquire(blocking=False)

            if success:
                self.lock = lock
                self.port = bind.port
                self.used_ports.add(bind.port)
                bind.release()
                break

            # Another process holds the lock for this port; try again.
            bind.release()
            time.sleep(0.01)

    def release(self):
        assert self.lock is not None
        assert self.port is not None
        self.used_ports.remove(self.port)
        self.lock.release()

Then, let's run a simple test built on these two classes:

import time
from multiprocessing.pool import Pool


def get_and_bind_freeport(*args):
    freeport = FreePort(start=4000, stop=4009)
    # Simulate timing jitter inside the process.
    time.sleep(0.5)
    return freeport.port


def test():
    jobs = 10
    p = Pool(jobs)
    ports = p.map(get_and_bind_freeport, range(jobs))
    print(f'[ports]: {ports}')
    # Every process must have received a different port.
    assert len(ports) == len(set(ports))
    p.close()


if __name__ == '__main__':
    test()

In the code above, the function get_and_bind_freeport() returns a port using the mechanism described in Section 2, with a port search range of 4000 to 4009, and uses time.sleep(0.5) to simulate timing jitter inside the process. The function test() starts 10 processes from a process pool and maps get_and_bind_freeport() across them, so each process searches for a port number between 4000 and 4009 and returns it safely.

If the port numbers are safe throughout, the result should satisfy len(ports) == len(set(ports)): the 10 processes find the 10 ports one each, and no two processes return the same port number.
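When the uniqueness check fails, it helps to know which port numbers collided. A small helper along the following lines could report them; the name find_duplicate_ports is hypothetical and not part of the original test.

```python
from collections import Counter


def find_duplicate_ports(ports):
    """Return the port numbers that appear more than once in ports."""
    counts = Counter(ports)
    return [port for port, count in counts.items() if count > 1]
```

An empty result corresponds to len(ports) == len(set(ports)) holding.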

4. Summary

This article compared two ways of handling multiple processes competing for communication ports during reinforcement learning training on real games. From the analysis of the principles and a practical experiment, we conclude that, when looking up a communication port number under a race condition, the lock-based approach is safer than the conventional one. It is also a reminder that reinforcement learning for games raises many practical problems, and that solving them takes hands-on engineering practice and continual review.

5. References

[1] Fasteners

[2] Synchronization between processes

