sequence

In this paper, we study the rocketmq ListenerContainerConfiguration

ListenerContainerConfiguration

Rocketmq – spring – the boot – 2.0.3 – sources. The jar! /org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java

@Configuration
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
    private final static Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class);

    private ConfigurableApplicationContext applicationContext;

    private AtomicLong counter = new AtomicLong(0);

    private StandardEnvironment environment;

    private RocketMQProperties rocketMQProperties;

    private ObjectMapper objectMapper;

    public ListenerContainerConfiguration(ObjectMapper rocketMQMessageObjectMapper,
        StandardEnvironment environment,
        RocketMQProperties rocketMQProperties) {
        this.objectMapper = rocketMQMessageObjectMapper;
        this.environment = environment;
        this.rocketMQProperties = rocketMQProperties;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }

    @Override
    public void afterSingletonsInstantiated() {
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);

        if(Objects.nonNull(beans)) { beans.forEach(this::registerContainer); } } private void registerContainer(String beanName, Object bean) { Class<? > clazz = AopProxyUtils.ultimateTargetClass(bean);if(! RocketMQListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz +" is not instance of " + RocketMQListener.class.getName());
        }

        RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
        validate(annotation);

        String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
            counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;

        genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
            () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
        DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
            DefaultRocketMQListenerContainer.class);
        if(! container.isRunning()) { try { container.start(); } catch (Exception e) { log.error("Started container failed. {}", container, e);
                throw new RuntimeException(e);
            }
        }

        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
    }

    private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) {
        DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();

        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
        String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
        container.setNameServer(nameServer);
        if(! StringUtils.isEmpty(accessChannel)) { container.setAccessChannel(AccessChannel.valueOf(accessChannel)); } container.setTopic(environment.resolvePlaceholders(annotation.topic())); container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); container.setRocketMQMessageListener(annotation); container.setRocketMQListener((RocketMQListener) bean); container.setObjectMapper(objectMapper); container.setName(name); // REVIEW ME, use the same clientId or multiple?return container;
    }

    private void validate(RocketMQMessageListener annotation) {
        if (annotation.consumeMode() == ConsumeMode.ORDERLY &&
            annotation.messageModel() == MessageModel.BROADCASTING) {
            throw new BeanDefinitionValidationException(
                "Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!"); }}}Copy the code
  • ListenerContainerConfiguration ApplicationContextAware, SmartInitializingSingleton interface is realized
  • Its setApplicationContext method holds the applicationContext; Its afterSingletonsInstantiated method will get marked RocketMQMessageListener annotations of bean, then each registerContainer execution
  • The registerContainer method first determines whether the bean is an implementation class of RocketMQListener and throws an IllegalStateException if it is not; The RocketMQMessageListener annotation is then retrieved to determine whether an unsupported attribute is set; After DefaultRocketMQListenerContainer createRocketMQListenerContainer created and registered with the applicationContext, Then execute the start method for containers that are not running

RocketMQMessageListener

Rocketmq – spring – the boot – 2.0.3 – sources. The jar! /org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {

    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}"; /** * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve *  load balance. It's required and needs to be globally unique. * * * See here for further discussion. */ String consumerGroup(); /** * Topic name. */ String topic(); /** * Control how to selector message. * * @see SelectorType */ SelectorType selectorType() default SelectorType.TAG; /** * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92} */  String selectorExpression() default "*"; /** * Control consume mode, you can choice receive message concurrently or orderly. */ ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY; /** * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice. */ MessageModel messageModel() default MessageModel.CLUSTERING; /** * Max consumer thread number. */ int consumeThreadMax() default 64; /** * Max consumer timeout, default 30s. */ long consumeTimeout() default 30000L; /** * The property of "access-key". */ String accessKey() default ACCESS_KEY_PLACEHOLDER; /** * The property of "secret-key". */ String secretKey() default SECRET_KEY_PLACEHOLDER; /** * Switch flag instance for message trace. */ boolean enableMsgTrace() default true; /** * The name value of message trace topic.If you don't config,you can use the default trace topic name.
     */
    String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;

    /**
     * The property of "name-server".
     */
    String nameServer() default NAME_SERVER_PLACEHOLDER;

    /**
     * The property of "access-channel".
     */
    String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}
Copy the code
  • RocketMQMessageListener annotations define consumerGroup, Topic, selectorType, selectorExpression, consumeMode, messageModel, and consumeThreadMa X, consumeTimeout, accessKey, secretKey, enableMsgTrace, customizedTraceTopic, nameServer, accessChannel attributes

summary

  • ListenerContainerConfiguration ApplicationContextAware, SmartInitializingSingleton interface is realized
  • Its setApplicationContext method holds the applicationContext; Its afterSingletonsInstantiated method will get marked RocketMQMessageListener annotations of bean, then each registerContainer execution
  • The registerContainer method first determines whether the bean is an implementation class of RocketMQListener and throws an IllegalStateException if it is not; The RocketMQMessageListener annotation is then retrieved to determine whether an unsupported attribute is set; After DefaultRocketMQListenerContainer createRocketMQListenerContainer created and registered with the applicationContext, Then execute the start method for containers that are not running

doc

  • ListenerContainerConfiguration