Advanced use of Python

Python is a high-level language with a lot of syntax, and many are unique to the Python language. RQ, a popular distributed task framework, also uses many of Python’s syntactic sugar. Read the source code to understand how to better use these syntactic sugar and magic methods.

propery

  • Description: the @property syntax provides a much cleaner and more intuitive way to write property() functions. The method decorated with @property is the method that gets the value of the property, and the name of the decorated method is used as the property name. Methods decorated with @ property name.setter are methods that set property values. Methods decorated with @ property name.deleter are methods that delete property values.
## Job

    @property
    def func_name(self):
        return self._func_name

    @property
    def func(self):
        func_name = self.func_name
        if func_name is None:
            return None

        module_name, func_name = func_name.rsplit('.', 1)
        module = importlib.import_module(module_name)
        return getattr(module, func_name)

    @property
    def args(self):
        return self._args

    @property
    def kwargs(self):
        return self._kwargs
Copy the code
  • Summary: @Property encapsulates complex details, customizing the behavior of getting properties, setting properties, and deleting properties.

A decorator

Decorators can be used for classes as well as methods, to decorate methods and classes.

@total_ordering
class Queue(object):
    redis_queue_namespace_prefix = 'rq:queue:'


def total_ordering(cls):
    """Class decorator that fills in missing ordering methods"""
    # Find user-defined comparisons (not those inherited from object).
    roots = {op for op in _convert if getattr(cls, op, None) is not getattr(object, op, None)}
    if not roots:
        raise ValueError('must define at least one ordering operation: < > <= >=')
    root = max(roots)       # prefer __lt__ to __le__ to __gt__ to __ge__
    for opname, opfunc in _convert[root]:
        if opname not in roots:
            opfunc.__name__ = opname
            setattr(cls, opname, opfunc)
    return cls
Copy the code

Context manager

The context manager is python-specific and is designed to simplify code operations, make code simpler and maintainable, reduce redundancy, and make it more reusable.

Try: VAR = XXXXX except XXX as e: PASS finally: DO SOMETHING with EXPR as VAR: BLOCKCopy the code

Here is a standard context manager usage logic, with a little explanation of the runtime logic:

(1) Execute EXPR statement, obtain Context Manager

(2) Call the __enter__ method in the context manager, which does some preprocessing.

(3) As VAR can be omitted; if not, the return value of the __enter__ method is assigned to VAR.

(4) Execute the code BLOCK, where VAR can be used as a normal variable.

(5) Finally call the __exit__ method in the context manager.

The exit__ method takes three parameters: exc_type, exc_val, exc_tb. If a code BLOCK fails and exits, the type, value, and traceback of the exception correspond. Otherwise, all three parameters are None. If the __exit() method returns false, the exception is rethrown; If the return value is true, the exception is considered handled and the program continues.

class death_pentalty_after(object):
    def __init__(self, timeout):
        self._timeout = timeout

    def __enter__(self):
        self.setup_death_penalty()

    def __exit__(self, type, value, traceback):
        # Always cancel immediately, since we're done
        try:
            self.cancel_death_penalty()
        except JobTimeoutException:
            pass
        return False

    def handle_death_penalty(self, signum, frame):
        raise JobTimeoutException('Job exceeded maximum timeout '
                'value (%d seconds).' % self._timeout)

    def setup_death_penalty(self):
        signal.signal(signal.SIGALRM, self.handle_death_penalty)
        signal.alarm(self._timeout)

    def cancel_death_penalty(self):
        signal.alarm(0)
        signal.signal(signal.SIGALRM, signal.SIG_DFL)
Copy the code

RQ uses the context manager to manage the timeout of the job. It works with the signal module to complete the timeout control of the job. It registers the signal handler at the beginning, sets the timing signal, and cancells the timing signal and ignores the signal at the end.

def perform_job(self, job):
    """Performs the actual work of a job.  Will/should only be called
    inside the work horse's process.
    """
    self.procline('Processing %s from %s since %s' % (
        job.func_name,
        job.origin, time.time()))

    try:
        with death_pentalty_after(job.timeout or 180):
            rv = job.perform()
    except Exception as e:
        fq = self.failed_queue
        self.log.exception(red(str(e)))
        self.log.warning('Moving job to %s queue.' % fq.name)

        fq.quarantine(job, exc_info=traceback.format_exc())
        return False

    if rv is None:
        self.log.info('Job OK')
    else:
        self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),))

    if rv is not None:
        p = self.connection.pipeline()
        p.hset(job.key, 'result', dumps(rv))
        p.expire(job.key, self.rv_ttl)
        p.execute()
    else:
        # Cleanup immediately
        job.delete()
Copy the code