The opening

  • This series examines the core mqadmin commands, including subscription group management, topic creation and deletion, and topic permission changes.
  • This article analyzes topic creation and deletion.


Create a Topic

The core steps for creating a Topic are as follows

  • mqadmin sends a create-topic command to the broker.
  • The broker builds the TopicConfig for the topic and stores it in its TopicConfigManager.
  • The broker reports the TopicConfig information to all NameServers (namesrv).
  • On each NameServer, the topicQueueTable of RouteInfoManager saves the topic's QueueData.
  • In addition, a scheduled task on the broker periodically re-registers with the NameServers (heartbeat), which also refreshes the topic configuration.

updateTopic

usage: mqadmin updateTopic -b <arg> | -c <arg>  [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>] -t
       <arg> [-u <arg>] [-w <arg>]
 -b,--brokerAddr <arg>       create topic to which broker
 -c,--clusterName <arg>      create topic to which cluster
 -h,--help                   Print help
 -n,--namesrvAddr <arg>      Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
 -o,--order <arg>            set topic's order(true|false)
 -p,--perm <arg>             set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]
 -r,--readQueueNums <arg>    set read queue nums
 -s,--hasUnitSub <arg>       has unit sub (true|false)
 -t,--topic <arg>            topic name
 -u,--unit <arg>             is unit topic (true|false)
 -w,--writeQueueNums <arg>   set write queue nums
Copy the code
  • Create a topic on a specific broker with --brokerAddr.
  • Create a topic across a cluster with --clusterName.
  • Specify the NameServer address list with --namesrvAddr.
  • Specify the topic name with --topic.
  • Specify the topic's permission with --perm.


UpdateTopicSubCommand

public class UpdateTopicSubCommand implements SubCommand {

    @Override
    public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); TopicConfig = new TopicConfig(); TopicConfig = new TopicConfig(); topicConfig.setReadQueueNums(8); topicConfig.setWriteQueueNums(8); topicConfig.setTopicName(commandLine.getOptionValue('t').trim());

            // readQueueNums
            if (commandLine.hasOption('r')) {
                topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim()));
            }

            // writeQueueNums
            if (commandLine.hasOption('w')) {
                topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim()));
            }

            // perm
            if (commandLine.hasOption('p')) {
                topicConfig.setPerm(Integer.parseInt(commandLine.getOptionValue('p').trim()));
            }

            boolean isUnit = false;
            if (commandLine.hasOption('u')) {
                isUnit = Boolean.parseBoolean(commandLine.getOptionValue('u').trim());
            }

            boolean isCenterSync = false;
            if (commandLine.hasOption('s')) {
                isCenterSync = Boolean.parseBoolean(commandLine.getOptionValue('s').trim());
            }

            int topicCenterSync = TopicSysFlag.buildSysFlag(isUnit, isCenterSync);
            topicConfig.setTopicSysFlag(topicCenterSync);

            boolean isOrder = false;
            if (commandLine.hasOption('o')) {
                isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim());
            }
            topicConfig.setOrder(isOrder);

            if (commandLine.hasOption('b')) {
                String addr = commandLine.getOptionValue('b').trim();

                defaultMQAdminExt.start();
                defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);

                if (isOrder) {
                    String brokerName = CommandUtil.fetchBrokerNameByAddr(defaultMQAdminExt, addr);
                    String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums();
                    defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false);
                    System.out.printf("%s", String.format("set broker orderConf. isOrder=%s, orderConf=[%s]",
                        isOrder, orderConf.toString()));
                }
                System.out.printf("create topic to %s success.%n", addr);
                System.out.printf("%s", topicConfig);
                return;

            } else if (commandLine.hasOption('c')) {
                String clusterName = commandLine.getOptionValue('c').trim();

                defaultMQAdminExt.start();

                Set<String> masterSet =
                    CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
                for (String addr : masterSet) {
                    defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
                    System.out.printf("create topic to %s success.%n", addr);
                }

                if (isOrder) {
                    Set<String> brokerNameSet =
                        CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);
                    StringBuilder orderConf = new StringBuilder();
                    String splitor = "";
                    for (String s : brokerNameSet) {
                        orderConf.append(splitor).append(s).append(":")
                            .append(topicConfig.getWriteQueueNums());
                        splitor = ";";
                    }
                    defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
                        orderConf.toString(), true);
                    System.out.printf("set cluster orderConf. isOrder=%s, orderConf=[%s]", isOrder, orderConf);
                }

                System.out.printf("%s", topicConfig);
                return;
            }

            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
        } catch (Exception e) {
            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); } finally { defaultMQAdminExt.shutdown(); }}}Copy the code
  • By default a topic gets 8 read queues and 8 write queues.
  • When a broker address is specified, the topic is created only on that broker.
  • When a cluster is specified, the master brokers of the cluster are looked up and the topic is created on each of them.


mqadmin MQClientAPIImpl

public class MQClientAPIImpl { public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); requestHeader.setTopic(topicConfig.getTopicName()); requestHeader.setDefaultTopic(defaultTopic); requestHeader.setReadQueueNums(topicConfig.getReadQueueNums()); requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums()); requestHeader.setPerm(topicConfig.getPerm()); requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name()); requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag()); requestHeader.setOrder(topicConfig.isOrder()); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); assert response ! = null; switch (response.getCode()) {case ResponseCode.SUCCESS: {
                return;
            }
            default:
                break; } throw new MQClientException(response.getCode(), response.getRemark()); }}Copy the code
  • Create a Topic with a RequestCode of UPDATE_AND_CREATE_TOPIC.


broker AdminBrokerProcessor

public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {

    private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {

        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final CreateTopicRequestHeader requestHeader =
            (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);

        if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) {
            String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
            log.warn(errorMsg);
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(errorMsg);
            return response;
        }

        if(! TopicValidator.validateTopic(requestHeader.getTopic(), response)) {return response;
        }

        try {
            response.setCode(ResponseCode.SUCCESS);
            response.setOpaque(request.getOpaque());
            response.markResponseType();
            response.setRemark(null);
            ctx.writeAndFlush(response);
        } catch (Exception e) {
            log.error("Failed to produce a proper response", e); } // TopicConfig TopicConfig = new TopicConfig(requestheader.gettopic ()); topicConfig.setReadQueueNums(requestHeader.getReadQueueNums()); topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums()); topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum()); topicConfig.setPerm(requestHeader.getPerm()); topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag()); / / broker save Topic configuration information enclosing brokerController. GetTopicConfigManager () updateTopicConfig (topicConfig); // The broker reports topic information to namesRV. this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());returnnull; }}Copy the code
  • Create a TopicConfig object and save it through TopicConfigManager.
  • The topic information is then reported to the NameServers through BrokerController#registerIncrementBrokerData.


broker TopicConfigManager

public class TopicConfigManager extends ConfigManager {

    private static final long LOCK_TIMEOUT_MILLIS = 3000;
    private transient final Lock lockTopicConfigTable = new ReentrantLock();

    private final ConcurrentMap<String, TopicConfig> topicConfigTable =
        new ConcurrentHashMap<String, TopicConfig>(1024);
    private final DataVersion dataVersion = new DataVersion();
    private final Set<String> systemTopicList = new HashSet<String>();
    private transient BrokerController brokerController;

    public void updateTopicConfig(final TopicConfig topicConfig) {
        TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
        if(old ! = null) { log.info("update topic config, old:[{}] new:[{}]", old, topicConfig);
        } else {
            log.info("create new topic [{}]", topicConfig); } this.dataVersion.nextVersion(); this.persist(); }}Copy the code
  • TopicConfigTable is used to store the Topic and its corresponding TopicConfig.


broker BrokerController

public class BrokerController { public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) { TopicConfig registerTopicConfig = topicConfig; // If the broker itself has unreadable and unwritable permissions, then the read and write permissions of the broker prevailif(! PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) || ! PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { registerTopicConfig = new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), this.brokerConfig.getBrokerPermission()); } ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(); topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig); TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); topicConfigSerializeWrapper.setDataVersion(dataVersion); topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable); // The logic to register topic information with namesRVdoRegisterBrokerAll(true.false, topicConfigSerializeWrapper);
    }


    private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway, TopicConfigSerializeWrapper topicConfigWrapper) {/ / send the topic to all namesrv registration information List < RegisterBrokerResult > registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll( this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.getHAServerAddr(), topicConfigWrapper, this.filterServerManager.buildNewFilterServerList(), oneway, this.brokerConfig.getRegisterBrokerTimeoutMills(), this.brokerConfig.isCompressedRegister());if (registerBrokerResultList.size() > 0) {
            RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
            if(registerBrokerResult ! = null) {if(this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() ! = null) { this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr()); } this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());if (checkOrderConfig) {
                    this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                }
            }
        }
    }
}
Copy the code
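  • registerIncrementBrokerData first checks the broker's own read/write permission; if the broker is not both readable and writable, the TopicConfig to register is rebuilt with the broker's permission. The result is then wrapped in a new TopicConfigSerializeWrapper.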
  • Register the latest topic information with namesrv via brokerOuterAPI#registerBrokerAll.


broker BrokerOuterAPI

public class BrokerOuterAPI {

    private RegisterBrokerResult registerBroker(
        final String namesrvAddr,
        final boolean oneway,
        final int timeoutMills,
        final RegisterBrokerRequestHeader requestHeader,
        final byte[] body
    ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
        InterruptedException {

        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
        request.setBody(body);
        // oneway=false, this branch will not executeif (oneway) {
            try {
                this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
            } catch (RemotingTooMuchRequestException e) {
                // Ignore
            }
            returnnull; } / / report to namesrv RemotingCommand response = this. RemotingClient. InvokeSync (namesrvAddr, request, timeoutMills); // Handle the result report and synchronize the master and slave assert Response! = null; switch (response.getCode()) {case ResponseCode.SUCCESS: {
                RegisterBrokerResponseHeader responseHeader =
                    (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
                RegisterBrokerResult result = new RegisterBrokerResult();
                result.setMasterAddr(responseHeader.getMasterAddr());
                result.setHaServerAddr(responseHeader.getHaServerAddr());
                if(response.getBody() ! = null) { result.setKvTable(KVTable.decode(response.getBody(), KVTable.class)); }return result;
            }
            default:
                break; } throw new MQBrokerException(response.getCode(), response.getRemark()); }}Copy the code
  • The RequestCode for the topic registered with Namesrv is REGISTER_BROKER.
  • The broker initiates a topic registration to namesRV.


namesrv DefaultRequestProcessor

public class DefaultRequestProcessor implements NettyRequestProcessor {

    public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {

        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
        final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
        final RegisterBrokerRequestHeader requestHeader =
            (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

        if(! checksum(ctx, request, requestHeader)) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("crc32 not match");
            return response;
        }

        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();

        if(request.getBody() ! = null) { try { registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed()); } catch (Exception e) { throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e); }}else{ registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0)); registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0); } // Namesrv registers both broker message and TopicConfig message RegisterBrokerResult Result = this.namesrvController.getRouteInfoManager().registerBroker( requestHeader.getClusterName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerName(), requestHeader.getBrokerId(), requestHeader.getHaServerAddr(), registerBrokerBody.getTopicConfigSerializeWrapper(), registerBrokerBody.getFilterServerList(), ctx.channel()); responseHeader.setHaServerAddr(result.getHaServerAddr()); responseHeader.setMasterAddr(result.getMasterAddr()); byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG); response.setBody(jsonValue); response.setCode(ResponseCode.SUCCESS); response.setRemark(null);returnresponse; }}Copy the code
  • getRouteInfoManager().registerBroker() is responsible for registering the broker information with the NameServer.


namesrv RouteInfoManager

public class RouteInfoManager {

    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    public RouteInfoManager() {
        this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
        this.brokerAddrTable = new HashMap<String, BrokerData>(128);
        this.clusterAddrTable = new HashMap<String, Set<String>>(32);
        this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
        this.filterServerTable = new HashMap<String, List<String>>(256);
    }

    public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();

        try {
            try {
                this.lock.writeLock().lockInterruptibly();

                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                brokerNames.add(brokerName);

                boolean registerFirst = false;

                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    registerFirst = true;
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
                //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
                //The same IP:PORT must only have one record in brokerAddrTable
                Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<Long, String> item = it.next();
                    if(null ! = brokerAddr && brokerAddr.equals(item.getValue()) && brokerId ! = item.getKey()) { it.remove(); } } String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); // Save topicConfig informationif(null ! = topicConfigWrapper && MixAll.MASTER_ID == brokerId) {if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                        || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
                        if(tcTable ! = null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                this.createAndUpdateQueueData(brokerName, entry.getValue());
                            }
                        }
                    }
                }

                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                    new BrokerLiveInfo(
                        System.currentTimeMillis(),
                        topicConfigWrapper.getDataVersion(),
                        channel,
                        haServerAddr));
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
                }

                if(filterServerList ! = null) {if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else{ this.filterServerTable.put(brokerAddr, filterServerList); }}if(MixAll.MASTER_ID ! = brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if(masterAddr ! = null) { BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);if(brokerLiveInfo ! = null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("registerBroker Exception", e);
        }

        returnresult; }}Copy the code
  • RouteInfoManager is the core data management center of NamesRV. When a broker reports a Topic, it updates the live information of the broker.
  • The core createAndUpdateQueueData is responsible for registering the topic configuration information.


namesrv createAndUpdateQueueData

public class RouteInfoManager {

    private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {

        QueueData queueData = new QueueData();
        queueData.setBrokerName(brokerName);
        queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
        queueData.setReadQueueNums(topicConfig.getReadQueueNums());
        queueData.setPerm(topicConfig.getPerm());
        queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());

        List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
        if (null == queueDataList) {
            queueDataList = new LinkedList<QueueData>();
            queueDataList.add(queueData);
            this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
            log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
        } else {
            boolean addNewOne = true;

            Iterator<QueueData> it = queueDataList.iterator();
            while (it.hasNext()) {
                QueueData qd = it.next();
                if (qd.getBrokerName().equals(brokerName)) {
                    if (qd.equals(queueData)) {
                        addNewOne = false;
                    } else {
                        log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd, queueData); it.remove(); }}}if(addNewOne) { queueDataList.add(queueData); }}}}Copy the code
  • Responsible for creating QueueData and saving it to topicQueueTable.
  • CreateAndUpdateQueueData handles all broker reports, so brokerName needs to be identified to handle the corresponding broker’s QueueData during QueueData processing.


Timed reporting BrokerController

public class BrokerController {

    public void start() throws Exception {

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true.false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
}
Copy the code
  • The broker periodically reports broker information to NamesRV and includes registration information for the topic.


Delete the Topic

The core logic of deleting a Topic is as follows

  • mqadmin notifies all brokers to delete the configuration and the message files that belong to the topic.
  • mqadmin then notifies all NameServers to delete the configuration that belongs to the topic.


deleteTopic

usage: mqadmin deleteTopic -c <arg> [-h] [-n <arg>] -t <arg>
 -c,--clusterName <arg>   delete topic from which cluster
 -h,--help                Print help
 -n,--namesrvAddr <arg>   Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
 -t,--topic <arg>         topic name
Copy the code
  • --topic specifies the topic to be deleted.
  • --clusterName specifies the cluster that the topic to be deleted belongs to.


mqadmin DeleteTopicSubCommand

public class DeleteTopicSubCommand implements SubCommand {

    public static void deleteTopic(final DefaultMQAdminExt adminExt,
        final String clusterName,
        final String topic
    ) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {

        Set<String> brokerAddressSet = CommandUtil.fetchMasterAndSlaveAddrByClusterName(adminExt, clusterName);
        adminExt.deleteTopicInBroker(brokerAddressSet, topic);
        System.out.printf("delete topic [%s] from cluster [%s] success.%n", topic, clusterName);

        Set<String> nameServerSet = null;
        if(adminExt.getNamesrvAddr() ! = null) { String[] ns = adminExt.getNamesrvAddr().trim().split(";");
            nameServerSet = new HashSet(Arrays.asList(ns));
        }

        adminExt.deleteTopicInNameServer(nameServerSet, topic);
        System.out.printf("delete topic [%s] from NameServer success.%n", topic);
    }

    @Override
    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
        DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
        adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
        try {
            String topic = commandLine.getOptionValue('t').trim();

            if (commandLine.hasOption('c')) {
                String clusterName = commandLine.getOptionValue('c').trim();

                adminExt.start();
                deleteTopic(adminExt, clusterName, topic);
                return;
            }

            ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
        } catch (Exception e) {
            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); } finally { adminExt.shutdown(); }}}Copy the code
  • deleteTopicInBroker is responsible for deleting the topic information on all brokers (masters and slaves) of the specified cluster.
  • DeleteTopicInNameServer is responsible for deleting the topic information in namesRV


mqadmin DefaultMQAdminExtImpl

public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {

    public void deleteTopicInBroker(Set<String> addrs,
        String topic) throws RemotingException, MQBrokerException, InterruptedException,
        MQClientException {
        for(String addr : addrs) { this.mqClientInstance.getMQClientAPIImpl().deleteTopicInBroker(addr, topic, timeoutMillis); }}}Copy the code
  • Iterate over all brokers and execute deleteTopicInBroker on each of them in turn.


mqadmin MQClientAPIImpl

public class MQClientAPIImpl { public void deleteTopicInBroker(final String addr, final String topic, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader(); requestHeader.setTopic(topic); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); assert response ! = null; switch (response.getCode()) {case ResponseCode.SUCCESS: {
                return;
            }
            default:
                break; } throw new MQClientException(response.getCode(), response.getRemark()); }}Copy the code
  • The RequestCode for deleteTopicInBroker is DELETE_TOPIC_IN_BROKER.


broker AdminBrokerProcessor

public class AdminBrokerProcessor implements NettyRequestProcessor {

    private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        DeleteTopicRequestHeader requestHeader =
            (DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);

        log.info("deleteTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); / / remove this configuration information. BrokerController. GetTopicConfigManager () deleteTopicConfig (requestHeader. GetTopic ()); / / remove this message storage file. The brokerController. GetMessageStore () .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null);returnresponse; } } public class TopicConfigManager extends ConfigManager { public void deleteTopicConfig(final String topic) { // Delete the topic configuration information from topicConfigTable TopicConfig old = this. TopicConfigTable. Remove (topic);if(old ! = null) { log.info("delete topic config OK, topic: {}", old);
            this.dataVersion.nextVersion();
            this.persist();
        } else {
            log.warn("delete topic config failed, topic: {} not exists", topic); }}Copy the code
  • The deleteTopic operation involves deleting the configuration information for the topic on the broker and the corresponding message store file for the topic.


mqadmin DefaultMQAdminExtImpl

public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {

    @Override
    public void deleteTopicInNameServer(Set<String> addrs,
        String topic) throws RemotingException, MQBrokerException, InterruptedException,
        MQClientException {
        if (addrs == null) {
            String ns = this.mqClientInstance.getMQClientAPIImpl().fetchNameServerAddr();
            addrs = new HashSet(Arrays.asList(ns.split(";")));
        }
        for(String addr : addrs) { this.mqClientInstance.getMQClientAPIImpl().deleteTopicInNameServer(addr, topic, timeoutMillis); } } } public class MQClientAPIImpl { public void deleteTopicInNameServer(final String addr, final String topic, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader(); requestHeader.setTopic(topic); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_NAMESRV, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response ! = null; switch (response.getCode()) {case ResponseCode.SUCCESS: {
                return;
            }
            default:
                break; } throw new MQClientException(response.getCode(), response.getRemark()); }}Copy the code
  • Iterate over all NameServers and perform the topic deletion on each of them in turn.
  • The RequestCode is DELETE_TOPIC_IN_NAMESRV.


namesrv DefaultRequestProcessor

public class DefaultRequestProcessor implements NettyRequestProcessor {

    /**
     * Handles a delete-topic request on the NameServer by removing the topic from the
     * RouteInfoManager.
     *
     * @return a SUCCESS response without remark
     * @throws RemotingCommandException if the request header cannot be decoded
     */
    private RemotingCommand deleteTopicInNamesrv(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand reply = RemotingCommand.createResponseCommand(null);
        final DeleteTopicInNamesrvRequestHeader header = (DeleteTopicInNamesrvRequestHeader)
            request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class);

        // Drop the routing data of the requested topic.
        final String topicToDelete = header.getTopic();
        this.namesrvController.getRouteInfoManager().deleteTopic(topicToDelete);

        reply.setCode(ResponseCode.SUCCESS);
        reply.setRemark(null);
        return reply;
    }
}


public class RouteInfoManager {

    public void deleteTopic(final String topic) {
        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                this.topicQueueTable.remove(topic);
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("deleteTopic Exception", e); }}}Copy the code
  • Deleting a topic on the NameServer removes the topic's QueueData entry from the topicQueueTable of RouteInfoManager.