Package org.apache.storm.cluster
Class StormClusterStateImpl
java.lang.Object
org.apache.storm.cluster.StormClusterStateImpl
- All Implemented Interfaces:
- IStormClusterState
- 
Constructor SummaryConstructorsConstructorDescriptionStormClusterStateImpl(IStateStorage stateStorage, ILocalAssignmentsBackend assignmentsassignmentsBackend, ClusterStateContext context, boolean shouldCloseStateStorageOnDisconnect) 
- 
Method SummaryModifier and TypeMethodDescriptionvoidvoidaddNimbusHost(String nimbusId, NimbusSummary nimbusSummary) voidaddPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion, PrivateWorkerKey key) Store a new version of a private key.assignmentInfo(String stormId, Runnable callback) Get the assignment based on storm id from local backend.assignmentInfoWithVersion(String stormId, Runnable callback) assignments(Runnable callback) Get all the topologies assignments mapping stormId -> Assignment from local backend.assignmentVersion(String stormId, Runnable callback) Get backpressure topologies.blobstoreInfo(String blobKey) credentials(String stormId, Runnable callback) voiddeleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest) voidneed to take executor->node+port in explicitly so that we don't run into a situation where a long dead worker with a skewed clock overrides all the timestamps.Get leader info from state store, which was written when a master gains leadership.longgetNextPrivateWorkerKeyVersion(WorkerTokenServiceType type, String topologyId) Get the next key version number that should be used for this topology id.getPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion) Get a private key used to validate a token is correct.getTopologyProfileRequests(String stormId) getWorkerHeartbeat(String stormId, String node, Long port) getWorkerProfileRequests(String stormId, NodeInfo nodeInfo) Get a list of all topologyIds that currently have private worker keys stored, of any kind.booleanFlag to indicate if the assignments synced successfully, seeIStormClusterState.syncRemoteAssignments(Map).booleanFlag to indicate if the Pacameker is backend store.protected voidprotected voidissueMapCallback(ConcurrentHashMap<String, Runnable> callbackConcurrentHashMap, String key) nimbuses()remoteAssignmentInfo(String stormId, Runnable callback) Get the assignment based on storm id from remote state store, eg: ZK.voidremoveAllPrivateWorkerKeys(String topologyId) Remove all of the worker keys for a given topology.voidremoveBackpressure(String stormId) Remove backpressure.voidremoveBlobstoreKey(String blobKey) voidremoveExpiredPrivateWorkerKeys(String topologyId) Remove all keys for the given topology that have expired.voidremoveKeyVersion(String blobKey) voidremoveStorm(String stormId) voidremoveStormBase(String stormId) voidremoveWorkerBackpressure(String stormId, String node, Long port) Remove worker backpressure.voidremoveWorkerHeartbeat(String stormId, String node, Long port) voidvoidsetAssignment(String stormId, Assignment info, Map<String, Object> topoConf) voidMark the assignments as synced successfully, seeIStormClusterState.isAssignmentsBackendSynchronized().voidsetCredentials(String stormId, Credentials creds, Map<String, Object> topoConf) voidvoidsetupBackpressure(String stormId, Map<String, Object> topoConf) Setup backpressure.voidsetupBlob(String key, NimbusInfo nimbusInfo, Integer versionInfo) voidsetupErrors(String stormId, Map<String, Object> topoConf) voidsetupHeatbeats(String stormId, Map<String, Object> topoConf) voidsetWorkerProfileRequest(String stormId, ProfileRequest profileRequest) Get a storm base for a topology.Get storm id from passed name, null if the name doesn't exist on cluster.voidsupervisorHeartbeat(String supervisorId, SupervisorInfo info) supervisorInfo(String supervisorId) supervisors(Runnable callback) voidsyncRemoteAssignments(Map<String, byte[]> remote) Sync the remote state store assignments to local backend, used when master gains leadership, seeorg.apache.storm.nimbus.LeaderListenerCallback.voidsyncRemoteIds(Map<String, String> remote) Sync all the active storm ids of the cluster, used now when master gains leadership.voidteardownHeartbeats(String stormId) voidteardownTopologyErrors(String stormId) booleantopologyBackpressure(String stormId, long timeoutMs, Runnable callback) Check whether a topology is in throttle-on status or not: if the backpresure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off.topologyLogConfig(String stormId, Runnable cb) voidupdateStorm(String stormId, StormBase newElems) To update this function due to APersistentMap/APersistentSet is clojure's structure.voidworkerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info) Methods inherited from class java.lang.Objectclone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface org.apache.storm.cluster.IStormClusterStateallSupervisorInfo, allSupervisorInfo, getTopoId, topologyBases
- 
Constructor Details- 
StormClusterStateImplpublic StormClusterStateImpl(IStateStorage stateStorage, ILocalAssignmentsBackend assignmentsassignmentsBackend, ClusterStateContext context, boolean shouldCloseStateStorageOnDisconnect) throws Exception - Throws:
- Exception
 
 
- 
- 
Method Details- 
issueCallback
- 
issueMapCallbackprotected void issueMapCallback(ConcurrentHashMap<String, Runnable> callbackConcurrentHashMap, String key) 
- 
assignments- Specified by:
- assignmentsin interface- IStormClusterState
 
- 
assignmentInfoDescription copied from interface:IStormClusterStateGet the assignment based on storm id from local backend.- Specified by:
- assignmentInfoin interface- IStormClusterState
- Parameters:
- stormId- topology id
- callback- callback function
- Returns:
- Assignment
 
- 
remoteAssignmentInfoDescription copied from interface:IStormClusterStateGet the assignment based on storm id from remote state store, eg: ZK.- Specified by:
- remoteAssignmentInfoin interface- IStormClusterState
- Parameters:
- stormId- topology id
- callback- callback function
- Returns:
- Assignment
 
- 
assignmentsInfoDescription copied from interface:IStormClusterStateGet all the topologies assignments mapping stormId -> Assignment from local backend.- Specified by:
- assignmentsInfoin interface- IStormClusterState
- Returns:
- stormId -> Assignment mapping
 
- 
syncRemoteAssignmentsDescription copied from interface:IStormClusterStateSync the remote state store assignments to local backend, used when master gains leadership, seeorg.apache.storm.nimbus.LeaderListenerCallback.- Specified by:
- syncRemoteAssignmentsin interface- IStormClusterState
- Parameters:
- remote- assigned assignments for a specific- IStormClusterStateinstance, usually a supervisor/node.
 
- 
isAssignmentsBackendSynchronizedpublic boolean isAssignmentsBackendSynchronized()Description copied from interface:IStormClusterStateFlag to indicate if the assignments synced successfully, seeIStormClusterState.syncRemoteAssignments(Map).- Specified by:
- isAssignmentsBackendSynchronizedin interface- IStormClusterState
- Returns:
- true if is synced successfully
 
- 
isPacemakerStateStorepublic boolean isPacemakerStateStore()Description copied from interface:IStormClusterStateFlag to indicate if the Pacameker is backend store.- Specified by:
- isPacemakerStateStorein interface- IStormClusterState
- Returns:
- true if Pacemaker is being used as StateStore
 
- 
setAssignmentsBackendSynchronizedpublic void setAssignmentsBackendSynchronized()Description copied from interface:IStormClusterStateMark the assignments as synced successfully, seeIStormClusterState.isAssignmentsBackendSynchronized().- Specified by:
- setAssignmentsBackendSynchronizedin interface- IStormClusterState
 
- 
assignmentInfoWithVersion- Specified by:
- assignmentInfoWithVersionin interface- IStormClusterState
 
- 
assignmentVersion- Specified by:
- assignmentVersionin interface- IStormClusterState
- Throws:
- Exception
 
- 
blobstoreInfo- Specified by:
- blobstoreInfoin interface- IStormClusterState
 
- 
nimbuses- Specified by:
- nimbusesin interface- IStormClusterState
 
- 
addNimbusHost- Specified by:
- addNimbusHostin interface- IStormClusterState
 
- 
activeStorms- Specified by:
- activeStormsin interface- IStormClusterState
 
- 
stormBaseDescription copied from interface:IStormClusterStateGet a storm base for a topology.- Specified by:
- stormBasein interface- IStormClusterState
- Parameters:
- stormId- the id of the topology
- callback- something to call if the data changes (best effort)
- Returns:
- the StormBase or null if it is not alive.
 
- 
stormIdDescription copied from interface:IStormClusterStateGet storm id from passed name, null if the name doesn't exist on cluster.- Specified by:
- stormIdin interface- IStormClusterState
- Parameters:
- stormName- storm name
- Returns:
- storm id
 
- 
syncRemoteIdsDescription copied from interface:IStormClusterStateSync all the active storm ids of the cluster, used now when master gains leadership.- Specified by:
- syncRemoteIdsin interface- IStormClusterState
- Parameters:
- remote- stormName -> stormId mapping
 
- 
getWorkerHeartbeat- Specified by:
- getWorkerHeartbeatin interface- IStormClusterState
 
- 
getWorkerProfileRequests- Specified by:
- getWorkerProfileRequestsin interface- IStormClusterState
 
- 
getTopologyProfileRequests- Specified by:
- getTopologyProfileRequestsin interface- IStormClusterState
 
- 
setWorkerProfileRequest- Specified by:
- setWorkerProfileRequestin interface- IStormClusterState
 
- 
deleteTopologyProfileRequests- Specified by:
- deleteTopologyProfileRequestsin interface- IStormClusterState
 
- 
executorBeatspublic Map<ExecutorInfo,ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) need to take executor->node+port in explicitly so that we don't run into a situation where a long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats with an assigned node+port, and only reading executors from that heartbeat that are actually assigned, we avoid situations like that.- Specified by:
- executorBeatsin interface- IStormClusterState
- Parameters:
- stormId- topology id
- executorNodePort- executor id -> node + port
- Returns:
- mapping of executorInfo -> executor beat
 
- 
supervisors- Specified by:
- supervisorsin interface- IStormClusterState
 
- 
supervisorInfo- Specified by:
- supervisorInfoin interface- IStormClusterState
 
- 
setupHeatbeats- Specified by:
- setupHeatbeatsin interface- IStormClusterState
 
- 
teardownHeartbeats- Specified by:
- teardownHeartbeatsin interface- IStormClusterState
 
- 
teardownTopologyErrors- Specified by:
- teardownTopologyErrorsin interface- IStormClusterState
 
- 
getLeaderDescription copied from interface:IStormClusterStateGet leader info from state store, which was written when a master gains leadership.Caution: it can not be used for fencing and is only for informational purposes because we use ZK as our backend now, which could have a overdue info of nodes. - Specified by:
- getLeaderin interface- IStormClusterState
- Parameters:
- callback- callback func
- Returns:
- NimbusInfo
 
- 
backpressureTopologiesDescription copied from interface:IStormClusterStateGet backpressure topologies. Note: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon.- Specified by:
- backpressureTopologiesin interface- IStormClusterState
 
- 
heartbeatStorms- Specified by:
- heartbeatStormsin interface- IStormClusterState
 
- 
errorTopologies- Specified by:
- errorTopologiesin interface- IStormClusterState
 
- 
setTopologyLogConfig- Specified by:
- setTopologyLogConfigin interface- IStormClusterState
 
- 
topologyLogConfig- Specified by:
- topologyLogConfigin interface- IStormClusterState
 
- 
workerHeartbeat- Specified by:
- workerHeartbeatin interface- IStormClusterState
 
- 
removeWorkerHeartbeat- Specified by:
- removeWorkerHeartbeatin interface- IStormClusterState
 
- 
supervisorHeartbeat- Specified by:
- supervisorHeartbeatin interface- IStormClusterState
 
- 
topologyBackpressureCheck whether a topology is in throttle-on status or not: if the backpresure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off. But if the backpresure/storm-id dir is not empty and has not been updated for more than timeoutMs, we treat it as throttle-off. This will prevent the spouts from getting stuck indefinitely if something wrong happens.- Specified by:
- topologyBackpressurein interface- IStormClusterState
- Parameters:
- stormId- The topology Id
- timeoutMs- How long until the backpressure znode is invalid.
- callback- The callback function
- Returns:
- True is backpresure/storm-id dir is not empty and at least one of the backpressure znodes has not timed out; false otherwise.
 
- 
setupBackpressureDescription copied from interface:IStormClusterStateSetup backpressure. Note: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon.- Specified by:
- setupBackpressurein interface- IStormClusterState
 
- 
removeBackpressureDescription copied from interface:IStormClusterStateRemove backpressure. Note: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon.- Specified by:
- removeBackpressurein interface- IStormClusterState
 
- 
removeWorkerBackpressureDescription copied from interface:IStormClusterStateRemove worker backpressure. Note: In Storm 2.0. Retained for enabling transition from 1.x. Will be removed soon.- Specified by:
- removeWorkerBackpressurein interface- IStormClusterState
 
- 
activateStorm- Specified by:
- activateStormin interface- IStormClusterState
 
- 
updateStormTo update this function due to APersistentMap/APersistentSet is clojure's structure.- Specified by:
- updateStormin interface- IStormClusterState
 
- 
removeStormBase- Specified by:
- removeStormBasein interface- IStormClusterState
 
- 
setAssignment- Specified by:
- setAssignmentin interface- IStormClusterState
 
- 
setupBlob- Specified by:
- setupBlobin interface- IStormClusterState
 
- 
activeKeys- Specified by:
- activeKeysin interface- IStormClusterState
 
- 
blobstore- Specified by:
- blobstorein interface- IStormClusterState
 
- 
removeStorm- Specified by:
- removeStormin interface- IStormClusterState
 
- 
removeBlobstoreKey- Specified by:
- removeBlobstoreKeyin interface- IStormClusterState
 
- 
removeKeyVersion- Specified by:
- removeKeyVersionin interface- IStormClusterState
 
- 
setupErrors- Specified by:
- setupErrorsin interface- IStormClusterState
 
- 
reportErrorpublic void reportError(String stormId, String componentId, String node, Long port, Throwable error) - Specified by:
- reportErrorin interface- IStormClusterState
 
- 
errors- Specified by:
- errorsin interface- IStormClusterState
 
- 
lastError- Specified by:
- lastErrorin interface- IStormClusterState
 
- 
setCredentials- Specified by:
- setCredentialsin interface- IStormClusterState
 
- 
credentials- Specified by:
- credentialsin interface- IStormClusterState
 
- 
disconnectpublic void disconnect()- Specified by:
- disconnectin interface- IStormClusterState
 
- 
getPrivateWorkerKeypublic PrivateWorkerKey getPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion) Description copied from interface:IStormClusterStateGet a private key used to validate a token is correct. This is expected to be called from a privileged daemon, and the ACLs should be set up to only allow nimbus and these privileged daemons access to these private keys.- Specified by:
- getPrivateWorkerKeyin interface- IStormClusterState
- Parameters:
- type- the type of service the key is for.
- topologyId- the topology id the key is for.
- keyVersion- the version of the key this is for.
- Returns:
- the private key or null if it could not be found.
 
- 
addPrivateWorkerKeypublic void addPrivateWorkerKey(WorkerTokenServiceType type, String topologyId, long keyVersion, PrivateWorkerKey key) Description copied from interface:IStormClusterStateStore a new version of a private key. This is expected to only ever be called from nimbus. All ACLs however need to be setup to allow the given services access to the stored information.- Specified by:
- addPrivateWorkerKeyin interface- IStormClusterState
- Parameters:
- type- the type of service this key is for.
- topologyId- the topology this key is for
- keyVersion- the version of the key this is for.
- key- the key to store.
 
- 
getNextPrivateWorkerKeyVersionDescription copied from interface:IStormClusterStateGet the next key version number that should be used for this topology id. This is expected to only ever be called from nimbus, but it is acceptable if the ACLs are setup so that it can work from a privileged daemon for the given service.- Specified by:
- getNextPrivateWorkerKeyVersionin interface- IStormClusterState
- Parameters:
- type- the type of service this is for.
- topologyId- the topology id this is for.
- Returns:
- the next version number. It should be 0 for a new topology id/service combination.
 
- 
removeExpiredPrivateWorkerKeysDescription copied from interface:IStormClusterStateRemove all keys for the given topology that have expired. The number of keys should be small enough that doing an exhaustive scan of them all is acceptable as there is no guarantee that expiration time and version number are related. This should be for all service types. This is expected to only ever be called from nimbus and some ACLs may be setup so being called from other daemons will cause it to fail.- Specified by:
- removeExpiredPrivateWorkerKeysin interface- IStormClusterState
- Parameters:
- topologyId- the id of the topology to scan.
 
- 
removeAllPrivateWorkerKeysDescription copied from interface:IStormClusterStateRemove all of the worker keys for a given topology. Used to clean up after a topology finishes. This is expected to only ever be called from nimbus and ideally should only ever work from nimbus.- Specified by:
- removeAllPrivateWorkerKeysin interface- IStormClusterState
- Parameters:
- topologyId- the topology to clean up after.
 
- 
idsOfTopologiesWithPrivateWorkerKeysDescription copied from interface:IStormClusterStateGet a list of all topologyIds that currently have private worker keys stored, of any kind. This is expected to only ever be called from nimbus.- Specified by:
- idsOfTopologiesWithPrivateWorkerKeysin interface- IStormClusterState
- Returns:
- the list of topology ids with any kind of private worker key stored.
 
 
-