Tornado and Celery

1.Tornado

Tornado is a powerful, extensible asynchronous HTTP server written in Python, as well as a web development framework. Tornado is a non-blocking web server and it is quite fast. Thanks to its non-blocking approach and its use of epoll, Tornado can handle thousands of connections per second, which makes it an ideal framework for real-time web services. It is robust enough to handle heavy network traffic, yet lightweight enough to be embedded in a wide range of applications and tools.


To learn more about Tornado and how to get started with it, see:
Tornado Official Documentation

2.Celery

Celery is a simple, flexible and reliable distributed system for processing large volumes of messages. It is a task queue focused on real-time processing that also supports task scheduling. There are two key concepts in Celery:

  • Worker: a Worker is an independent process that continuously watches the queue for tasks that need to be processed;
  • Broker: also known as the middleman or coordinator, the Broker handles the communication between clients and workers: the client adds a message to the queue and the broker delivers that message to a worker (see the minimal sketch after this list).
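As a minimal, hedged sketch of these two roles (the broker URL and the task itself are placeholders, not part of the article's example), a Celery module might look like this; the process that imports it and calls .delay() is the client, the broker relays the message, and a worker started with "celery -A demo worker" executes it:

from celery import Celery

# The broker (an assumed local RabbitMQ instance) relays messages from clients to workers.
app = Celery('demo', broker='amqp://guest:guest@localhost:5672')

@app.task
def add(x, y):
    # Executed inside a worker process, not inside the client that queued it.
    return x + y

# Client side: this only puts a message on the queue and returns immediately.
# result = add.delay(2, 3)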

3.RabbitMQ

RabbitMQ is messaging middleware that implements AMQP (the Advanced Message Queuing Protocol). It originated in financial systems and is used to store and forward messages in distributed systems. It performs very well in terms of ease of use, scalability and high availability.

RabbitMQ is mainly used to achieve two-way decoupling between systems: when producers generate data faster than consumers can consume it, an intermediate layer is needed to buffer that data.

For example, in a logging system it is easy to use RabbitMQ to split the work: one consumer handles the normal processing of the messages while another consumer writes them to a log. As long as your program binds the queues that both consumers listen on to the same exchange in the same way, RabbitMQ takes care of the rest of the message distribution.
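As a rough sketch of that setup (using the pika client and assuming a local RabbitMQ instance; the exchange and queue names are made up for illustration):

import pika

# Connect to an assumed local RabbitMQ instance with default credentials.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# One fanout exchange; every queue bound to it receives a copy of each message.
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# Two queues: one for normal processing, one for writing messages to a log.
channel.queue_declare(queue='process_queue')
channel.queue_declare(queue='log_queue')
channel.queue_bind(exchange='logs', queue='process_queue')
channel.queue_bind(exchange='logs', queue='log_queue')

# The producer publishes once; RabbitMQ delivers the message to both consumers.
channel.basic_publish(exchange='logs', routing_key='', body='something happened')
connection.close()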

In general, a tool library or framework is self-contained, with its own features and functionality; it may depend on other libraries, but never on external services. Celery is a special case: without a broker service, Celery is completely unusable. Celery supports a variety of brokers, but the main ones are RabbitMQ and Redis; the others are experimental and can be used, but have no maintainer. RabbitMQ is the recommended broker for production environments. Although Redis is also one of the officially supported brokers, it can show some hard-to-explain problems that RabbitMQ does not.

How to configure and use Celery is described in the official documentation.

Let’s start with Tornado’s asynchrony

Synchronous blocking of Tornado

During web development with Tornado (and this happens in virtually any language or framework), developers sometimes find that Tornado responds slowly, and one of the root causes turns out to be that the request is blocked by other requests. That's a problem!! Doesn't Tornado advertise itself as an asynchronous HTTP web server? Didn't it claim to have solved the C10K problem? This is cheating consumers!! However, after digging deeper into Tornado, it turns out that the asynchronous, non-blocking behaviour people describe is conditional: only if you do things the way Tornado prescribes do you get truly asynchronous, non-blocking handling... Let's start with a small example:

#!/usr/bin/env python
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.httpclient
import torndb
import time
from tornado.options import define, options

define("port", default=8000, help="run on the given port", type=int)
db = torndb.Connection('127.0.0.1:3306', 'user_db', 'username', 'passwd')

class MysqlHandler(tornado.web.RequestHandler):
    def get(self, flag):
        self.write(db.query('select * from table where flag=%s', flag))

class NowHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("i want you, right now!")

if __name__ == "__main__":
    tornado.options.parse_command_line()
    app = tornado.web.Application(handlers=[
        (r"/mysql_query/(\d+)", MysqlHandler),
        (r"/i_want_you_now", NowHandler)])
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()

When we first request /mysql_query and then /i_want_you_now, we find that the second request, which we expected to return immediately, is blocked until the first one finishes executing. Why? Because most web frameworks handle requests with a synchronous, blocking model, and Tornado's default mode is no exception. But Tornado is supposed to be an asynchronous HTTP server, it can't be that weak, can it? Moreover, both scenarios contain very time-consuming operations that will block other ordinary requests. How do we solve this problem?

I'm sure many of you who have used Tornado will think of the @tornado.web.asynchronous decorator, but this is where one of Tornado's official pitfalls comes in!! The web.asynchronous decorator can only be applied to the HTTP verb methods (get/post/delete and so on), and it must be combined with one of Tornado's asynchronous clients, such as httpclient.AsyncHTTPClient; in other words, the operation you want to perform asynchronously must itself be asynchronous (yes, this is a complaint!!). Moreover, the developer must explicitly call RequestHandler.finish() in the asynchronous callback to end the HTTP request: without the decorator Tornado closes the client's connection automatically when the verb method returns, but with it the connection stays open until finish() is called.

What does that mean? It means Tornado is saying: I only provide you with an asynchronous entry point; if you really want asynchrony, either use one of the asynchronous clients I provide, or implement an asynchronous operation yourself.
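For instance, here is a minimal sketch of the pattern Tornado has in mind, using its own AsyncHTTPClient with the old decorator/callback API described above (the URL is just a placeholder):

import tornado.web
import tornado.httpclient

class AsyncFetchHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        # get() returns immediately; the decorator keeps the connection open.
        client = tornado.httpclient.AsyncHTTPClient()
        client.fetch("http://example.com/", callback=self.on_response)

    def on_response(self, response):
        self.write(response.body)
        # finish() must be called explicitly, otherwise the client keeps waiting.
        self.finish()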

Take MongoDB as an example. If your handler calls Mongo through the synchronous PyMongo library, the asynchronous decorator cannot help you, because the Mongo call itself blocks. To get a truly asynchronous, non-blocking effect you need a driver that supports asynchronous access to Mongo (such as Motor); only then does adding the asynchronous decorator and operating Mongo through that driver actually give you asynchronous, non-blocking behaviour.
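A hedged sketch of that Mongo case, assuming the Motor driver (the connection details, database and collection names are placeholders):

import motor
import tornado.gen
import tornado.web

db = motor.MotorClient('mongodb://localhost:27017').user_db

class AsyncMongoHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def get(self, flag):
        # find_one returns a future; yielding it lets the IOLoop serve other
        # requests while MongoDB is doing the work.
        doc = yield db.table.find_one({'flag': flag})
        self.write(str(doc))
        self.finish()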

Asynchronous non-blocking implementation

So if you want to make asynchronous calls in Tornado, you have two options: first, use Tornado's built-in asynchronous clients, such as httpclient.AsyncHTTPClient; second, follow the pattern of those built-in clients and wrap an asynchronous client of your own around tornado.ioloop.IOLoop, but the development cost of that is not small.

However, there is a cheaper way to make Tornado asynchronous and non-blocking: the Celery project. As mentioned above, Celery is a distributed task queue system for real-time processing and scheduling. When Tornado receives a request, it can hand all of the time-consuming synchronous work (complex business logic, database operations, IO and so on) to Celery, which processes it asynchronously through the task queue and returns the result to Tornado. As long as the interaction between Tornado and Celery is itself asynchronous, the whole service is fully asynchronous; the tornado-celery adapter is what makes that Tornado-Celery interaction asynchronous.

Celery's workflow with RabbitMQ as the broker is roughly this: Tornado publishes a task message to RabbitMQ, a Celery worker picks the message up and executes the task, and the result is written to the result backend, from which Tornado reads it asynchronously.

Now let's use these components to rewrite the earlier synchronous, blocking example:

#!/usr/bin/env python
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.gen
import tornado.httpclient
import time
import tcelery, tasks
from tornado.options import define, options

tcelery.setup_nonblocking_producer()

define("port", default=8000, help="run on the given port", type=int)

class AsyncMysqlHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def get(self, flag):
        res = yield tornado.gen.Task(tasks.query_mysql.apply_async, args=[flag])
        self.write(res.result)
        self.finish()

class NowHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("i want you, right now!")

if __name__ == "__main__":
    tornado.options.parse_command_line()
    app = tornado.web.Application(handlers=[
        (r"/mysql_query/(\d+)", AsyncMysqlHandler),
        (r"/i_want_you_now", NowHandler)])
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()

Note the tornado.gen.coroutine decorator, which was added in Tornado 3.0. The older approach is to use a callback function to make the asynchronous call; with callbacks the code looks like this:

#!/usr/bin/env python
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.httpclient
import time
import tcelery, tasks
from tornado.options import define, options

tcelery.setup_nonblocking_producer()

define("port", default=8000, help="run on the given port", type=int)

class AsyncMysqlHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self, flag):
        tasks.query_mysql.apply_async(args=[flag], callback=self.on_result)

    def on_result(self, response):
        self.write(response.result)
        self.finish()

class NowHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("i want you, right now!")

if __name__ == "__main__":
    tornado.options.parse_command_line()
    app = tornado.web.Application(handlers=[
        (r"/mysql_query/(\d+)", AsyncMysqlHandler),
        (r"/i_want_you_now", NowHandler)])
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()

If you have a large number of asynchronous calls and write a separate callback function for each of them, the structure of your code becomes less clear and elegant. The callback style is not to everyone's taste, but it comes down to personal preference; if you dislike callbacks, use yield to make the asynchronous call instead.

tasks.py collects the functions that developers need to execute asynchronously:

import time
import torndb
from celery import Celery

db = torndb.Connection('127.0.0.1:3306', 'user_db', 'username', 'passwd')
app = Celery("tasks", broker="amqp://guest:guest@localhost:5672")
app.conf.CELERY_RESULT_BACKEND = "amqp://guest:guest@localhost:5672"

@app.task(name='task.query_users')
def query_mysql(flag):
    return db.query('select * from table where flag=%s', flag)

if __name__ == "__main__":
    app.start()

Then start a Celery worker to listen on the task queue (the consumer picks tasks off the queue one after another and executes them):

celery -A tasks worker --loglevel=info

From this point on, this architecture lets Tornado handle requests with fully asynchronous calls.

Problems and Optimization

1. The queue is too long

With the scheme above, how asynchronous and non-blocking the service actually feels still depends on the length of the Celery task queue: if too many tasks pile up in the queue, waits become long and efficiency drops. Solutions:

  • Start multiple Celery workers listening on the task queue and consume it concurrently with multiple processes. The celery command accepts a --concurrency option that sets how many prefork worker processes execute tasks; if all of them are busy, new tasks have to wait. The default concurrency is the number of CPUs on the machine. In addition, Celery supports several concurrency models, such as prefork (multiprocessing, the default), threads and coroutines (gevent, eventlet); you can select one with the -P option, e.g. -P gevent (you have to set up the gevent environment yourself). A sketch of the commands follows this list.
  • Create multiple task queues and distribute the large number of tasks across different queues, so that fewer tasks pile up in any single queue.
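As a hedged sketch of both points (the queue names here are invented for illustration), you might start several workers with an explicit pool size, each listening on its own queue, and then route different tasks to those queues in tasks.py (for example via app.conf.CELERY_ROUTES):

celery -A tasks worker -Q mysql_queue --concurrency=4 --loglevel=info
celery -A tasks worker -Q other_queue --concurrency=4 -P gevent --loglevel=info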

2. Horizontal scaling optimization

As mentioned above, Celery is a distributed system, which means that a project built on Celery can be scaled out painlessly. The Tornado + Celery demo above can also be deployed as independent pieces: the Tornado server and the Celery server can run on different machines, with the Celery server holding its own tasks.py and running its Celery worker. Then add the following code to the Tornado server:

from celery import Celery
app = Celery(broker="amqp://")

And invoke the task using Celery’s send_task function:

app.send_task('function_name', args=[param1, param2, param3...] )
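For example, with the task registered earlier in tasks.py as 'task.query_users', the thin Celery app above could dispatch it by name (the flag value here is just an illustrative argument):

# Send the task by its registered name; only the broker URL needs to be shared.
result = app.send_task('task.query_users', args=['1'])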

In this way, Tornado and Celery are fully decoupled.

Follow-up:

In addition, the concurrent.futures module (built into Python 3; on Python 2 it requires a separate installation) can be combined with tornado.concurrent to run custom functions asynchronously. I have not looked into it in depth yet; when I find time to research it and gain some experience, I will share what I learn about this module.
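That said, here is a rough, untested sketch of the commonly cited pattern that combines a thread pool with Tornado's coroutines (the handler and the slow operation are made up for illustration):

import time
from concurrent.futures import ThreadPoolExecutor

import tornado.gen
import tornado.web
from tornado.concurrent import run_on_executor

class FuturesHandler(tornado.web.RequestHandler):
    # run_on_executor looks for this attribute by default.
    executor = ThreadPoolExecutor(max_workers=4)

    @run_on_executor
    def slow_operation(self, flag):
        # Blocking, synchronous work runs in the thread pool instead of the IOLoop.
        time.sleep(3)
        return "done: %s" % flag

    @tornado.gen.coroutine
    def get(self, flag):
        result = yield self.slow_operation(flag)
        self.write(result)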