Package org.apache.storm.stats
Class StatsUtil
java.lang.Object
org.apache.storm.stats.StatsUtil
- 
Field Summary
- 
Constructor Summary
- 
Method Summary
- static Map<String,Object> aggBoltExecWinStats(Map<String, Object> accStats, Map<String, Object> newStats, boolean includeSys) - Aggregates windowed stats from a bolt executor's stats with a Map of accumulated stats.
- static Map<String,Number> aggBoltLatAndCount(Map<List<String>, Double> id2execAvg, Map<List<String>, Double> id2procAvg, Map<List<String>, Long> id2numExec) - Aggregates number executed, process latency, and execute latency across all streams.
- static <K> Map<K,Map> aggBoltStreamsLatAndCount(Map<K, Double> id2execAvg, Map<K, Double> id2procAvg, Map<K, Long> id2numExec) - Aggregates number executed and process and execute latencies.
- static ComponentPageInfo aggCompExecsStats(Map exec2hostPort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, String window, boolean includeSys, String topologyId, StormTopology topology, String componentId) - Aggregates component executor stats.
- static Map<String,Object> aggCompExecStats(String window, boolean includeSys, Map<String, Object> accStats, Map<String, Object> beat, String compType) - Combines the aggregate stats of one executor with the given map, selecting the appropriate window and including system components as specified.
- static Map<String,Object> aggPreMergeCompPageBolt(Map<String, Object> beat, String window, boolean includeSys) - Pre-merges component page bolt stats from an executor heartbeat.
- static Map<String,Object> aggPreMergeCompPageSpout(Map<String, Object> beat, String window, boolean includeSys) - Pre-merges component page spout stats from an executor heartbeat.
- static <K,V extends Number> Map<String,Object> aggPreMergeTopoPageBolt(Map<String, Object> beat, String window, boolean includeSys) - Pre-merges component stats of the specified bolt id.
- static <K,V extends Number> Map<String,Object> aggPreMergeTopoPageSpout(Map<String, Object> m, String window, boolean includeSys) - Pre-merges component stats of the specified spout id and returns { comp id -> comp-stats }.
- static <K> Map<String,Map<K,Double>> aggregateAverages(List<Map<String, Map<K, Double>>> avgSeq, List<Map<String, Map<K, Long>>> countSeq) - Computes a weighted average from a list of average maps and a corresponding list of count maps extracted from a list of ExecutorSummary.
- static <K> Map<String,Double> aggregateAvgStreams(Map<String, Map<K, Double>> avgs, Map<String, Map<K, Long>> counts) - Aggregates the weighted average of all streams.
- static <T> Map<String,Map> aggregateBoltStats(List<ExecutorSummary> statsSeq, boolean includeSys) - Aggregates bolt stats.
- aggregateBoltStreams(Map<String, Map> stats) - Aggregates all bolt streams.
- static <T> Map<String,Map<String,Map<T,Long>>> aggregateCommonStats(List<ExecutorSummary> statsSeq) - Aggregates common stats from a spout/bolt; called in aggregateSpoutStats/aggregateBoltStats.
- static Map<String,Object> aggregateCompStats(String window, boolean includeSys, List<Map<String, Object>> beats, String compType) - Aggregates the stats for a component over a given window of time.
- aggregateCounts - Aggregates a list of count maps into one map.
- static <K,V extends Number> Map<String,Long> aggregateCountStreams(Map<String, Map<K, V>> stats) - Aggregates count streams by window.
- static Map<String,Map> aggregateSpoutStats(List<ExecutorSummary> statsSeq, boolean includeSys) - Aggregates spout stats.
- aggregateSpoutStreams(Map<String, Map> stats) - Aggregates all spout streams.
- static Map<String,Object> aggSpoutExecWinStats(Map<String, Object> accStats, Map<String, Object> beat, boolean includeSys) - Aggregates windowed stats from a spout executor's stats with a Map of accumulated stats.
- static Map<String,Number> aggSpoutLatAndCount(Map<String, Double> id2compAvg, Map<String, Long> id2numAcked) - Aggregates number acked and complete latencies across all streams.
- static <K> Map<K,Map> aggSpoutStreamsLatAndCount(Map<K, Double> id2compAvg, Map<K, Long> id2acked) - Aggregates number acked and complete latencies.
- static TopologyPageInfo aggTopoExecsStats(String topologyId, Map exec2nodePort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState) - Aggregates topology executors' stats.
- static Map<String,Object> aggTopoExecStats(String window, boolean includeSys, Map<String, Object> accStats, Map<String, Object> beat, String compType) - A helper function that does the common work to aggregate stats of one executor with the given map for the topology page.
- static List<WorkerSummary> aggWorkerStats(String stormId, String stormName, Map<Integer, String> task2Component, Map<List<Integer>, Map<String, Object>> beats, Map<List<Long>, List<Object>> exec2NodePort, Map<String, String> nodeHost, Map<WorkerSlot, WorkerResources> worker2Resources, boolean includeSys, boolean userAuthorized, String filterSupervisor, String owner) - Aggregates statistics per worker for a topology.
- boltStreamsStats(List<ExecutorSummary> summs, boolean includeSys) - Aggregates bolt stream stats; returns a Map of {metric -> win -> aggregated value}.
- static String componentType(StormTopology topology, String compId) - Gets the component type for a given id.
- static double computeBoltCapacity(List<ExecutorSummary> executorSumms) - Computes max bolt capacity.
- static double computeExecutorCapacity(ExecutorSummary summary) - Computes the capacity of an executor.
- static Map<List<Integer>,Map<String,Object>> convertExecutorBeats(Map<ExecutorInfo, ExecutorBeat> beats) - Converts thrift executor heartbeats into a Java HashMap.
- static Map<List<Integer>,ExecutorStats> convertExecutorsStats(Map<ExecutorInfo, ExecutorStats> stats) - Converts executors' stats into a HashMap; note that the ExecutorStats values remain unchanged.
- convertExecutorStats - Converts a thrift ExecutorStats structure into a Java HashMap.
- static Map<List<Integer>,Map<String,Object>> convertWorkerBeats(SupervisorWorkerHeartbeat workerHeartbeat) - Converts a SupervisorWorkerHeartbeat to nimbus local report executor heartbeats.
- convertZkExecutorHb - Converts a thrift ExecutorBeat into a Java HashMap.
- convertZkWorkerHb(ClusterWorkerHeartbeat workerHb) - Converts a thrift worker heartbeat into a Java HashMap.
- static String errorSubset(String errorStr)
- static List<Map<String,Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, boolean includeSys, StormTopology topology) - Extracts a list of executor data from heartbeats.
- static List<Map<String,Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, boolean includeSys, StormTopology topology, String compId) - Extracts a list of executor data from heartbeats.
- static List<Map<String,Object>> extractNodeInfosFromHbForComp(Map<List<? extends Number>, List<Object>> exec2hostPort, Map<Integer, String> task2component, boolean includeSys, String compId) - Extracts a list of host+port info for the specified component.
- static String floatStr - Converts a float to a string for display.
- static List<ExecutorSummary> getFilledStats(List<ExecutorSummary> summs) - Filters out ExecutorSummary entries whose stats are null.
- static Map<String,Object> mergeAggCompStatsCompPageBolt(Map<String, Object> accBoltStats, Map<String, Object> boltStats) - Merges accumulated bolt stats with pre-merged component stats.
- static Map<String,Object> mergeAggCompStatsCompPageSpout(Map<String, Object> accSpoutStats, Map<String, Object> spoutStats) - Merges accumulated spout stats with pre-merged component stats.
- static Map<String,Object> mergeAggCompStatsTopoPageBolt(Map<String, Object> accBoltStats, Map<String, Object> boltStats) - Merges accumulated bolt stats with new bolt stats.
- static Map<String,Object> mergeAggCompStatsTopoPageSpout(Map<String, Object> accSpoutStats, Map<String, Object> spoutStats) - Merges accumulated spout stats with new spout stats.
- postAggregateCompStats(Map<String, Object> compStats) - Post-aggregates component stats.
- static <T> Map<String,Map<String,Map<T,Long>>> preProcessStreamSummary(Map<String, Map<String, Map<T, Long>>> streamSummary, boolean includeSys) - Filters system streams out of aggregated spout/bolt stats if necessary.
- spoutStreamsStats(List<ExecutorSummary> summs, boolean includeSys) - Aggregates spout stream stats; returns a Map of {metric -> win -> aggregated value}.
- static ExecutorStats thriftifyExecutorStats(Map stats) - Converts executor stats to a thrift data structure.
- static SupervisorWorkerHeartbeat thriftifyRpcWorkerHb(String stormId, List<Long> executorId) - Used for local tests.
- static <K> Map windowSetConverter(Map stats, org.apache.storm.stats.ClientStatsUtil.KeyTransformer<K> firstKeyFunc)
- 
Field Details
- 
TYPE
- 
TEN_MIN_IN_SECONDS
public static final int TEN_MIN_IN_SECONDS
- 
TEN_MIN_IN_SECONDS_STR
 
 
- 
- 
Constructor Details
- 
StatsUtil
public StatsUtil()
 
- 
- 
Method Details
- 
aggBoltLatAndCount
public static Map<String,Number> aggBoltLatAndCount(Map<List<String>, Double> id2execAvg, Map<List<String>, Double> id2procAvg, Map<List<String>, Long> id2numExec)
Aggregates number executed, process latency, and execute latency across all streams.
- Parameters:
- id2execAvg - { global stream id -> exec avg value }, e.g., {["split" "default"] 0.44313}
- id2procAvg - { global stream id -> proc avg value }
- id2numExec - { global stream id -> executed }
 
- 
aggSpoutLatAndCount
public static Map<String,Number> aggSpoutLatAndCount(Map<String, Double> id2compAvg, Map<String, Long> id2numAcked)
Aggregates number acked and complete latencies across all streams.
- 
aggBoltStreamsLatAndCount
public static <K> Map<K,Map> aggBoltStreamsLatAndCount(Map<K, Double> id2execAvg, Map<K, Double> id2procAvg, Map<K, Long> id2numExec)
Aggregates number executed and process and execute latencies.
- 
aggSpoutStreamsLatAndCount
public static <K> Map<K,Map> aggSpoutStreamsLatAndCount(Map<K, Double> id2compAvg, Map<K, Long> id2acked)
Aggregates number acked and complete latencies.
- 
aggPreMergeCompPageBolt
public static Map<String,Object> aggPreMergeCompPageBolt(Map<String, Object> beat, String window, boolean includeSys)
Pre-merges component page bolt stats from an executor heartbeat:
1. computes component capacity
2. converts map keys of stats
3. filters streams if necessary
- Parameters:
- beat - executor heartbeat data
- window - specified window
- includeSys - whether to include system streams
- Returns:
- pre-merged stats
 
- 
aggPreMergeCompPageSpout
public static Map<String,Object> aggPreMergeCompPageSpout(Map<String, Object> beat, String window, boolean includeSys)
Pre-merges component page spout stats from an executor heartbeat:
1. computes component capacity
2. converts map keys of stats
3. filters streams if necessary
- Parameters:
- beat - executor heartbeat data
- window - specified window
- includeSys - whether to include system streams
- Returns:
- pre-merged stats
 
- 
aggPreMergeTopoPageBolt
public static <K,V extends Number> Map<String,Object> aggPreMergeTopoPageBolt(Map<String, Object> beat, String window, boolean includeSys)
Pre-merges component stats of the specified bolt id.
- Parameters:
- beat - executor heartbeat data
- window - specified window
- includeSys - whether to include system streams
- Returns:
- { comp id -> comp-stats }
 
- 
aggPreMergeTopoPageSpout
public static <K,V extends Number> Map<String,Object> aggPreMergeTopoPageSpout(Map<String, Object> m, String window, boolean includeSys)
Pre-merges component stats of the specified spout id and returns { comp id -> comp-stats }.
- 
mergeAggCompStatsCompPageBolt
public static Map<String,Object> mergeAggCompStatsCompPageBolt(Map<String, Object> accBoltStats, Map<String, Object> boltStats)
Merges accumulated bolt stats with pre-merged component stats.
- Parameters:
- accBoltStats - accumulated bolt stats
- boltStats - pre-merged component stats
- Returns:
- merged stats
 
- 
mergeAggCompStatsCompPageSpout
public static Map<String,Object> mergeAggCompStatsCompPageSpout(Map<String, Object> accSpoutStats, Map<String, Object> spoutStats)
Merges accumulated spout stats with pre-merged component stats.
- 
mergeAggCompStatsTopoPageBolt
public static Map<String,Object> mergeAggCompStatsTopoPageBolt(Map<String, Object> accBoltStats, Map<String, Object> boltStats)
Merges accumulated bolt stats with new bolt stats.
- Parameters:
- accBoltStats - accumulated bolt stats
- boltStats - new input bolt stats
- Returns:
- merged bolt stats
 
- 
mergeAggCompStatsTopoPageSpout
public static Map<String,Object> mergeAggCompStatsTopoPageSpout(Map<String, Object> accSpoutStats, Map<String, Object> spoutStats)
Merges accumulated spout stats with new spout stats.
- 
aggTopoExecStats
public static Map<String,Object> aggTopoExecStats(String window, boolean includeSys, Map<String, Object> accStats, Map<String, Object> beat, String compType)
A helper function that does the common work to aggregate stats of one executor with the given map for the topology page.
- 
aggTopoExecsStats
public static TopologyPageInfo aggTopoExecsStats(String topologyId, Map exec2nodePort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState)
Aggregates topology executors' stats.
- Parameters:
- topologyId - topology id
- exec2nodePort - executor -> host+port
- task2component - task -> component
- beats - executor[start, end] -> executor heartbeat
- topology - storm topology
- window - the window to be aggregated
- includeSys - whether to include system streams
- clusterState - cluster state
- Returns:
- TopologyPageInfo thrift structure
 
- 
aggregateBoltStats
public static <T> Map<String,Map> aggregateBoltStats(List<ExecutorSummary> statsSeq, boolean includeSys)
Aggregates bolt stats.
- Parameters:
- statsSeq - a list of ExecutorSummary
- includeSys - whether to include system streams
- Returns:
- aggregated bolt stats: {metric -> win -> global stream id -> value}
 
- 
aggregateSpoutStats
public static Map<String,Map> aggregateSpoutStats(List<ExecutorSummary> statsSeq, boolean includeSys)
Aggregates spout stats.
- Parameters:
- statsSeq - a list of ExecutorSummary
- includeSys - whether to include system streams
- Returns:
- aggregated spout stats: {metric -> win -> global stream id -> value}
 
- 
aggregateCommonStats
public static <T> Map<String,Map<String,Map<T,Long>>> aggregateCommonStats(List<ExecutorSummary> statsSeq)
Aggregates common stats from a spout/bolt; called in aggregateSpoutStats/aggregateBoltStats.
- 
preProcessStreamSummary
public static <T> Map<String,Map<String,Map<T,Long>>> preProcessStreamSummary(Map<String, Map<String, Map<T, Long>>> streamSummary, boolean includeSys)
Filters system streams out of aggregated spout/bolt stats if necessary.
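The filtering that preProcessStreamSummary describes can be sketched with plain java.util maps. This is only an illustration, not Storm's implementation: the class SystemStreamFilterSketch and its helpers are hypothetical, stream ids are simplified to String, and the sketch assumes that system stream ids are the ones starting with "__".

```java
import java.util.HashMap;
import java.util.Map;

public final class SystemStreamFilterSketch {
    // Assumption: system stream ids start with "__" (for example "__ack_ack").
    static boolean isSystemId(String id) {
        return id.startsWith("__");
    }

    // Input and output shape: {metric -> win -> stream id -> value}.
    public static Map<String, Map<String, Map<String, Long>>> filterSystemStreams(
            Map<String, Map<String, Map<String, Long>>> summary, boolean includeSys) {
        if (includeSys) {
            return summary; // nothing to filter
        }
        Map<String, Map<String, Map<String, Long>>> filtered = new HashMap<>();
        summary.forEach((metric, byWindow) -> {
            Map<String, Map<String, Long>> windows = new HashMap<>();
            byWindow.forEach((win, byStream) -> {
                Map<String, Long> kept = new HashMap<>();
                byStream.forEach((stream, value) -> {
                    if (!isSystemId(stream)) {
                        kept.put(stream, value);
                    }
                });
                windows.put(win, kept);
            });
            filtered.put(metric, windows);
        });
        return filtered;
    }
}
```

With includeSys set to false, an entry such as "__ack_ack" is dropped while "default" is kept; with includeSys set to true the input is returned unchanged.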
- 
aggregateCountStreams
public static <K,V extends Number> Map<String,Long> aggregateCountStreams(Map<String, Map<K, V>> stats)
Aggregates count streams by window.
- Parameters:
- stats - a Map of value: {win -> stream -> value}
- Returns:
- a Map of value: {win -> value}
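The shape change from {win -> stream -> value} to {win -> value} can be illustrated as a per-window sum. The sketch below assumes the aggregation is a plain sum over streams; CountStreamsSketch and sumPerWindow are hypothetical names, not part of StatsUtil.

```java
import java.util.HashMap;
import java.util.Map;

public final class CountStreamsSketch {
    // Collapses {win -> stream -> value} into {win -> sum of values over all streams}.
    public static <K, V extends Number> Map<String, Long> sumPerWindow(
            Map<String, Map<K, V>> stats) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Map<K, V>> window : stats.entrySet()) {
            long total = 0L;
            for (V value : window.getValue().values()) {
                total += value.longValue();
            }
            result.put(window.getKey(), total);
        }
        return result;
    }
}
```

For a window "600" holding counts 10 and 5 on two streams, the sketch yields 15 for that window.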
 
- 
aggregateAverages
public static <K> Map<String,Map<K,Double>> aggregateAverages(List<Map<String, Map<K, Double>>> avgSeq, List<Map<String, Map<K, Long>>> countSeq)
Computes a weighted average from a list of average maps and a corresponding list of count maps extracted from a list of ExecutorSummary.
- Parameters:
- avgSeq - a list of {win -> global stream id -> avg value}
- countSeq - a list of {win -> global stream id -> count value}
- Returns:
- a Map of {win -> global stream id -> weighted avg value}
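A weighted average over parallel lists of average and count maps might look like the following. This is a minimal sketch under assumptions: AveragesSketch is a hypothetical class, the two lists are assumed to be index-aligned (one entry per executor), missing counts are treated as 0, and a zero total yields 0.0.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class AveragesSketch {
    // avgSeq and countSeq are parallel lists of {win -> stream id -> value}.
    public static <K> Map<String, Map<K, Double>> weightedAverages(
            List<Map<String, Map<K, Double>>> avgSeq,
            List<Map<String, Map<K, Long>>> countSeq) {
        Map<String, Map<K, Double>> weightedSums = new HashMap<>();
        Map<String, Map<K, Long>> totalCounts = new HashMap<>();
        for (int i = 0; i < avgSeq.size(); i++) {
            Map<String, Map<K, Long>> counts = countSeq.get(i);
            avgSeq.get(i).forEach((win, byStream) -> byStream.forEach((stream, avg) -> {
                // Assumption: a stream with no recorded count contributes weight 0.
                long count = counts.getOrDefault(win, Collections.emptyMap())
                                   .getOrDefault(stream, 0L);
                weightedSums.computeIfAbsent(win, w -> new HashMap<>())
                            .merge(stream, avg * count, Double::sum);
                totalCounts.computeIfAbsent(win, w -> new HashMap<>())
                           .merge(stream, count, Long::sum);
            }));
        }
        // Divide each accumulated weighted sum by the matching total count.
        Map<String, Map<K, Double>> result = new HashMap<>();
        weightedSums.forEach((win, byStream) -> {
            Map<K, Double> averages = new HashMap<>();
            byStream.forEach((stream, sum) -> {
                long total = totalCounts.get(win).get(stream);
                averages.put(stream, total == 0 ? 0.0 : sum / total);
            });
            result.put(win, averages);
        });
        return result;
    }
}
```

Two executors reporting averages 2.0 and 4.0 with counts 1 and 3 on the same stream combine to (2.0 * 1 + 4.0 * 3) / 4 = 3.5.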
 
- 
aggregateAvgStreams
public static <K> Map<String,Double> aggregateAvgStreams(Map<String, Map<K, Double>> avgs, Map<String, Map<K, Long>> counts)
Aggregates the weighted average of all streams.
- Parameters:
- avgs - a Map of {win -> stream -> average value}
- counts - a Map of {win -> stream -> count value}
- Returns:
- a Map of {win -> aggregated value}
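Collapsing the stream dimension with count weights can be sketched as below. AvgStreamsSketch is a hypothetical name; treating a missing count as 0 and returning 0.0 for an empty window are assumptions of this sketch rather than documented behavior.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public final class AvgStreamsSketch {
    // Turns {win -> stream -> avg} plus {win -> stream -> count} into {win -> weighted avg}.
    public static <K> Map<String, Double> weightedAvgPerWindow(
            Map<String, Map<K, Double>> avgs, Map<String, Map<K, Long>> counts) {
        Map<String, Double> result = new HashMap<>();
        for (Map.Entry<String, Map<K, Double>> window : avgs.entrySet()) {
            Map<K, Long> windowCounts =
                    counts.getOrDefault(window.getKey(), Collections.emptyMap());
            double weightedSum = 0.0;
            long totalCount = 0L;
            for (Map.Entry<K, Double> stream : window.getValue().entrySet()) {
                long count = windowCounts.getOrDefault(stream.getKey(), 0L);
                weightedSum += stream.getValue() * count;
                totalCount += count;
            }
            result.put(window.getKey(), totalCount == 0 ? 0.0 : weightedSum / totalCount);
        }
        return result;
    }
}
```

Weighting by count keeps a busy stream from being averaged on equal terms with a nearly idle one.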
 
- 
spoutStreamsStats
Aggregates spout stream stats; returns a Map of {metric -> win -> aggregated value}.
- 
boltStreamsStats
Aggregates bolt stream stats; returns a Map of {metric -> win -> aggregated value}.
- 
aggregateSpoutStreams
Aggregates all spout streams.
- Parameters:
- stats - a Map of {metric -> win -> stream id -> value}
- Returns:
- a Map of {metric -> win -> aggregated value}
 
- 
aggregateBoltStreams
Aggregates all bolt streams.
- Parameters:
- stats - a Map of {metric -> win -> stream id -> value}
- Returns:
- a Map of {metric -> win -> aggregated value}
 
- 
aggBoltExecWinStats
public static Map<String,Object> aggBoltExecWinStats(Map<String, Object> accStats, Map<String, Object> newStats, boolean includeSys)
Aggregates windowed stats from a bolt executor's stats with a Map of accumulated stats.
- 
aggSpoutExecWinStats
public static Map<String,Object> aggSpoutExecWinStats(Map<String, Object> accStats, Map<String, Object> beat, boolean includeSys)
Aggregates windowed stats from a spout executor's stats with a Map of accumulated stats.
- 
aggregateCounts
Aggregates a list of count maps into one map.
- Parameters:
- countsSeq - a list of {win -> GlobalStreamId -> value}
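Merging a list of count maps into one can be sketched as a key-wise sum. CountsMergeSketch and mergeCounts are hypothetical names, and summing matching {win, stream} entries is an assumption about what "aggregate" means here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class CountsMergeSketch {
    // Merges several {win -> stream id -> count} maps by summing matching entries.
    public static <K> Map<String, Map<K, Long>> mergeCounts(
            List<Map<String, Map<K, Long>>> countsSeq) {
        Map<String, Map<K, Long>> merged = new HashMap<>();
        for (Map<String, Map<K, Long>> counts : countsSeq) {
            counts.forEach((win, byStream) -> {
                Map<K, Long> target = merged.computeIfAbsent(win, w -> new HashMap<>());
                byStream.forEach((stream, value) -> target.merge(stream, value, Long::sum));
            });
        }
        return merged;
    }
}
```

Streams that appear in only one of the input maps are carried over unchanged.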
 
- 
aggregateCompStats
public static Map<String,Object> aggregateCompStats(String window, boolean includeSys, List<Map<String, Object>> beats, String compType)
Aggregates the stats for a component over a given window of time.
- 
aggCompExecStats
public static Map<String,Object> aggCompExecStats(String window, boolean includeSys, Map<String, Object> accStats, Map<String, Object> beat, String compType)
Combines the aggregate stats of one executor with the given map, selecting the appropriate window and including system components as specified.
- 
postAggregateCompStats
Post-aggregates component stats:
1. computes execute-latency/process-latency from the execute/process latency totals
2. computes windowed weighted averages
3. transforms Map keys
- Parameters:
- compStats - accumulated comp stats
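The first post-aggregation step, deriving an average latency from an accumulated total, can be sketched as a division by the executed count. Everything named here is an assumption made for illustration: the class PostAggregateSketch, the map keys, and the choice to report 0.0 when nothing was executed.

```java
import java.util.HashMap;
import java.util.Map;

public final class PostAggregateSketch {
    // Hypothetical keys; the real accumulated-stats map may use different names.
    static final String EXECUTED = "executed";
    static final String EXEC_LAT_TOTAL = "execute-latency-total";
    static final String EXEC_LATENCY = "execute-latency";

    // Replaces the accumulated latency total with an average latency per executed tuple.
    public static Map<String, Object> withAverageLatency(Map<String, Object> compStats) {
        Map<String, Object> result = new HashMap<>(compStats);
        long executed = ((Number) compStats.getOrDefault(EXECUTED, 0L)).longValue();
        double total = ((Number) compStats.getOrDefault(EXEC_LAT_TOTAL, 0.0)).doubleValue();
        result.remove(EXEC_LAT_TOTAL);
        result.put(EXEC_LATENCY, executed > 0 ? total / executed : 0.0);
        return result;
    }
}
```

Carrying totals through the merge steps and dividing once at the end avoids averaging averages across executors.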
 
- 
aggCompExecsStats
public static ComponentPageInfo aggCompExecsStats(Map exec2hostPort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, String window, boolean includeSys, String topologyId, StormTopology topology, String componentId)
Aggregates component executor stats.
- Parameters:
- exec2hostPort - a Map of {executor -> host+port}
- task2component - a Map of {task id -> component}
- beats - a converted HashMap of executor heartbeats, {executor -> heartbeat}
- window - specified window
- includeSys - whether to include system streams
- topologyId - topology id
- topology - storm topology
- componentId - component id
- Returns:
- ComponentPageInfo thrift structure
 
- 
aggWorkerStats
public static List<WorkerSummary> aggWorkerStats(String stormId, String stormName, Map<Integer, String> task2Component, Map<List<Integer>, Map<String, Object>> beats, Map<List<Long>, List<Object>> exec2NodePort, Map<String, String> nodeHost, Map<WorkerSlot, WorkerResources> worker2Resources, boolean includeSys, boolean userAuthorized, String filterSupervisor, String owner)
Aggregates statistics per worker for a topology, optionally filtering on a specific supervisor.
- Parameters:
- stormId - topology id
- stormName - topology name
- task2Component - a Map of {task id -> component}
- beats - a converted HashMap of executor heartbeats, {executor -> heartbeat}
- exec2NodePort - a Map of {executor -> host+port}
- includeSys - whether to include system streams
- userAuthorized - whether the user is authorized to view topology info
- filterSupervisor - if not null, only return WorkerSummaries for that supervisor
- owner - owner of the topology
 
- 
convertExecutorBeats
public static Map<List<Integer>,Map<String,Object>> convertExecutorBeats(Map<ExecutorInfo, ExecutorBeat> beats)
Converts thrift executor heartbeats into a Java HashMap.
- 
convertWorkerBeats
public static Map<List<Integer>,Map<String,Object>> convertWorkerBeats(SupervisorWorkerHeartbeat workerHeartbeat)
Converts a SupervisorWorkerHeartbeat to nimbus local report executor heartbeats.
- 
convertZkExecutorHb
Converts a thrift ExecutorBeat into a Java HashMap.
- 
convertZkWorkerHb
Converts a thrift worker heartbeat into a Java HashMap.
- 
convertExecutorsStats
public static Map<List<Integer>,ExecutorStats> convertExecutorsStats(Map<ExecutorInfo, ExecutorStats> stats)
Converts executors' stats into a HashMap; note that the ExecutorStats values remain unchanged.
- 
convertExecutorStats
Converts a thrift ExecutorStats structure into a Java HashMap.
- 
extractNodeInfosFromHbForComp
public static List<Map<String,Object>> extractNodeInfosFromHbForComp(Map<List<? extends Number>, List<Object>> exec2hostPort, Map<Integer, String> task2component, boolean includeSys, String compId)
Extracts a list of host+port info for the specified component.
- Parameters:
- exec2hostPort - {executor -> host+port}
- task2component - {task id -> component}
- includeSys - whether to include system streams
- compId - component id
- Returns:
- a list of host+port
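Selecting the host+port entries for one component can be sketched as a lookup from each executor's first task to its component. The sketch rests on several assumptions that the text above does not confirm: an executor key is a [start task, end task] list, all tasks of an executor belong to the same component, the host+port value is a two-element list, and the output keys "host" and "port" are hypothetical. System-component filtering is left out for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class NodeInfoSketch {
    public static List<Map<String, Object>> nodeInfosFor(
            Map<List<Integer>, List<Object>> exec2hostPort,
            Map<Integer, String> task2component,
            String compId) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (Map.Entry<List<Integer>, List<Object>> entry : exec2hostPort.entrySet()) {
            // Assumption: the first element of the executor key is its start task id.
            int startTask = entry.getKey().get(0);
            if (compId.equals(task2component.get(startTask))) {
                Map<String, Object> info = new HashMap<>();
                info.put("host", entry.getValue().get(0)); // hypothetical key name
                info.put("port", entry.getValue().get(1)); // hypothetical key name
                result.add(info);
            }
        }
        return result;
    }
}
```

Executors whose start task maps to a different component, or to no component at all, are skipped.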
 
- 
extractDataFromHb
public static List<Map<String,Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, boolean includeSys, StormTopology topology)
Extracts a list of executor data from heartbeats.
- 
extractDataFromHb
public static List<Map<String,Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, boolean includeSys, StormTopology topology, String compId)
Extracts a list of executor data from heartbeats.
- 
computeBoltCapacity
public static double computeBoltCapacity(List<ExecutorSummary> executorSumms)
Computes max bolt capacity.
- Parameters:
- executorSumms - a list of ExecutorSummary
- Returns:
- max bolt capacity
 
- 
computeExecutorCapacity
public static double computeExecutorCapacity(ExecutorSummary summary)
Computes the capacity of an executor: an approximation of the percentage of time spent doing real work.
- Parameters:
- summary - the stats for the executor.
- Returns:
- the capacity of the executor.
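One way to approximate "time spent doing real work" is to divide the total execute time in a window by the window length, and to take the maximum over a bolt's executors for the bolt-level figure. The sketch below is an assumption about the calculation, not the documented formula: it assumes latencies in milliseconds, a window capped at ten minutes (600 seconds), and hypothetical names CapacitySketch, executorCapacity and maxCapacity.

```java
public final class CapacitySketch {
    // Assumed value: ten minutes expressed in seconds.
    static final int TEN_MIN_IN_SECONDS = 600;

    // executed: tuples executed in the window
    // executeLatencyMs: average execute latency in milliseconds
    // uptimeSeconds: how long the executor has been running
    public static double executorCapacity(long executed, double executeLatencyMs,
                                          int uptimeSeconds) {
        if (uptimeSeconds <= 0) {
            return 0.0;
        }
        int windowSeconds = Math.min(uptimeSeconds, TEN_MIN_IN_SECONDS);
        // Busy milliseconds divided by the milliseconds available in the window.
        return (executed * executeLatencyMs) / (windowSeconds * 1000.0);
    }

    // Bolt-level capacity taken as the maximum over its executors.
    public static double maxCapacity(double[] executorCapacities) {
        double max = 0.0;
        for (double capacity : executorCapacities) {
            max = Math.max(max, capacity);
        }
        return max;
    }
}
```

Under these assumptions, an executor that handled 300000 tuples at 1.0 ms each during a full ten-minute window was busy for about half of it.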
 
- 
getFilledStats
public static List<ExecutorSummary> getFilledStats(List<ExecutorSummary> summs)
Filters out ExecutorSummary entries whose stats are null.
- Parameters:
- summs - a list of ExecutorSummary
- Returns:
- filtered summs
 
- 
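thriftifyRpcWorkerHb
public static SupervisorWorkerHeartbeat thriftifyRpcWorkerHb(String stormId, List<Long> executorId)
Used for local tests.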
- 
thriftifyExecutorStats
public static ExecutorStats thriftifyExecutorStats(Map stats)
Converts executor stats to a thrift data structure.
- Parameters:
- stats - the stats in the form of a map.
- Returns:
- the thrift structure for the stats.
 
- 
componentType
public static String componentType(StormTopology topology, String compId)
Gets the component type for a given id.
- Parameters:
- topology - the topology this is a part of.
- compId - the id of the component.
- Returns:
- the type as a String "BOLT" or "SPOUT".
 
- 
floatStr
Converts a float to a string for display.
- Parameters:
- n - the value to format.
- Returns:
- the string ready for display.
 
- 
errorSubset
- 
windowSetConverter
 
-