
Message queues are everywhere in today's world of microservices. Until recently, though, my understanding of them was limited to what they are and what they can do: I had never actually used one and knew nothing about applying one in practice. Given the current state of the industry, a message queue is one of the core components a web backend developer has to master, so I took the time to learn. Today's topic is RocketMQ, an open-source component from Alibaba. Alibaba donated it to the Apache Software Foundation in 2016, and it is now one of the foundation's top-level projects.

Before getting into the main content, let's go over some background on message queues.

What is a message queue

Simply put, a message queue is a container that holds messages: a producer puts messages into the queue, and a consumer takes them out and processes them (an order, for example). Message queues are an important component of distributed systems. Their main purposes are to improve system performance through asynchronous processing, smooth out load peaks, and reduce coupling between systems. The most widely used message queues are ActiveMQ, RabbitMQ, Kafka and RocketMQ.
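To make the producer/consumer idea concrete, here is a minimal in-process sketch built on Java's standard `BlockingQueue`. It is not RocketMQ and it is not distributed; the class name `InMemoryQueueSketch` and the order strings are made up purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration only: an in-memory stand-in for a message queue.
class InMemoryQueueSketch {

    // Producer side: put every message into the queue.
    static void produce(BlockingQueue<String> queue, List<String> messages) {
        for (String message : messages) {
            try {
                queue.put(message);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while producing", e);
            }
        }
    }

    // Consumer side: block until `count` messages have been taken out, in arrival order.
    static List<String> consume(BlockingQueue<String> queue, int count) {
        List<String> handled = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            try {
                handled.add(queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while consuming", e);
            }
        }
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> orders = Arrays.asList("order-1", "order-2", "order-3");

        // The producer runs on its own thread; the main thread acts as the consumer.
        Thread producer = new Thread(() -> produce(queue, orders));
        producer.start();

        System.out.println(consume(queue, orders.size()));
        producer.join();
    }
}
```

The producer and the consumer never call each other directly; they only share the queue, which is the property a real message queue provides across processes and machines.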

As Internet businesses keep evolving, traditional architectures can no longer keep up, and this is especially true of e-commerce. Such systems have to cope with millions of concurrent requests, and that concurrency is concentrated in short periods such as Double 11, Double 12 or other promotions and discount events. At other times the load may be only a small fraction of the peak, down to one percent. If the architecture were sized for peak concurrency, both the initial investment and the day-to-day operation and maintenance would be very expensive. Message queues came into being to balance these two situations and reduce the cost of building the system.

Simply put, a message queue is a component that evens out a system's concurrency; its main job is to shave peaks and fill valleys. In the diagram below (a rough sketch, drawn only to illustrate the point), red is the concurrency over time of a system without a message queue and blue is the same system with one:

![Concurrency over time without (red) and with (blue) a message queue]()

Comparing the two, we can see that introducing a message queue lowers the peak load on the order system: requests that arrive during the peak are held in the queue and processed later, when the system is under less pressure. In practice the order system simply keeps working through the order messages in the queue, steadily and in sequence, until all of them have been processed. This approach is mainly suited to businesses with loose real-time requirements but high concurrency, such as ticket purchasing and order placement, where the result can be pushed to the user later.
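The peak-shaving effect can be sketched with a tiny simulation. The numbers below (arrivals per time slot and a processing capacity of 300 orders per slot) are invented for illustration, and `PeakShavingSketch` is a hypothetical name, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: a queue in front of a worker with fixed capacity.
class PeakShavingSketch {

    // Returns how many orders are processed in each time slot until the backlog is empty.
    // arrivals[i] is the number of orders arriving in slot i; capacity is the per-slot limit.
    static List<Integer> processedPerSlot(int[] arrivals, int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        List<Integer> processed = new ArrayList<>();
        int backlog = 0;
        int slot = 0;
        while (slot < arrivals.length || backlog > 0) {
            if (slot < arrivals.length) {
                backlog += arrivals[slot]; // new orders join the queue
            }
            int handled = Math.min(backlog, capacity); // the worker never exceeds its capacity
            backlog -= handled;
            processed.add(handled);
            slot++;
        }
        return processed;
    }

    public static void main(String[] args) {
        int[] arrivals = {10, 1000, 20, 0, 0}; // a burst of 1000 orders in the second slot
        System.out.println(processedPerSlot(arrivals, 300));
        // prints [10, 300, 300, 300, 120]
    }
}
```

The burst of 1000 orders is spread over several slots, so the order system never has to handle more than 300 at once, and every order is still processed eventually.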

Reducing coupling between systems and improving user experience

Consider the following scenario. We are building a ticket-purchasing system, and once a purchase succeeds we need to notify the user of the result by text message. With a serial (synchronous) design the calls go like this: the user submits a ticket order, the order system accepts it and then calls the SMS system to send the purchase result. A diagram makes this clearer:

![Serial call flow: order system calls the SMS system]()

With the serial architecture the response time is 150 ms + 150 ms = 300 ms. From a business-process point of view, sending the ticket result after the order has been accepted is reasonable. But if the SMS system goes down for a while and the SMS service is unavailable, the ticket message cannot be sent and, as a consequence, the user cannot place an order at all. That hurts the user experience and also the company's revenue.

Looking more closely at the business process, whether the text message can be sent has nothing to do with the user submitting an order. What the user needs to know is whether the order was processed successfully. Sending the ticket result by SMS is not part of the core business flow, so even if the message fails to send, it should not affect ticket purchasing.

A more reasonable design is therefore this: once the order system accepts the order, it returns the acceptance result to the user directly. The processing result can be generated after the whole business flow completes; the order system then puts that result into a container as a message, and the SMS system takes the message out and sends the ticket result to the user. The container that stores these messages is the message queue. The system now looks like this:

![Order system and SMS system connected through a message queue]()
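The decoupling described above can be sketched in a few lines of Java. This is an in-memory illustration under simplifying assumptions, not RocketMQ code: `OrderDecouplingSketch`, `Notifier` and the `ACCEPTED:` result string are all hypothetical names chosen for the example.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical illustration: the order system and the SMS system only share a queue.
class OrderDecouplingSketch {

    // Stand-in for a downstream system such as an SMS gateway.
    interface Notifier {
        void send(String orderId);
    }

    private final Queue<String> pendingNotifications = new ArrayDeque<>();

    // Order system: accept the order, queue the notification, answer the user right away.
    String submit(String orderId) {
        pendingNotifications.add(orderId);
        return "ACCEPTED:" + orderId;
    }

    // SMS system: works through the queue later. Failed sends stay queued for a later retry.
    int deliverPending(Notifier notifier) {
        int delivered = 0;
        int toTry = pendingNotifications.size();
        for (int i = 0; i < toTry; i++) {
            String orderId = pendingNotifications.poll();
            try {
                notifier.send(orderId);
                delivered++;
            } catch (RuntimeException e) {
                pendingNotifications.add(orderId); // keep the message for the next attempt
            }
        }
        return delivered;
    }

    int pendingCount() {
        return pendingNotifications.size();
    }

    public static void main(String[] args) {
        OrderDecouplingSketch system = new OrderDecouplingSketch();

        // The order is accepted even though the SMS service is about to fail.
        System.out.println(system.submit("T1001"));

        int delivered = system.deliverPending(id -> {
            throw new IllegalStateException("SMS service unavailable");
        });
        System.out.println("delivered=" + delivered + ", pending=" + system.pendingCount());

        // Once the SMS service recovers, the queued notification goes out.
        delivered = system.deliverPending(id -> System.out.println("SMS sent for " + id));
        System.out.println("delivered=" + delivered + ", pending=" + system.pendingCount());
    }
}
```

An SMS outage now delays the notification but no longer blocks the order, which is the behaviour the serial design could not offer.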

In this architecture the SMS service is called asynchronously. This shortens the user's waiting time and improves response efficiency: if we ignore the time needed to store the message, the response time of the business drops to 150 ms. Coupling is lower as well, which makes the system easier to change. Suppose future requirements call not only for a text message but also for pushing the result to a WeChat official account (100 ms) and sending it by email (150 ms). With the first architecture the user's response time becomes 150 ms + 150 ms + 100 ms + 150 ms = 550 ms, whereas with the second architecture it stays at 150 ms:

![Queue-based architecture with additional notification services]()
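The response-time comparison is simple arithmetic, sketched here using the figures from the text (150 ms for the order step, 150 ms for SMS, 100 ms for WeChat, 150 ms for email). As in the text, the time to store a message in the queue is ignored; `ResponseTimeSketch` is a hypothetical name.

```java
// Hypothetical illustration of the latency arithmetic from the text.
class ResponseTimeSketch {

    // Serial architecture: the user waits for the order step plus every notification call.
    static int serialMillis(int orderMillis, int... notificationMillis) {
        int total = orderMillis;
        for (int ms : notificationMillis) {
            total += ms;
        }
        return total;
    }

    // Queue-based architecture: the user waits only for the order step,
    // because notifications are consumed from the queue afterwards.
    static int queuedMillis(int orderMillis, int... notificationMillis) {
        return orderMillis;
    }

    public static void main(String[] args) {
        System.out.println(serialMillis(150, 150));           // prints 300
        System.out.println(serialMillis(150, 150, 100, 150)); // prints 550
        System.out.println(queuedMillis(150, 150, 100, 150)); // prints 150
    }
}
```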

At the time of writing, the latest version of RocketMQ is 4.8. Download link:

www.apache.org/dyn/closer…

![Download page with the recommended mirror address marked by a red box]()

Open the address above. The address in the red box is the recommended mirror in mainland China, which downloads faster.

Installation

Unzip the downloaded ZIP file.

On Windows, just extract it with any archive tool.

Linux

unzip rocketmq-all-4.8.0-source-release.zip

cd rocketmq-all-4.8.0/

mvn -Prelease-all -DskipTests clean install -U

cd distribution/target/rocketmq-4.8.0/rocketmq-4.8.0

Set the environment variables

This step is required on Windows but not on Linux. I have not tested in a Linux environment, so interested readers are welcome to try it themselves. Add the following environment variables on Windows:

ROCKETMQ_HOME="D:\workspace\tools\rocketmq-all-4.8.0-bin-release"

NAMESRV_ADDR="localhost:9876"

![Environment variable settings on Windows]()
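On the application side, a Java program can read the same `NAMESRV_ADDR` variable instead of hard-coding the address. The sketch below assumes a fallback of `localhost:9876` when the variable is missing or blank; the class and method names are made up for illustration and are not part of RocketMQ.

```java
import java.util.Map;

// Hypothetical helper: resolve the NameServer address from the environment.
class NameServerAddressSketch {

    static final String DEFAULT_ADDR = "localhost:9876"; // assumed fallback for local testing

    // Takes the environment as a map so the logic can be exercised without real variables.
    static String resolve(Map<String, String> env) {
        String addr = env.get("NAMESRV_ADDR");
        if (addr == null || addr.trim().isEmpty()) {
            return DEFAULT_ADDR;
        }
        return addr.trim();
    }

    public static void main(String[] args) {
        // System.getenv() returns the environment of the current process.
        System.out.println("NameServer address: " + resolve(System.getenv()));
    }
}
```

The returned string could then be passed to `setNamesrvAddr` on a producer or consumer.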

Alternatively, set them in the PowerShell window before startup. These settings are temporary and last only for that session.

$Env:ROCKETMQ_HOME="D:\workspace\tools\rocketmq-all-4.8.0-bin-release"

$Env:NAMESRV_ADDR="localhost:9876"

Startup

Start the Name Server

Linux

nohup sh bin/mqnamesrv &

tail -f ~/logs/rocketmqlogs/namesrv.log

The Name Server boot success…

Windows

Open PowerShell. If the environment variables have not been set, run the following first to set them:

$Env:ROCKETMQ_HOME="D:\workspace\tools\rocketmq-all-4.8.0-bin-release"

$Env:NAMESRV_ADDR="localhost:9876"

Then go to the RocketMQ installation directory and run:

cd D:\workspace\tools\rocketmq-all-4.8.0-bin-release

.\bin\mqnamesrv.cmd

![Name Server startup output on Windows]()
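Besides reading the log, one quick way to confirm that the Name Server is listening is to open a TCP connection to its port. The following is a minimal sketch using only the JDK; `PortCheckSketch` is a hypothetical name, and the host, port 9876 and one-second timeout are assumptions matching the local setup described here.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: check whether something accepts TCP connections on host:port.
class PortCheckSketch {

    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("NameServer reachable: " + isListening("localhost", 9876, 1000));
    }
}
```

A successful connection only shows that the port is open; it does not prove that the process behind it is a healthy Name Server.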

Start the Broker

Linux

nohup sh bin/mqbroker -n localhost:9876 &

tail -f ~/logs/rocketmqlogs/broker.log

The broker[%s, 172.30.30.233:10911] boot success…

Windows

As with the Name Server above, run the two lines that set the environment variables first if they have not been set:

$Env:ROCKETMQ_HOME="D:\workspace\tools\rocketmq-all-4.8.0-bin-release"

$Env:NAMESRV_ADDR="localhost:9876"

Then start the broker:

cd D:\workspace\tools\rocketmq-all-4.8.0-bin-release

.\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

Be sure to change into the RocketMQ installation directory before running the command; otherwise you will get the error shown in red below:

![Error shown when the command is run outside the installation directory]()

Sending and receiving messages

Send messages

Linux

export NAMESRV_ADDR=localhost:9876

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

SendResult [sendStatus=SEND_OK, msgId= …

Windows

Likewise, set the environment variables first if you have not done so. If you would rather not set permanent environment variables, see the section on setting environment variables above.

cd D:\workspace\tools\rocketmq-all-4.8.0-bin-release

.\bin\tools.cmd org.apache.rocketmq.example.quickstart.Producer

After running the command, you will see that many messages have been sent to the message queue:

![Console output of the producer]()

Receive messages

Linux

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

ConsumeMessageThread_%d Receive New Messages: [MessageExt…

Windows

Again, set the environment variables first if you have not done so.

cd D:\workspace\tools\rocketmq-all-4.8.0-bin-release

.\bin\tools.cmd org.apache.rocketmq.example.quickstart.Consumer

After running the command above, you can see the received messages in the console:

![Console output of the consumer]()

Simple Java demo

The examples here can all be found on the official website and are quite simple. Where something needs extra clarification, I will add an explanation. Before starting the project, add the following dependency:

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.3.0</version>
    </dependency>

Send synchronous messages

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    /**
     * @program: rocketmq-demo
     * @description: Sends synchronous messages
     * @author: syske
     * @create: 2021-03-09 20:24
     */
    public class SyncProducer {
        public static void main(String[] args) throws Exception {
            // Instantiate the producer with a producer group name
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            // Set the address of the NameServer
            producer.setNamesrvAddr("localhost:9876");
            // Start the producer instance
            producer.start();
            for (int i = 0; i < 100; i++) {
                // Create the message and set its topic, tag and body
                Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // Send the message to a broker and wait for the result
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
            // If no more messages will be sent, shut down the producer instance
            producer.shutdown();
        }
    }

Send asynchronous messages

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.CountDownLatch2;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    import java.util.concurrent.TimeUnit;

    /**
     * @program: rocketmq-demo
     * @description: Sends asynchronous messages
     * @author: syske
     * @create: 2021-03-09 20:28
     */
    public class AsyncProducer {
        public static void main(String[] args) throws Exception {
            // Instantiate the producer with a producer group name
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            // Set the address of the NameServer
            producer.setNamesrvAddr("localhost:9876");
            // Start the producer instance
            producer.start();
            // Do not retry when an asynchronous send fails
            producer.setRetryTimesWhenSendAsyncFailed(0);

            int messageCount = 100;
            // Counted down once per callback so that await() can return as soon as all sends finish
            final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
            for (int i = 0; i < messageCount; i++) {
                final int index = i;
                // Create the message and set its topic, tag, key and body
                Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // SendCallback receives the asynchronously returned result
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            }
            // Wait up to 5 seconds for the callbacks
            countDownLatch.await(5, TimeUnit.SECONDS);
            // If no more messages will be sent, shut down the producer instance
            producer.shutdown();
        }
    }

Send one-way messages

One-way messages are messages that have no return value: the producer does not wait for a result from the broker.

    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;

    /**
     * @program: rocketmq-demo
     * @description: Sends one-way messages
     * @author: syske
     * @create: 2021-03-09 20:30
     */
    public class OnewayProducer {
        public static void main(String[] args) throws Exception {
            // Instantiate the producer with a producer group name
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            // Set the address of the NameServer
            producer.setNamesrvAddr("localhost:9876");
            // Start the producer instance
            producer.start();
            for (int i = 0; i < 100; i++) {
                // Create the message and set its topic, tag and body
                Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // Send a one-way message; no result is returned
                producer.sendOneway(msg);
            }
            // If no more messages will be sent, shut down the producer instance
            producer.shutdown();
        }
    }

Consume messages

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;

    import java.util.List;

    /**
     * @program: rocketmq-demo
     * @description: Message consumer
     * @author: syske
     * @create: 2021-03-09 20:26
     */
    public class Consumer {
        public static void main(String[] args) throws InterruptedException, MQClientException {
            // Instantiate the consumer with a consumer group name
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
            // Set the address of the NameServer
            consumer.setNamesrvAddr("localhost:9876");
            // Subscribe to one or more topics, with a tag expression to filter the messages to consume
            consumer.subscribe("TopicTest", "*");
            // Register a callback to handle the messages pulled back from the broker
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages