Preface

This article focuses on Storm's reportError.

IErrorReporter

storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/IErrorReporter.java

public interface IErrorReporter {
    void reportError(Throwable error);
}
  • The ISpoutOutputCollector, IOutputCollector, and IBasicOutputCollector interfaces inherit the IErrorReporter interface

ISpoutOutputCollector

storm-core/1.2.2/storm-core-1.2.2-sources.jar!/org/apache/storm/spout/ISpoutOutputCollector.java

public interface ISpoutOutputCollector extends IErrorReporter{
    /**
        Returns the task ids that received the tuples.
    */
    List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
    void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
    long getPendingCount();
}
  • The implementation classes of ISpoutOutputCollector include SpoutOutputCollector, SpoutOutputCollectorImpl, etc

IOutputCollector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java

public interface IOutputCollector extends IErrorReporter {
    /**
     * Returns the task ids that received the tuples.
     */
    List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);

    void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);

    void ack(Tuple input);

    void fail(Tuple input);

    void resetTimeout(Tuple input);

    void flush();
}
  • The implementation classes of IOutputCollector include OutputCollector, BoltOutputCollectorImpl, etc

IBasicOutputCollector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java

public interface IBasicOutputCollector extends IErrorReporter {
    List<Integer> emit(String streamId, List<Object> tuple);

    void emitDirect(int taskId, String streamId, List<Object> tuple);

    void resetTimeout(Tuple tuple);
}
  • The main implementation class of IBasicOutputCollector is BasicOutputCollector; a usage sketch follows
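Since IBasicOutputCollector extends IErrorReporter, a bolt written against the basic-bolt API can call reportError directly on its collector. Below is a minimal sketch; the class name BasicPrintBolt and the getInteger(0) access are my own illustrations, not code from the source above.

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

// Hypothetical bolt: catches its own exception and surfaces it via reportError,
// which BasicOutputCollector inherits from IErrorReporter through IBasicOutputCollector
public class BasicPrintBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        try {
            System.out.println("count=" + input.getInteger(0));
        } catch (Exception e) {
            // recorded to the cluster state and shown as lastError in storm-ui
            collector.reportError(e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt emits nothing
    }
}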

reportError

SpoutOutputCollectorImpl.reportError

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java

    @Override
    public void reportError(Throwable error) {
        executor.getErrorReportingMetrics().incrReportedErrorCount();
        executor.getReportError().report(error);
    }

BoltOutputCollectorImpl.reportError

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java

    @Override
    public void reportError(Throwable error) {
        executor.getErrorReportingMetrics().incrReportedErrorCount();
        executor.getReportError().report(error);
    }

SpoutOutputCollectorImpl.reportError and BoltOutputCollectorImpl.reportError both call executor.getReportError().report(error).

ReportError.report

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/error/ReportError.java

public class ReportError implements IReportError {

    private static final Logger LOG = LoggerFactory.getLogger(ReportError.class);

    private final Map<String, Object> topoConf;
    private final IStormClusterState stormClusterState;
    private final String stormId;
    private final String componentId;
    private final WorkerTopologyContext workerTopologyContext;

    private int maxPerInterval;
    private int errorIntervalSecs;
    private AtomicInteger intervalStartTime;
    private AtomicInteger intervalErrors;

    public ReportError(Map<String, Object> topoConf, IStormClusterState stormClusterState, String stormId, String componentId,
                       WorkerTopologyContext workerTopologyContext) {
        this.topoConf = topoConf;
        this.stormClusterState = stormClusterState;
        this.stormId = stormId;
        this.componentId = componentId;
        this.workerTopologyContext = workerTopologyContext;
        this.errorIntervalSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS));
        this.maxPerInterval = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL));
        this.intervalStartTime = new AtomicInteger(Time.currentTimeSecs());
        this.intervalErrors = new AtomicInteger(0);
    }

    @Override
    public void report(Throwable error) {
        LOG.error("Error", error);
        if (Time.deltaSecs(intervalStartTime.get()) > errorIntervalSecs) {
            intervalErrors.set(0);
            intervalStartTime.set(Time.currentTimeSecs());
        }
        if (intervalErrors.incrementAndGet() <= maxPerInterval) {
            try {
                stormClusterState.reportError(stormId, componentId, Utils.hostname(),
                                              workerTopologyContext.getThisWorkerPort().longValue(), error);
            } catch (UnknownHostException e) {
                throw Utils.wrapInRuntime(e);
            }
        }
    }
}
  • You can see that report first resets the counter if the current interval has expired, then checks whether the number of errors in this interval exceeds the maximum; only when it does not is stormClusterState.reportError called to write the error to storage such as zk (see the config sketch below)
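The two thresholds come from the topology configuration (Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS and Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL). A small sketch of overriding them when submitting a topology; the numeric values and the topology name are only examples, not the defaults:

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class ErrorThrottleConfigDemo {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // builder.setSpout(...) / builder.setBolt(...) omitted

        Config conf = new Config();
        // length of one throttling window in seconds (errorIntervalSecs above); example value
        conf.put(Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS, 10);
        // max reportError calls written to cluster state per window (maxPerInterval above); example value
        conf.put(Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL, 5);

        StormSubmitter.submitTopology("reportErrorDemo", conf, builder.createTopology());
    }
}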

StormClusterStateImpl.reportError

storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java

    @Override
    public void reportError(String stormId, String componentId, String node, Long port, Throwable error) {
        String path = ClusterUtils.errorPath(stormId, componentId);
        String lastErrorPath = ClusterUtils.lastErrorPath(stormId, componentId);
        ErrorInfo errorInfo = new ErrorInfo(ClusterUtils.stringifyError(error), Time.currentTimeSecs());
        errorInfo.set_host(node);
        errorInfo.set_port(port.intValue());
        byte[] serData = Utils.serialize(errorInfo);
        stateStorage.mkdirs(path, defaultAcls);
        stateStorage.create_sequential(path + ClusterUtils.ZK_SEPERATOR + "e", serData, defaultAcls);
        stateStorage.set_data(lastErrorPath, serData, defaultAcls);
        List<String> childrens = stateStorage.get_children(path, false);

        Collections.sort(childrens, new Comparator<String>() {
            public int compare(String arg0, String arg1) {
                return Long.compare(Long.parseLong(arg0.substring(1)), Long.parseLong(arg1.substring(1)));
            }
        });
        while (childrens.size() > 10) {
            String znodePath = path + ClusterUtils.ZK_SEPERATOR + childrens.remove(0);
            try {
                stateStorage.delete_node(znodePath);
            } catch (Exception e) {
                if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
                    // if the node is already deleted, do nothing
                    LOG.warn("Could not find the znode: {}", znodePath);
                } else {
                    throw e;
                }
            }
        }
    }
  • You can see that ClusterUtils.errorPath(stormId, componentId) is used as the directory to write to, while ClusterUtils.lastErrorPath(stormId, componentId) gives the path for the last error
  • Since zk is not suited to storing large amounts of data, once there are more than 10 children the oldest nodes are deleted, ordered ascending by the numeric suffix obtained with substring(1); a stand-alone sketch of this pruning rule follows
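To make the pruning rule concrete, here is a stand-alone sketch of the same sort-then-delete-oldest logic, detached from ZooKeeper; the node names are made up to resemble the zkCli listing further down:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class ErrorNodePruneDemo {
    // mirrors the hard-coded limit of 10 error znodes per component in StormClusterStateImpl
    private static final int MAX_ERRORS_KEPT = 10;

    public static void main(String[] args) {
        List<String> children = new ArrayList<>(Arrays.asList(
            "e0000000290", "e0000000291", "e0000000292", "e0000000293",
            "e0000000294", "e0000000295", "e0000000296", "e0000000297",
            "e0000000298", "e0000000299", "e0000000300", "e0000000301"));

        // order by the sequence number that follows the leading 'e', i.e. substring(1)
        children.sort(Comparator.comparingLong((String name) -> Long.parseLong(name.substring(1))));

        // drop the oldest entries until only MAX_ERRORS_KEPT remain
        while (children.size() > MAX_ERRORS_KEPT) {
            String oldest = children.remove(0);
            System.out.println("would delete " + oldest); // stateStorage.delete_node(...) in the real code
        }
        System.out.println("kept: " + children);
    }
}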

ClusterUtils.errorPath

storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java

    public static final String ZK_SEPERATOR = "/";

    public static final String ERRORS_ROOT = "errors";

    public static final String ERRORS_SUBTREE = ZK_SEPERATOR + ERRORS_ROOT;

    public static String errorPath(String stormId, String componentId) {
        try {
            return errorStormRoot(stormId) + ZK_SEPERATOR + URLEncoder.encode(componentId, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public static String lastErrorPath(String stormId, String componentId) {
        return errorPath(stormId, componentId) + "-last-error";
    }

    public static String errorStormRoot(String stormId) {
        return ERRORS_SUBTREE + ZK_SEPERATOR + stormId;
    }
  • errorPath is /errors/{stormId}/{componentId}; under this directory, EPHEMERAL_SEQUENTIAL nodes whose names start with 'e' are created, new errors are appended there, and the oldest nodes are deleted once there are more than 10
  • lastErrorPath is /errors/{stormId}/{componentId}-last-error, which stores the most recent error of that componentId (see the path sketch below)
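A tiny sketch that prints both paths using the ClusterUtils methods above; the stormId and componentId are taken from the zkCli output below:

import org.apache.storm.cluster.ClusterUtils;

public class ErrorPathDemo {
    public static void main(String[] args) {
        String stormId = "reportErrorDemo-1-1540260375";
        String componentId = "print";

        // /errors/reportErrorDemo-1-1540260375/print
        System.out.println(ClusterUtils.errorPath(stormId, componentId));

        // /errors/reportErrorDemo-1-1540260375/print-last-error
        System.out.println(ClusterUtils.lastErrorPath(stormId, componentId));
    }
}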

ZkCli view

[zk: localhost:2181(CONNECTED) 21] ls /storm/errors
[DRPCStateQuery-1-1540185943, reportErrorDemo-1-1540260375]
[zk: localhost:2181(CONNECTED) 22] ls /storm/errors/reportErrorDemo-1-1540260375
[print, print-last-error]
[zk: localhost:2181(CONNECTED) 23] ls /storm/errors/reportErrorDemo-1-1540260375/print
[e0000000291, e0000000290, e0000000295, e0000000294, e0000000293, e0000000292, e0000000299, e0000000298, e0000000297, e0000000296]
[zk: localhost:2181(CONNECTED) 24] ls /storm/errors/reportErrorDemo-1-1540260375/print/e0000000299
[]
[zk: localhost:2181(CONNECTED) 25] ls /storm/errors/reportErrorDemo-1-1540260375/print-last-error
[]

storm-ui

curl -i http://192.168.99.100:8080/api/v1/topology/reportErrorDemo-1-1540260375?sys=false

  • storm-ui requests the interface above to retrieve the topology data; each spout or bolt entry contains a lastError field showing the latest error message (a fetch sketch follows)
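For completeness, a rough sketch of fetching the same JSON from Java; the host, port and topology id are simply the ones from the curl example above, so adjust them for your own cluster:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class TopologyLastErrorFetchDemo {
    public static void main(String[] args) throws Exception {
        // address and topology id copied from the curl example; replace with your own
        URL url = new URL("http://192.168.99.100:8080/api/v1/topology/reportErrorDemo-1-1540260375?sys=false");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder json = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                json.append(line);
            }
            // the "spouts" and "bolts" arrays in this JSON each carry a "lastError" field
            System.out.println(json);
        } finally {
            conn.disconnect();
        }
    }
}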

StormApiResource

storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java

    /**
     * /api/v1/topology -> topo.
     */
    @GET
    @Path("/topology/{id}")
    @AuthNimbusOp(value = "getTopology", needsTopoId = true)
    @Produces("application/json")
    public Response getTopology(@PathParam("id") String id,
                                @DefaultValue(":all-time") @QueryParam("window") String window,
                                @QueryParam("sys") boolean sys,
                                @QueryParam(callbackParameterName) String callback) throws TException {
        topologyPageRequestMeter.mark();
        try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(config)) {
            return UIHelpers.makeStandardResponse(
                UIHelpers.getTopologySummary(
                    nimbusClient.getClient().getTopologyPageInfo(id, window, sys),
                    window, config, servletRequest.getRemoteUser()),
                callback);
        }
    }
  • This calls nimbusClient.getClient().getTopologyPageInfo(id, window, sys)

Nimbus.getTopologyPageInfo

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    @Override
    public TopologyPageInfo getTopologyPageInfo(String topoId, String window, boolean includeSys)
        throws NotAliveException, AuthorizationException, TException {
        try {
            getTopologyPageInfoCalls.mark();
            CommonTopoInfo common = getCommonTopoInfo(topoId, "getTopologyPageInfo");
            String topoName = common.topoName;
            IStormClusterState state = stormClusterState;
            int launchTimeSecs = common.launchTimeSecs;
            Assignment assignment = common.assignment;
            Map<List<Integer>, Map<String, Object>> beats = common.beats;
            Map<Integer, String> taskToComp = common.taskToComponent;
            StormTopology topology = common.topology;
            Map<String, Object> topoConf = Utils.merge(conf, common.topoConf);
            StormBase base = common.base;
            if (base == null) {
                throw new WrappedNotAliveException(topoId);
            }
            Map<WorkerSlot, WorkerResources> workerToResources = getWorkerResourcesForTopology(topoId);
            List<WorkerSummary> workerSummaries = null;
            Map<List<Long>, List<Object>> exec2NodePort = new HashMap<>();
            if (assignment != null) {
                Map<List<Long>, NodeInfo> execToNodeInfo = assignment.get_executor_node_port();
                Map<String, String> nodeToHost = assignment.get_node_host();
                for (Entry<List<Long>, NodeInfo> entry : execToNodeInfo.entrySet()) {
                    NodeInfo ni = entry.getValue();
                    List<Object> nodePort = Arrays.asList(ni.get_node(), ni.get_port_iterator().next());
                    exec2NodePort.put(entry.getKey(), nodePort);
                }

                workerSummaries = StatsUtil.aggWorkerStats(topoId,
                                                           topoName,
                                                           taskToComp,
                                                           beats,
                                                           exec2NodePort,
                                                           nodeToHost,
                                                           workerToResources,
                                                           includeSys,
                                                           true); //this is the topology page, so we know the user is authorized
            }

            TopologyPageInfo topoPageInfo = StatsUtil.aggTopoExecsStats(topoId,
                                                                         exec2NodePort, taskToComp, beats, topology,
                                                                         window, includeSys, state);
            // ...
            return topoPageInfo;
        } catch (Exception e) {
            LOG.warn("Get topo page info exception. (topology id='{}')", topoId, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }
  • Here StatsUtil.aggTopoExecsStats is called to build the TopologyPageInfo

StatsUtil.aggTopoExecsStats

storm-2.0.0/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java

    /**
     * aggregate topo executors stats.
     *
     * @param topologyId     topology id
     * @param exec2nodePort  executor -> host+port
     * @param task2component task -> component
     * @param beats          executor[start, end] -> executor heartbeat
     * @param topology       storm topology
     * @param window         the window to be aggregated
     * @param includeSys     whether to include system streams
     * @param clusterState   cluster state
     * @return TopologyPageInfo thrift structure
     */
    public static TopologyPageInfo aggTopoExecsStats(
        String topologyId, Map exec2nodePort, Map task2component, Map<List<Integer>, Map<String, Object>> beats,
        StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState) {
        List<Map<String, Object>> beatList = extractDataFromHb(exec2nodePort, task2component, beats, includeSys, topology);
        Map<String, Object> topoStats = aggregateTopoStats(window, includeSys, beatList);
        return postAggregateTopoStats(task2component, exec2nodePort, topoStats, topologyId, clusterState);
    }
  • StatsUtil.aggTopoExecsStats finally calls postAggregateTopoStats

StatsUtil.postAggregateTopoStats

storm-2.0.0/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java

    private static TopologyPageInfo postAggregateTopoStats(Map task2comp, Map exec2nodePort, Map<String, Object> accData,
                                                          String topologyId, IStormClusterState clusterState) {
        TopologyPageInfo ret = new TopologyPageInfo(topologyId);

        ret.set_num_tasks(task2comp.size());
        ret.set_num_workers(((Set) accData.get(WORKERS_SET)).size());
        ret.set_num_executors(exec2nodePort != null ? exec2nodePort.size() : 0);

        Map bolt2stats = ClientStatsUtil.getMapByKey(accData, BOLT_TO_STATS);
        Map<String, ComponentAggregateStats> aggBolt2stats = new HashMap<>();
        for (Object o : bolt2stats.entrySet()) {
            Map.Entry e = (Map.Entry) o;
            Map m = (Map) e.getValue();
            long executed = getByKeyOr0(m, EXECUTED).longValue();
            if (executed > 0) {
                double execLatencyTotal = getByKeyOr0(m, EXEC_LAT_TOTAL).doubleValue();
                m.put(EXEC_LATENCY, execLatencyTotal / executed);

                double procLatencyTotal = getByKeyOr0(m, PROC_LAT_TOTAL).doubleValue();
                m.put(PROC_LATENCY, procLatencyTotal / executed);
            }
            m.remove(EXEC_LAT_TOTAL);
            m.remove(PROC_LAT_TOTAL);
            String id = (String) e.getKey();
            m.put("last-error", getLastError(clusterState, topologyId, id)); aggBolt2stats.put(id, thriftifyBoltAggStats(m)); } / /...return ret;
    }

    private static ErrorInfo getLastError(IStormClusterState stormClusterState, String stormId, String compId) {
        return stormClusterState.lastError(stormId, compId);
    }
  • Here a last-error entry is added via getLastError and then converted to a thrift structure by thriftifyBoltAggStats
  • getLastError calls stormClusterState.lastError(stormId, compId) to fetch the component's last error

UIHelpers.getTopologySummary

storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java

    /**
     * getTopologySummary.
     * @param topologyPageInfo topologyPageInfo
     * @param window window
     * @param config config
     * @param remoteUser remoteUser
     * @return getTopologySummary
     */
    public static Map<String, Object> getTopologySummary(TopologyPageInfo topologyPageInfo,
                                                         String window, Map<String, Object> config, String remoteUser) {
        Map<String, Object> result = new HashMap();
        Map<String, Object> topologyConf = (Map<String, Object>) JSONValue.parse(topologyPageInfo.get_topology_conf());
        long messageTimeout = (long) topologyConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
        Map<String, Object> unpackedTopologyPageInfo =
                unpackTopologyInfo(topologyPageInfo, window, config);
        result.putAll(unpackedTopologyPageInfo);
        result.put("user", remoteUser);
        result.put("window", window);
        result.put("windowHint", getWindowHint(window));
        result.put("msgTimeout", messageTimeout);
        result.put("configuration", topologyConf);
        result.put("visualizationTable", new ArrayList());
        result.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
        return result;
    }
  • After obtaining the TopologyPageInfo, UIHelpers.getTopologySummary calls unpackTopologyInfo on it

UIHelpers.unpackTopologyInfo

storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java

    /**
     * unpackTopologyInfo.
     * @param topologyPageInfo topologyPageInfo
     * @param window window
     * @param config config
     * @return unpackTopologyInfo
     */
    private static Map<String,Object> unpackTopologyInfo(TopologyPageInfo topologyPageInfo, String window, Map<String,Object> config) {
        Map<String, Object> result = new HashMap();
        result.put("id", topologyPageInfo.get_id()); / /... Map<String, ComponentAggregateStats> spouts = topologyPageInfo.get_id_to_spout_agg_stats(); List<Map> spoutStats = new ArrayList();for (Map.Entry<String, ComponentAggregateStats> spoutEntry : spouts.entrySet()) {
            spoutStats.add(getTopologySpoutAggStatsMap(spoutEntry.getValue(), spoutEntry.getKey()));
        }
        result.put("spouts", spoutStats);

        Map<String, ComponentAggregateStats> bolts = topologyPageInfo.get_id_to_bolt_agg_stats();
        List<Map> boltStats = new ArrayList();

        for (Map.Entry<String, ComponentAggregateStats> boltEntry : bolts.entrySet()) {
            boltStats.add(getTopologyBoltAggStatsMap(boltEntry.getValue(), boltEntry.getKey()));
        }
        result.put("bolts", boltStats); / /... result.put("samplingPct", samplingPct);
        result.put("replicationCount", topologyPageInfo.get_replication_count());
        result.put("topologyVersion", topologyPageInfo.get_topology_version());
        result.put("stormVersion", topologyPageInfo.get_storm_version());
        return result;
    }

    /**
     * getTopologySpoutAggStatsMap.
     * @param componentAggregateStats componentAggregateStats
     * @param spoutId spoutId
     * @return getTopologySpoutAggStatsMap
     */
    private static Map<String, Object> getTopologySpoutAggStatsMap(ComponentAggregateStats componentAggregateStats,
                                                                   String spoutId) {
        Map<String, Object> result = new HashMap();
        CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
        result.putAll(getCommonAggStatsMap(commonStats));
        result.put("spoutId", spoutId);
        result.put("encodedSpoutId", URLEncoder.encode(spoutId));
        SpoutAggregateStats spoutAggregateStats = componentAggregateStats.get_specific_stats().get_spout();
        result.put("completeLatency", spoutAggregateStats.get_complete_latency_ms());
        ErrorInfo lastError = componentAggregateStats.get_last_error();
        result.put("lastError", Objects.isNull(lastError) ?  "" : getTruncatedErrorString(lastError.get_error()));
        return result;
    }

    /**
     * getTopologyBoltAggStatsMap.
     * @param componentAggregateStats componentAggregateStats
     * @param boltId boltId
     * @return getTopologyBoltAggStatsMap
     */
    private static Map<String, Object> getTopologyBoltAggStatsMap(ComponentAggregateStats componentAggregateStats,
                                                                  String boltId) {
        Map<String, Object> result = new HashMap();
        CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
        result.putAll(getCommonAggStatsMap(commonStats));
        result.put("boltId", boltId);
        result.put("encodedBoltId", URLEncoder.encode(boltId));
        BoltAggregateStats boltAggregateStats = componentAggregateStats.get_specific_stats().get_bolt();
        result.put("capacity", StatsUtil.floatStr(boltAggregateStats.get_capacity()));
        result.put("executeLatency", StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms()));
        result.put("executed", boltAggregateStats.get_executed());
        result.put("processLatency", StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms()));
        ErrorInfo lastError = componentAggregateStats.get_last_error();
        result.put("lastError", Objects.isNull(lastError) ?  "" : getTruncatedErrorString(lastError.get_error()));
        return result;
    }

    /**
     * getTruncatedErrorString.
     * @param errorString errorString
     * @return getTruncatedErrorString
     */
    private static String getTruncatedErrorString(String errorString) {
        return errorString.substring(0, Math.min(errorString.length(), 200));
    }
  • Note that getTopologySpoutAggStatsMap is called for spouts and getTopologyBoltAggStatsMap for bolts
  • Both methods run lastError through getTruncatedErrorString, which truncates it to at most 200 characters (substring(0, 200))

crash log

2018-10-23 02:53:28.118 o.a.s.util Thread-10-print-executor[7 7] [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.storm.tuple.TupleImpl.getInteger(TupleImpl.java:116) ~[storm-core-1.2.2.jar:1.2.2]
    at example.demo.error.ErrorPrintBolt.execute(ErrorPrintBolt.java:26) ~[stormjar.jar:?]
    at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.2.jar:1.2.2]
    ... 6 more
2018-10-23 02:53:28.129 o.a.s.d.executor Thread-10-print-executor[7 7] [ERROR]
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.storm.tuple.TupleImpl.getInteger(TupleImpl.java:116) ~[storm-core-1.2.2.jar:1.2.2]
    at example.demo.error.ErrorPrintBolt.execute(ErrorPrintBolt.java:26) ~[stormjar.jar:?]
    at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) ~[storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.2.jar:1.2.2]
    ... 6 more
2018-10-23 02:53:28.175 o.a.s.util Thread-10-print-executor[7 7] [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.worker$fn__11404$fn__11405.invoke(worker.clj:792) [storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.daemon.executor$mk_executor_data$fn__10612$fn__10613.invoke(executor.clj:281) [storm-core-1.2.2.jar:1.2.2]
    at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:494) [storm-core-1.2.2.jar:1.2.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
2018-10-23 02:53:28.176 o.a.s.d.worker Thread-41 [INFO] Shutting down worker reportErrorDemo-2-1540263136 f9856902-cfe9-45c7-b675-93a29d3d3d36 6700
2018-10-23 02:53:28.177 o.a.s.d.worker Thread-41 [INFO] Terminating messaging context
2018-10-23 02:53:28.177 o.a.s.d.worker Thread-41 [INFO] Shutting down executors
2018-10-23 02:53:28.177 o.a.s.d.executor Thread-41 [INFO] Shutting down executor spout:[8 8]
2018-10-23 02:53:28.182 o.a.s.util Thread-3-disruptor-executor[8 8]-send-queue [INFO] Async loop interrupted!
2018-10-23 02:53:28.186 o.a.s.util Thread-4-spout-executor[8 8] [INFO] Async loop interrupted!
2018-10-23 02:53:28.188 o.a.s.d.executor Thread-41 [INFO] Shut down executor spout:[8 8]
2018-10-23 02:53:28.188 o.a.s.d.executor Thread-41 [INFO] Shutting down executor spout:[12 12]
2018-10-23 02:53:28.189 o.a.s.util Thread-5-disruptor-executor[12 12]-send-queue [INFO] Async loop interrupted!
2018-10-23 02:53:28.190 o.a.s.util Thread-6-spout-executor[12 12] [INFO] Async loop interrupted!
2018-10-23 02:53:28.190 o.a.s.d.executor Thread-41 [INFO] Shut down executor spout:[12 12]
2018-10-23 02:53:28.190 o.a.s.d.executor Thread-41 [INFO] Shutting down executor count:[2 2]
2018-10-23 02:53:28.191 o.a.s.util Thread-7-disruptor-executor[2 2]-send-queue [INFO] Async loop interrupted!
2018-10-23 02:53:28.193 o.a.s.util Thread-8-count-executor[2 2] [INFO] Async loop interrupted!
2018-10-23 02:53:28.194 o.a.s.d.executor Thread-41 [INFO] Shut down executor count:[2 2]
2018-10-23 02:53:28.194 o.a.s.d.executor Thread-41 [INFO] Shutting down executor print:[7 7]
2018-10-23 02:53:28.196 o.a.s.util Thread-9-disruptor-executor[7 7]-send-queue [INFO] Async loop interrupted!

summary

  • If an exception is thrown from a spout or bolt method, the whole worker dies; the exception is still recorded to zk automatically, but the cost is that the worker keeps dying and being restarted
  • reportError combined with try/catch keeps the worker alive after an exception while still recording the error information (see the sketch after this list). However, a given component of a topology only keeps the last 10 exceptions, which are saved as EPHEMERAL_SEQUENTIAL nodes and destroyed when the worker dies, whereas lastError uses a PERSISTENT node; in both cases the information is removed when the topology is killed
  • storm-ui displays each component's lastError, truncated to at most 200 characters
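As a concrete illustration of the second point above, here is a hedged sketch of a rich bolt that catches its own exception and reports it instead of letting the worker die; the class name, the getInteger(0) access and the fail-on-error choice are assumptions modelled on the crash log, using Storm 2.0.0 signatures:

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class SafePrintBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            // may throw ClassCastException, as in the crash log above
            Integer count = input.getInteger(0);
            System.out.println("count=" + count);
            collector.ack(input);
        } catch (Exception e) {
            // written under /errors/{stormId}/{componentId} (subject to the throttle) and shown as lastError in storm-ui
            collector.reportError(e);
            collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt emits nothing
    }
}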

doc

  • What is the use of OutputCollector class’ reportError(Throwable) method?